Posted to commits@pulsar.apache.org by hj...@apache.org on 2020/07/14 05:48:31 UTC

[pulsar] branch master updated: Add async state manipulation methods in java functions (#7468)

This is an automated email from the ASF dual-hosted git repository.

hjf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b197962  Add async state manipulation methods in java functions (#7468)
b197962 is described below

commit b197962d3eb55b8a7c01f61453042236d3d62c7c
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Tue Jul 14 13:48:14 2020 +0800

    Add async state manipulation methods in java functions (#7468)
---
 site2/docs/functions-develop.md                    |  68 +++++++-
 .../versioned_docs/version-2.4.0/functions-api.md  |   4 +
 .../version-2.4.0/functions-state.md               | 177 +++++++++++++++++++++
 .../version-2.4.1/functions-develop.md             |  68 +++++++-
 .../version-2.4.2/functions-develop.md             |  66 +++++++-
 .../version-2.5.0/functions-develop.md             |  68 +++++++-
 .../version-2.5.1/functions-develop.md             |  68 +++++++-
 .../version-2.5.2/functions-develop.md             |  68 +++++++-
 .../version-2.6.0/functions-develop.md             |  68 +++++++-
 9 files changed, 634 insertions(+), 21 deletions(-)
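
For readers skimming the diff below, here is a minimal sketch of how the `*Async` state methods might be chained, assuming they return `CompletableFuture` as the per-method documentation added in this commit states. `InMemoryStateContext` is a hypothetical in-memory stand-in written only for this example, not Pulsar's `Context` implementation, and `countWord` and `roundTrip` are likewise illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncStateSketch {

    /** Hypothetical stand-in that mimics the four async signatures from the docs. */
    static class InMemoryStateContext {
        private final Map<String, Long> counters = new ConcurrentHashMap<>();
        private final Map<String, ByteBuffer> states = new ConcurrentHashMap<>();

        CompletableFuture<Void> incrCounterAsync(String key, long amount) {
            // merge() adds the amount atomically, creating the counter if absent
            return CompletableFuture.runAsync(() -> counters.merge(key, amount, Long::sum));
        }

        CompletableFuture<Long> getCounterAsync(String key) {
            return CompletableFuture.supplyAsync(() -> counters.getOrDefault(key, 0L));
        }

        CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
            // duplicate() keeps the caller's position and limit untouched
            return CompletableFuture.runAsync(() -> states.put(key, value.duplicate()));
        }

        CompletableFuture<ByteBuffer> getStateAsync(String key) {
            return CompletableFuture.supplyAsync(() -> {
                ByteBuffer stored = states.get(key);
                return stored == null ? null : stored.duplicate();
            });
        }
    }

    /** Increments the counter for a word, then reads it back once the increment completes. */
    static long countWord(InMemoryStateContext context, String word) {
        return context.incrCounterAsync(word, 1)
                .thenCompose(ignored -> context.getCounterAsync(word))
                .join();
    }

    /** Stores a UTF-8 string under a key, then reads it back and decodes it. */
    static String roundTrip(InMemoryStateContext context, String key, String value) {
        ByteBuffer encoded = ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
        return context.putStateAsync(key, encoded)
                .thenCompose(ignored -> context.getStateAsync(key))
                .thenApply(buffer -> StandardCharsets.UTF_8.decode(buffer).toString())
                .join();
    }

    public static void main(String[] args) {
        InMemoryStateContext context = new InMemoryStateContext();
        countWord(context, "hello");
        System.out.println(countWord(context, "hello"));          // prints 2
        System.out.println(roundTrip(context, "greeting", "hi")); // prints hi
    }
}
```

Chaining with `thenCompose` makes each read start only after the preceding write has completed, without blocking between steps. The final `join()` is there only so the sketch returns a plain value; a real function may prefer to return or compose the future instead.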

diff --git a/site2/docs/functions-develop.md b/site2/docs/functions-develop.md
index c2bdd16..0f1a447 100644
--- a/site2/docs/functions-develop.md
+++ b/site2/docs/functions-develop.md
@@ -296,10 +296,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 > Note  
 > State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
 
 #### getCounter
 
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -842,6 +876,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -854,6 +902,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
 #### deleteState
 
 ```java
diff --git a/site2/website/versioned_docs/version-2.4.0/functions-api.md b/site2/website/versioned_docs/version-2.4.0/functions-api.md
index 1b68aa7..fa2efae 100644
--- a/site2/website/versioned_docs/version-2.4.0/functions-api.md
+++ b/site2/website/versioned_docs/version-2.4.0/functions-api.md
@@ -267,9 +267,13 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
diff --git a/site2/website/versioned_docs/version-2.4.0/functions-state.md b/site2/website/versioned_docs/version-2.4.0/functions-state.md
new file mode 100644
index 0000000..2302321
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.4.0/functions-state.md
@@ -0,0 +1,177 @@
+---
+id: version-2.4.0-functions-state
+title: Pulsar Functions State Storage (Developer Preview)
+sidebar_label: State Storage
+original_id: functions-state
+---
+
+Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table service](https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f)
+for storing the `State` for functions. For example, a `WordCount` function can store its `counters` state in BookKeeper's table service via the Pulsar Functions [State API](#api).
+
+## API
+
+### Java API
+
+Currently, Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the [Context](functions-api.md#context) object when
+you are using [Java SDK](functions-api.md#java-sdk-functions) functions.
+
+#### incrCounter
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    void incrCounter(String key, long amount);
+```
+
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
+
+#### getCounter
+
+```java
+    /**
+     * Retrieve the counter value for the key.
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    long getCounter(String key);
+```
+
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+
+Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
+general key/value state.
+
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
+#### putState
+
+```java
+    /**
+     * Update the state value for the key.
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    void putState(String key, ByteBuffer value);
+```
+
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
+#### getState
+
+```java
+    /**
+     * Retrieve the state value for the key.
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    ByteBuffer getState(String key);
+```
+
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
+### Python API
+
+State is currently not supported in the [Python SDK](functions-api.md#python-sdk-functions).
+
+## Query State
+
+A Pulsar Function can use the [State API](#api) to store state in Pulsar's state storage
+and retrieve it again. Additionally, Pulsar provides
+CLI commands for querying that state.
+
+```shell
+$ bin/pulsar-admin functions querystate \
+    --tenant <tenant> \
+    --namespace <namespace> \
+    --name <function-name> \
+    --state-storage-url <bookkeeper-service-url> \
+    --key <state-key> \
+    [--watch]
+```
+
+If `--watch` is specified, the CLI will watch the value of the provided `state-key`.
+
+## Example
+
+### Java Example
+
+{@inject: github:`WordCountFunction`:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java} is a good example
+of how an application can easily store `state` in Pulsar Functions.
+
+```java
+public class WordCountFunction implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) throws Exception {
+        Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
+        return null;
+    }
+}
+```
+
+The logic of this `WordCount` function is straightforward:
+
+1. The function first splits the received `String` into multiple words using regex `\\.`.
+2. For each `word`, the function increments the corresponding `counter` by 1 (via `incrCounter(key, amount)`).
+
+### Python Example
+
+State is currently not supported in the [Python SDK](functions-api.md#python-sdk-functions).
+
diff --git a/site2/website/versioned_docs/version-2.4.1/functions-develop.md b/site2/website/versioned_docs/version-2.4.1/functions-develop.md
index 598d06d..a2aff48 100644
--- a/site2/website/versioned_docs/version-2.4.1/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.4.1/functions-develop.md
@@ -287,10 +287,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -713,7 +717,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 ### API
 
@@ -732,7 +736,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
 
 #### getCounter
 
@@ -746,11 +765,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -763,6 +797,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -775,6 +823,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
 #### deleteState
 
 ```java
diff --git a/site2/website/versioned_docs/version-2.4.2/functions-develop.md b/site2/website/versioned_docs/version-2.4.2/functions-develop.md
index 27b2b58..be1a12a 100644
--- a/site2/website/versioned_docs/version-2.4.2/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.4.2/functions-develop.md
@@ -287,10 +287,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -713,7 +717,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 ### API
 
@@ -732,7 +736,20 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
 
 #### getCounter
 
@@ -746,11 +763,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -763,6 +795,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -775,6 +821,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
 #### deleteState
 
 ```java
diff --git a/site2/website/versioned_docs/version-2.5.0/functions-develop.md b/site2/website/versioned_docs/version-2.5.0/functions-develop.md
index adb4292..14299dc 100644
--- a/site2/website/versioned_docs/version-2.5.0/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.5.0/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 > Note  
 > State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
 
 #### getCounter
 
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -842,6 +876,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -854,6 +902,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
 #### deleteState
 
 ```java
diff --git a/site2/website/versioned_docs/version-2.5.1/functions-develop.md b/site2/website/versioned_docs/version-2.5.1/functions-develop.md
index 570b5cf..87f6795 100644
--- a/site2/website/versioned_docs/version-2.5.1/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.5.1/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 > Note  
 > State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the built-in distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
 
 #### getCounter
 
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -842,6 +876,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -854,6 +902,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
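
The asynchronous key/value calls can be composed in the same way. The sketch below stores a UTF-8 string and reads it back after the write has completed. `StateContext` and `InMemoryStateContext` are hypothetical stand-ins that mirror only the `putStateAsync` and `getStateAsync` signatures documented above, and `saveAndReload` is an illustrative helper; returning `null` for a missing key is an assumption of the stub, not a statement about Pulsar's behavior.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncStateSketch {

    // Hypothetical stand-in that mirrors only the two async state signatures shown above.
    interface StateContext {
        CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
        CompletableFuture<ByteBuffer> getStateAsync(String key);
    }

    // Hypothetical in-memory implementation, for illustration only (not Pulsar's state store).
    static class InMemoryStateContext implements StateContext {
        private final ConcurrentHashMap<String, byte[]> states = new ConcurrentHashMap<>();

        @Override
        public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
            // Copy the bytes up front so later changes to the caller's buffer do not leak in.
            byte[] copy = new byte[value.remaining()];
            value.duplicate().get(copy);
            return CompletableFuture.runAsync(() -> states.put(key, copy));
        }

        @Override
        public CompletableFuture<ByteBuffer> getStateAsync(String key) {
            return CompletableFuture.supplyAsync(() -> {
                byte[] bytes = states.get(key);
                // Assumption of this stub: a missing key yields null.
                return bytes == null ? null : ByteBuffer.wrap(bytes);
            });
        }
    }

    // Write a UTF-8 string under the key, then read it back once the write has completed.
    static CompletableFuture<String> saveAndReload(StateContext context, String key, String text) {
        ByteBuffer value = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        return context.putStateAsync(key, value)
                .thenCompose(ignored -> context.getStateAsync(key))
                .thenApply(buffer ->
                        buffer == null ? null : StandardCharsets.UTF_8.decode(buffer).toString());
    }

    public static void main(String[] args) {
        StateContext context = new InMemoryStateContext();
        System.out.println(saveAndReload(context, "last-word", "hello").join());
    }
}
```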
+
 #### deleteState
 
 ```java
diff --git a/site2/website/versioned_docs/version-2.5.2/functions-develop.md b/site2/website/versioned_docs/version-2.5.2/functions-develop.md
index 4cf91ef..6b8db60 100644
--- a/site2/website/versioned_docs/version-2.5.2/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.5.2/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 > Note  
 > State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the builtin distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
 
 #### getCounter
 
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -842,6 +876,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -854,6 +902,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
 #### deleteState
 
 ```java
diff --git a/site2/website/versioned_docs/version-2.6.0/functions-develop.md b/site2/website/versioned_docs/version-2.6.0/functions-develop.md
index 2b179ec..56c19c4 100644
--- a/site2/website/versioned_docs/version-2.6.0/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.6.0/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
     String getFunctionVersion();
     Logger getLogger();
     void incrCounter(String key, long amount);
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
     long getCounter(String key);
+    CompletableFuture<Long> getCounterAsync(String key);
     void putState(String key, ByteBuffer value);
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
     void deleteState(String key);
     ByteBuffer getState(String key);
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
     Map<String, Object> getUserConfigMap();
     Optional<Object> getUserConfigValue(String key);
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -790,7 +794,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
 
 States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
 
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
 
 > Note  
 > State storage is not available in Go.
@@ -812,7 +816,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
     void incrCounter(String key, long amount);
 ```
 
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+    /**
+     * Increment the builtin distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
 
 #### getCounter
 
@@ -826,11 +845,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
     long getCounter(String key);
 ```
 
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
 
 Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
 general key/value state.
 
+#### getCounterAsync
+
+```java
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
 #### putState
 
 ```java
@@ -843,6 +877,20 @@ general key/value state.
     void putState(String key, ByteBuffer value);
 ```
 
+#### putStateAsync
+
+```java
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
 #### getState
 
 ```java
@@ -855,6 +903,20 @@ general key/value state.
     ByteBuffer getState(String key);
 ```
 
+#### getStateAsync
+
+```java
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
 #### deleteState
 
 ```java