You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by hj...@apache.org on 2020/07/14 05:48:31 UTC
[pulsar] branch master updated: Add async state manupulation
methods in java functions (#7468)
This is an automated email from the ASF dual-hosted git repository.
hjf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b197962 Add async state manupulation methods in java functions (#7468)
b197962 is described below
commit b197962d3eb55b8a7c01f61453042236d3d62c7c
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Tue Jul 14 13:48:14 2020 +0800
Add async state manupulation methods in java functions (#7468)
---
site2/docs/functions-develop.md | 68 +++++++-
.../versioned_docs/version-2.4.0/functions-api.md | 4 +
.../version-2.4.0/functions-state.md | 177 +++++++++++++++++++++
.../version-2.4.1/functions-develop.md | 68 +++++++-
.../version-2.4.2/functions-develop.md | 66 +++++++-
.../version-2.5.0/functions-develop.md | 68 +++++++-
.../version-2.5.1/functions-develop.md | 68 +++++++-
.../version-2.5.2/functions-develop.md | 68 +++++++-
.../version-2.6.0/functions-develop.md | 68 +++++++-
9 files changed, 634 insertions(+), 21 deletions(-)
diff --git a/site2/docs/functions-develop.md b/site2/docs/functions-develop.md
index c2bdd16..0f1a447 100644
--- a/site2/docs/functions-develop.md
+++ b/site2/docs/functions-develop.md
@@ -296,10 +296,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
> Note
> State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
#### getCounter
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -842,6 +876,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -854,6 +902,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java
diff --git a/site2/website/versioned_docs/version-2.4.0/functions-api.md b/site2/website/versioned_docs/version-2.4.0/functions-api.md
index 1b68aa7..fa2efae 100644
--- a/site2/website/versioned_docs/version-2.4.0/functions-api.md
+++ b/site2/website/versioned_docs/version-2.4.0/functions-api.md
@@ -267,9 +267,13 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
diff --git a/site2/website/versioned_docs/version-2.4.0/functions-state.md b/site2/website/versioned_docs/version-2.4.0/functions-state.md
new file mode 100644
index 0000000..2302321
--- /dev/null
+++ b/site2/website/versioned_docs/version-2.4.0/functions-state.md
@@ -0,0 +1,177 @@
+---
+id: version-2.4.0-functions-state
+title: Pulsar Functions State Storage (Developer Preview)
+sidebar_label: State Storage
+original_id: functions-state
+---
+
+Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table service](https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f)
+for storing the `State` for functions. For example, A `WordCount` function can store its `counters` state into BookKeeper's table service via Pulsar Functions [State API](#api).
+
+## API
+
+### Java API
+
+Currently Pulsar Functions expose following APIs for mutating and accessing State. These APIs are avaible in the [Context](functions-api.md#context) object when
+you are using [Java SDK](functions-api.md#java-sdk-functions) functions.
+
+#### incrCounter
+
+```java
+ /**
+ * Increment the builtin distributed counter refered by key
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ void incrCounter(String key, long amount);
+```
+
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
+
+#### getCounter
+
+```java
+ /**
+ * Retrieve the counter value for the key.
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ long getCounter(String key);
+```
+
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+
+Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store
+general key/value state.
+
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
+#### putState
+
+```java
+ /**
+ * Update the state value for the key.
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ void putState(String key, ByteBuffer value);
+```
+
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
+#### getState
+
+```
+ /**
+ * Retrieve the state value for the key.
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ ByteBuffer getState(String key);
+```
+
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
+### Python API
+
+State currently is not supported at [Python SDK](functions-api.md#python-sdk-functions).
+
+## Query State
+
+A Pulsar Function can use the [State API](#api) for storing state into Pulsar's state storage
+and retrieving state back from Pulsar's state storage. Additionally Pulsar also provides
+CLI commands for querying its state.
+
+```shell
+$ bin/pulsar-admin functions querystate \
+ --tenant <tenant> \
+ --namespace <namespace> \
+ --name <function-name> \
+ --state-storage-url <bookkeeper-service-url> \
+ --key <state-key> \
+ [---watch]
+```
+
+If `--watch` is specified, the CLI will watch the value of the provided `state-key`.
+
+## Example
+
+### Java Example
+
+{@inject: github:`WordCountFunction`:/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java} is a very good example
+demonstrating on how Application can easily store `state` in Pulsar Functions.
+
+```java
+public class WordCountFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) throws Exception {
+ Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
+ return null;
+ }
+}
+```
+
+The logic of this `WordCount` function is pretty simple and straightforward:
+
+1. The function first splits the received `String` into multiple words using regex `\\.`.
+2. For each `word`, the function increments the corresponding `counter` by 1 (via `incrCounter(key, amount)`).
+
+### Python Example
+
+State currently is not supported at [Python SDK](functions-api.md#python-sdk-functions).
+
diff --git a/site2/website/versioned_docs/version-2.4.1/functions-develop.md b/site2/website/versioned_docs/version-2.4.1/functions-develop.md
index 598d06d..a2aff48 100644
--- a/site2/website/versioned_docs/version-2.4.1/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.4.1/functions-develop.md
@@ -287,10 +287,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -713,7 +717,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
### API
@@ -732,7 +736,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
#### getCounter
@@ -746,11 +765,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The pplication can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -763,6 +797,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -775,6 +823,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java
diff --git a/site2/website/versioned_docs/version-2.4.2/functions-develop.md b/site2/website/versioned_docs/version-2.4.2/functions-develop.md
index 27b2b58..be1a12a 100644
--- a/site2/website/versioned_docs/version-2.4.2/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.4.2/functions-develop.md
@@ -287,10 +287,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ oid incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -713,7 +717,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
### API
@@ -732,7 +736,20 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
#### getCounter
@@ -746,11 +763,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -763,6 +795,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -775,6 +821,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java
diff --git a/site2/website/versioned_docs/version-2.5.0/functions-develop.md b/site2/website/versioned_docs/version-2.5.0/functions-develop.md
index adb4292..14299dc 100644
--- a/site2/website/versioned_docs/version-2.5.0/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.5.0/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
> Note
> State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
#### getCounter
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -842,6 +876,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -854,6 +902,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java
diff --git a/site2/website/versioned_docs/version-2.5.1/functions-develop.md b/site2/website/versioned_docs/version-2.5.1/functions-develop.md
index 570b5cf..87f6795 100644
--- a/site2/website/versioned_docs/version-2.5.1/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.5.1/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
> Note
> State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
#### getCounter
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -842,6 +876,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -854,6 +902,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java
diff --git a/site2/website/versioned_docs/version-2.5.2/functions-develop.md b/site2/website/versioned_docs/version-2.5.2/functions-develop.md
index 4cf91ef..6b8db60 100644
--- a/site2/website/versioned_docs/version-2.5.2/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.5.2/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -789,7 +793,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
> Note
> State storage is not available in Go.
@@ -811,7 +815,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
#### getCounter
@@ -825,11 +844,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -842,6 +876,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -854,6 +902,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java
diff --git a/site2/website/versioned_docs/version-2.6.0/functions-develop.md b/site2/website/versioned_docs/version-2.6.0/functions-develop.md
index 2b179ec..56c19c4 100644
--- a/site2/website/versioned_docs/version-2.6.0/functions-develop.md
+++ b/site2/website/versioned_docs/version-2.6.0/functions-develop.md
@@ -297,10 +297,14 @@ public interface Context {
String getFunctionVersion();
Logger getLogger();
void incrCounter(String key, long amount);
+ void incrCounterAsync(String key, long amount);
long getCounter(String key);
+ long getCounterAsync(String key);
void putState(String key, ByteBuffer value);
+ void putStateAsync(String key, ByteBuffer value);
void deleteState(String key);
ByteBuffer getState(String key);
+ ByteBuffer getStateAsync(String key);
Map<String, Object> getUserConfigMap();
Optional<Object> getUserConfigValue(String key);
Object getUserConfigValueOrDefault(String key, Object defaultValue);
@@ -790,7 +794,7 @@ Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper [table serv
States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.
-You can access states within Pulsar Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) options to `pulsar-admin functions`.
+You can access states within Pulsar Java Functions using the `putState`, `putStateAsync`, `getState`, `getStateAsync`, `incrCounter`, `incrCounterAsync`, `getCounter`, `getCounterAsync` and `deleteState` calls on the context object. You can access states within Pulsar Python Functions using the `putState`, `getState`, `incrCounter`, `getCounter` and `deleteState` calls on the context object. You can also manage states using the [querystate](#query-state) and [putstate](#putstate) option [...]
> Note
> State storage is not available in Go.
@@ -812,7 +816,22 @@ Currently Pulsar Functions expose the following APIs for mutating and accessing
void incrCounter(String key, long amount);
```
-Application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+The application can use `incrCounter` to change the counter of a given `key` by the given `amount`.
+
+#### incrCounterAsync
+
+```java
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but dont wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+```
+
+The application can use `incrCounterAsync` to asynchronously change the counter of a given `key` by the given `amount`.
#### getCounter
@@ -826,11 +845,26 @@ Application can use `incrCounter` to change the counter of a given `key` by the
long getCounter(String key);
```
-Application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
+The application can use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.
Except the `counter` API, Pulsar also exposes a general key/value API for functions to store
general key/value state.
+#### getCounterAsync
+
+```java
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+```
+
+The application can use `getCounterAsync` to asynchronously retrieve the counter of a given `key` mutated by `incrCounterAsync`.
+
#### putState
```java
@@ -843,6 +877,20 @@ general key/value state.
void putState(String key, ByteBuffer value);
```
+#### putStateAsync
+
+```java
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+```
+
+The application can use `putStateAsync` to asynchronously update the state of a given `key`.
+
#### getState
```java
@@ -855,6 +903,20 @@ general key/value state.
ByteBuffer getState(String key);
```
+#### getStateAsync
+
+```java
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
+```
+
+The application can use `getStateAsync` to asynchronously retrieve the state of a given `key`.
+
#### deleteState
```java