You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/11/13 21:25:57 UTC
[pulsar] branch master updated: [Functions] Refactor Context and
State API to allow plugging different state store implementations (#8537)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9aaa1cc [Functions] Refactor Context and State API to allow plugging different state store implementations (#8537)
9aaa1cc is described below
commit 9aaa1ccefb3ee1c543e2f0daeb23094cf8773b6a
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Nov 13 14:24:18 2020 -0700
[Functions] Refactor Context and State API to allow plugging different state store implementations (#8537)
*Motivation*
Currently, the state API is tied to the BookKeeper table implementation. For users who already run a database, it
might be useful to let them configure a different state store backend.
This change refactors the context and state API to allow plugging in different state store implementations.
---
.../org/apache/pulsar/client/api/Consumer.java | 3 +-
.../org/apache/pulsar/client/api/Producer.java | 3 +-
.../org/apache/pulsar/functions/api/Context.java | 37 ++++-
.../apache/pulsar/functions/api/StateStore.java | 66 ++++++++
.../pulsar/functions/api/StateStoreContext.java | 30 ++++
.../functions/api/state/ByteBufferStateStore.java | 76 +++++++++
.../functions/api/state/CounterStateStore.java | 63 +++++++
.../pulsar/functions/api/state/package-info.java | 22 +++
.../pulsar/functions/instance/ContextImpl.java | 112 +++++++------
.../functions/instance/JavaInstanceRunnable.java | 179 +++++---------------
...StateContextImpl.java => BKStateStoreImpl.java} | 108 ++++++++++--
.../instance/state/BKStateStoreProviderImpl.java | 182 +++++++++++++++++++++
.../instance/state/DefaultStateStore.java | 28 ++++
.../instance/state/InstanceStateManager.java | 81 +++++++++
.../functions/instance/state/StateContext.java | 78 ---------
.../functions/instance/state/StateManager.java | 47 ++++++
.../instance/state/StateStoreContextImpl.java | 27 +++
.../instance/state/StateStoreProvider.java | 70 ++++++++
.../pulsar/functions/instance/ContextImplTest.java | 25 +--
...textImplTest.java => BKStateStoreImplTest.java} | 33 +++-
.../instance/state/InstanceStateManagerTest.java | 122 ++++++++++++++
.../apache/pulsar/io/core/ConnectorContext.java | 15 ++
22 files changed, 1097 insertions(+), 310 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index f1c4f0e..e21f653 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -22,11 +22,10 @@ import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
-import org.apache.pulsar.client.api.transaction.Transaction;
-
/**
* An interface that abstracts behavior of Pulsar's consumer.
*
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
index 883cf2a..b94a6fe 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -20,11 +20,10 @@ package org.apache.pulsar.client.api;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
-import org.apache.pulsar.client.api.transaction.Transaction;
-
/**
* Producer is used to publish messages on a topic.
*
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 6fd4dd7..db986a1 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.functions.api;
import java.nio.ByteBuffer;
-
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -28,11 +31,6 @@ import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.slf4j.Logger;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
/**
* Context provides contextual information to the executing function.
* Features like which message id we are handling, whats the topic name of the
@@ -127,6 +125,32 @@ public interface Context {
Logger getLogger();
/**
+ * Get the state store with the provided store name in the function tenant & namespace.
+ *
+ * @param name the state store name
+ * @param <S> the type of interface of the store to return
+ * @return the state store instance.
+ *
+ * @throws ClassCastException if the return type isn't a type
+ * or interface of the actual returned store.
+ */
+ <S extends StateStore> S getStateStore(String name);
+
+ /**
+ * Get the state store with the provided store name.
+ *
+ * @param tenant the state tenant name
+ * @param ns the state namespace name
+ * @param name the state store name
+ * @param <S> the type of interface of the store to return
+ * @return the state store instance.
+ *
+ * @throws ClassCastException if the return type isn't a type
+ * or interface of the actual returned store.
+ */
+ <S extends StateStore> S getStateStore(String tenant, String ns, String name);
+
+ /**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
@@ -134,7 +158,6 @@ public interface Context {
*/
void incrCounter(String key, long amount);
-
/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStore.java
new file mode 100644
index 0000000..bbc39b2
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience.Public;
+import org.apache.pulsar.common.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public interface StateStore extends AutoCloseable {
+
+ /**
+ * The tenant of this store.
+ *
+ * @return the state store tenant.
+ */
+ String tenant();
+
+ /**
+ * The namespace of this store.
+ *
+ * @return the state store namespace.
+ */
+ String namespace();
+
+ /**
+ * The name of this store.
+ *
+ * @return the state store name.
+ */
+ String name();
+
+ /**
+ * The fully qualified state store name.
+ *
+ * @return the fully qualified state store name.
+ */
+ String fqsn();
+
+ /**
+ * Initialize the state store.
+ *
+ * @param ctx the context used to initialize the state store
+ */
+ void init(StateStoreContext ctx);
+
+ @Override
+ void close();
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStoreContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStoreContext.java
new file mode 100644
index 0000000..6a0379f
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStoreContext.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience.Public;
+import org.apache.pulsar.common.classification.InterfaceStability.Evolving;
+
+/**
+ * State Store Context Interface.
+ */
+@Public
+@Evolving
+public interface StateStoreContext {
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
new file mode 100644
index 0000000..7de1ef1
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.StateStore;
+
+/**
+ * A key-value state store that stores values in {@link ByteBuffer}.
+ */
+public interface ByteBufferStateStore extends StateStore {
+
+ /**
+ * Update the state value for the key.
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ void put(String key, ByteBuffer value);
+
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putAsync(String key, ByteBuffer value);
+
+ /**
+ * Delete the state value for the key.
+ *
+ * @param key name of the key
+ */
+ void delete(String key);
+
+ /**
+ * Delete the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ */
+ CompletableFuture<Void> deleteAsync(String key);
+
+ /**
+ * Retrieve the state value for the key.
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ ByteBuffer get(String key);
+
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getAsync(String key);
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
new file mode 100644
index 0000000..548c60b0
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.StateStore;
+
+/**
+ * The state store supports counters.
+ */
+public interface CounterStateStore extends StateStore {
+
+ /**
+ * Increment the builtin distributed counter referred by key.
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ void incrCounter(String key, long amount);
+
+ /**
+ * Increment the builtin distributed counter referred by key
+ * but don't wait for the completion of the increment operation
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+ /**
+ * Retrieve the counter value for the key.
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ long getCounter(String key);
+
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
new file mode 100644
index 0000000..1059832
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * State Store API.
+ */
+package org.apache.pulsar.functions.api.state;
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 2be3e64..1a7da8f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -21,19 +21,37 @@ package org.apache.pulsar.functions.instance;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.bookkeeper.api.kv.Table;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.instance.state.DefaultStateStore;
+import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
@@ -47,15 +65,8 @@ import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
/**
* This class implements the Context interface exposed to the user.
@@ -78,7 +89,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final Map<String, Object> secretsMap;
@VisibleForTesting
- StateContextImpl stateContext;
+ StateManager stateManager;
+ @VisibleForTesting
+ DefaultStateStore defaultStateStore;
+
private Map<String, Object> userConfigs;
private ComponentStatsManager statsManager;
@@ -100,7 +114,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
- Table<ByteBuf, ByteBuf> stateTable) {
+ StateManager stateManager) {
this.config = config;
this.logger = logger;
this.client = client;
@@ -166,10 +180,12 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.componentType = componentType;
-
- if (null != stateTable) {
- this.stateContext = new StateContextImpl(stateTable);
- }
+ this.stateManager = stateManager;
+ this.defaultStateStore = (DefaultStateStore) stateManager.getStore(
+ config.getFunctionDetails().getTenant(),
+ config.getFunctionDetails().getNamespace(),
+ config.getFunctionDetails().getName()
+ );
}
public void setCurrentMessageContext(Record<?> record) {
@@ -288,88 +304,84 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
}
}
+ @Override
+ public <S extends StateStore> S getStateStore(String name) {
+ return getStateStore(
+ config.getFunctionDetails().getTenant(),
+ config.getFunctionDetails().getNamespace(),
+ name);
+ }
+
+ @Override
+ public <S extends StateStore> S getStateStore(String tenant, String ns, String name) {
+ return (S) stateManager.getStore(tenant, ns, name);
+ }
+
private void ensureStateEnabled() {
- checkState(null != stateContext, "State is not enabled.");
+ checkState(null != defaultStateStore, "State %s/%s/%s is not enabled.",
+ config.getFunctionDetails().getTenant(),
+ config.getFunctionDetails().getNamespace(),
+ config.getFunctionDetails().getName());
}
@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
ensureStateEnabled();
- return stateContext.incrCounter(key, amount);
+ return defaultStateStore.incrCounterAsync(key, amount);
}
@Override
public void incrCounter(String key, long amount) {
ensureStateEnabled();
- try {
- result(stateContext.incrCounter(key, amount));
- } catch (Exception e) {
- throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e);
- }
+ defaultStateStore.incrCounter(key, amount);
}
@Override
public CompletableFuture<Long> getCounterAsync(String key) {
ensureStateEnabled();
- return stateContext.getCounter(key);
+ return defaultStateStore.getCounterAsync(key);
}
@Override
public long getCounter(String key) {
ensureStateEnabled();
- try {
- return result(stateContext.getCounter(key));
- } catch (Exception e) {
- throw new RuntimeException("Failed to retrieve counter from key '" + key + "'");
- }
+ return defaultStateStore.getCounter(key);
}
@Override
public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
ensureStateEnabled();
- return stateContext.put(key, value);
+ return defaultStateStore.putAsync(key, value);
}
@Override
public void putState(String key, ByteBuffer value) {
ensureStateEnabled();
- try {
- result(stateContext.put(key, value));
- } catch (Exception e) {
- throw new RuntimeException("Failed to update the state value for key '" + key + "'");
- }
+ defaultStateStore.put(key, value);
}
@Override
public CompletableFuture<Void> deleteStateAsync(String key) {
ensureStateEnabled();
- return stateContext.delete(key);
+ return defaultStateStore.deleteAsync(key);
}
@Override
public void deleteState(String key) {
ensureStateEnabled();
- try {
- result(stateContext.delete(key));
- } catch (Exception e) {
- throw new RuntimeException("Failed to delete the state value for key '" + key + "'");
- }
+ defaultStateStore.delete(key);
}
@Override
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
ensureStateEnabled();
- return stateContext.get(key);
+ return defaultStateStore.getAsync(key);
}
@Override
public ByteBuffer getState(String key) {
ensureStateEnabled();
- try {
- return result(stateContext.get(key));
- } catch (Exception e) {
- throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
- }
+ return defaultStateStore.get(key);
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fd4697c..316d964 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,34 +19,19 @@
package org.apache.pulsar.functions.instance;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
-import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.clients.exceptions.ClientException;
-import org.apache.bookkeeper.clients.exceptions.InternalServerException;
-import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
-import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
-import org.apache.bookkeeper.clients.utils.ClientResources;
-import org.apache.bookkeeper.common.util.Backoff.Jitter;
-import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
-import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
-import org.apache.bookkeeper.stream.proto.StorageType;
-import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
@@ -62,6 +47,13 @@ import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
+import org.apache.pulsar.functions.instance.state.InstanceStateManager;
+import org.apache.pulsar.functions.instance.state.StateManager;
+import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
+import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -82,16 +74,6 @@ import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
/**
* A function container implemented using java thread.
*/
@@ -109,8 +91,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// provide tables for storing states
private final String stateStorageServiceUrl;
- private StorageClient storageClient;
- private Table<ByteBuf, ByteBuf> stateTable;
+ private StateStoreProvider stateStoreProvider;
+ private StateManager stateManager;
private JavaInstance javaInstance;
@Getter
@@ -221,7 +203,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
// start the state table
- setupStateTable();
+ setupStateStore();
ContextImpl contextImpl = setupContext();
@@ -239,7 +221,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
- collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable);
+ collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager);
}
/**
@@ -334,99 +316,26 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
return fnClassLoader;
}
- private void createStateTableIfNotExist(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
- try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
- settings,
- ClientResources.create().scheduler())) {
- StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
- .setInitialNumRanges(4)
- .setMinNumRanges(4)
- .setStorageType(StorageType.TABLE)
- .build();
- Stopwatch elapsedWatch = Stopwatch.createStarted();
- Exception lastException = null;
- while (true) {
- try {
- result(storageAdminClient.getStream(tableNs, tableName), 30, TimeUnit.SECONDS);
- return;
- } catch (TimeoutException e){
- lastException = e;
- } catch (NamespaceNotFoundException nnfe) {
- try {
- result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
- .setDefaultStreamConf(streamConf)
- .build()));
- result(storageAdminClient.createStream(tableNs, tableName, streamConf));
- } catch (Exception e) {
- // there might be two clients conflicting at creating table, so let's retrieve the table again
- // to make sure the table is created.
- lastException = e;
- }
- } catch (StreamNotFoundException snfe) {
- try {
- result(storageAdminClient.createStream(tableNs, tableName, streamConf));
- } catch (Exception e) {
- // there might be two client conflicting at creating table, so let's retrieve it to make
- // sure the table is created.
- lastException = e;
- }
- } catch (ClientException ce) {
- lastException = ce;
- log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
- ce.getMessage());
- TimeUnit.MILLISECONDS.sleep(100);
- }
- if (elapsedWatch.elapsed(TimeUnit.MINUTES) > 1) {
- if (lastException != null) {
- throw new RuntimeException("Failed to get or create state table within timeout", lastException);
- }
- throw new RuntimeException("Failed to get or create state table within timeout");
- }
- }
- }
- }
+ private void setupStateStore() throws Exception {
+ this.stateManager = new InstanceStateManager();
- private void setupStateTable() throws Exception {
if (null == stateStorageServiceUrl) {
- return;
- }
-
- String tableNs = FunctionCommon.getStateNamespace(
- instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace()
- );
- String tableName = instanceConfig.getFunctionDetails().getName();
-
- StorageClientSettings settings = StorageClientSettings.newBuilder()
- .serviceUri(stateStorageServiceUrl)
- .clientName("function-" + tableNs + "/" + tableName)
- // configure a maximum 2 minutes jitter backoff for accessing table service
- .backoffPolicy(Jitter.of(
- Type.EXPONENTIAL,
- 100,
- 2000,
- 60
- ))
- .build();
-
- // we defer creation of the state table until a java instance is running here.
- createStateTableIfNotExist(tableNs, tableName, settings);
-
- log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName());
- this.storageClient = new SimpleStorageClientImpl(tableNs, settings);
-
- // NOTE: this is a workaround until we bump bk version to 4.9.0
- // table might just be created above, so it might not be ready for serving traffic
- Stopwatch openSw = Stopwatch.createStarted();
- while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
- try {
- this.stateTable = result(storageClient.openTable(tableName));
- break;
- } catch (InternalServerException ise) {
- log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}",
- tableName, ise.getMessage());
- TimeUnit.MILLISECONDS.sleep(100);
- }
+ stateStoreProvider = StateStoreProvider.NULL;
+ } else {
+ stateStoreProvider = new BKStateStoreProviderImpl();
+ Map<String, Object> stateStoreProviderConfig = new HashMap();
+ stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
+ stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());
+
+ StateStore store = stateStoreProvider.getStateStore(
+ instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName()
+ );
+ StateStoreContext context = new StateStoreContextImpl();
+ store.init(context);
+
+ stateManager.registerStore(store);
}
}
@@ -538,18 +447,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
javaInstance = null;
}
- // kill the state table
- if (null != stateTable) {
- stateTable.close();
- stateTable = null;
+ if (null != stateManager) {
+ stateManager.close();
}
- if (null != storageClient) {
- storageClient.closeAsync()
- .exceptionally(cause -> {
- log.warn("Failed to close state storage client", cause);
- return null;
- });
- storageClient = null;
+
+ if (null != stateStoreProvider) {
+ stateStoreProvider.close();
}
if (instanceCache != null) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
similarity index 56%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
index 1c2ca5d..8714d2c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
@@ -19,31 +19,71 @@
package org.apache.pulsar.functions.instance.state;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
-import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.api.kv.options.Options;
-
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.options.Options;
+import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.utils.FunctionCommon;
/**
* This class accumulates the state updates from one function.
*
* <p>currently it exposes incr operations. but we can expose other key/values operations if needed.
*/
-public class StateContextImpl implements StateContext {
+public class BKStateStoreImpl implements DefaultStateStore {
+ private final String tenant;
+ private final String namespace;
+ private final String name;
+ private final String fqsn;
private final Table<ByteBuf, ByteBuf> table;
- public StateContextImpl(Table<ByteBuf, ByteBuf> table) {
+ /**
+ * Creates a state store that issues all of its operations against the given table.
+ *
+ * @param tenant the tenant reported by {@link #tenant()}
+ * @param namespace the namespace reported by {@link #namespace()}
+ * @param name the store name reported by {@link #name()}
+ * @param table the table backing this store
+ */
+ public BKStateStoreImpl(String tenant, String namespace, String name,
+ Table<ByteBuf, ByteBuf> table) {
+ this.tenant = tenant;
+ this.namespace = namespace;
+ this.name = name;
 this.table = table;
+ // computed once here; returned as-is by fqsn()
+ this.fqsn = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
+ }
+
+ /** Returns the tenant passed to the constructor. */
+ @Override
+ public String tenant() {
+ return tenant;
+ }
+
+ /** Returns the namespace passed to the constructor. */
+ @Override
+ public String namespace() {
+ return namespace;
+ }
+
+ /** Returns the store name passed to the constructor. */
+ @Override
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the fully qualified store name that the constructor derived from
+ * tenant, namespace and name via {@code FunctionCommon.getFullyQualifiedName}.
+ */
+ @Override
+ public String fqsn() {
+ return fqsn;
+ }
+
+ /**
+ * No-op: the backing table is supplied through the constructor, so nothing is
+ * read from the context here.
+ *
+ * @param ctx the state store context (unused)
+ */
+ @Override
+ public void init(StateStoreContext ctx) {
+ }
+
+ /** Closes the backing table. */
+ @Override
+ public void close() {
+ table.close();
 }
@Override
- public CompletableFuture<Void> incrCounter(String key, long amount) {
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
// TODO: this can be optimized with a batch operation.
return table.increment(
Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
@@ -51,7 +91,30 @@ public class StateContextImpl implements StateContext {
}
@Override
- public CompletableFuture<Void> put(String key, ByteBuffer value) {
+ public void incrCounter(String key, long amount) {
+ // blocking variant: waits for incrCounterAsync to complete
+ try {
+ result(incrCounterAsync(key, amount));
+ } catch (Exception e) {
+ // surface any failure as an unchecked exception, keeping the original as the cause
+ throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e);
+ }
+ }
+
+ /**
+ * Asynchronously reads the numeric value stored under the given key.
+ *
+ * @param key the counter key; encoded as UTF-8 bytes for the table lookup
+ * @return the future returned by the table's {@code getNumber} call
+ */
+ @Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)));
+ }
+
+    /**
+     * Blocking variant of {@link #getCounterAsync(String)}: waits for the lookup to complete.
+     *
+     * @param key the counter key
+     * @return the counter value produced by the asynchronous lookup
+     * @throws RuntimeException if the lookup fails; the underlying failure is attached as the cause
+     */
+    @Override
+    public long getCounter(String key) {
+        try {
+            return result(getCounterAsync(key));
+        } catch (Exception e) {
+            // attach the cause (as incrCounter and get do) so the real failure is not lost
+            throw new RuntimeException("Failed to retrieve counter from key '" + key + "'", e);
+        }
+    }
+
+ @Override
+ public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
if(value != null) {
// Set position to off the buffer to the beginning.
// If a user used an operation like ByteBuffer.allocate(4).putInt(count) to create a ByteBuffer to store to the state store
@@ -68,7 +131,16 @@ public class StateContextImpl implements StateContext {
}
@Override
- public CompletableFuture<Void> delete(String key) {
+    public void put(String key, ByteBuffer value) {
+        // blocking variant: waits for putAsync to complete
+        try {
+            result(putAsync(key, value));
+        } catch (Exception e) {
+            // attach the cause (as incrCounter and get do) so the real failure is not lost
+            throw new RuntimeException("Failed to update the state value for key '" + key + "'", e);
+        }
+    }
+
+ @Override
+ public CompletableFuture<Void> deleteAsync(String key) {
return table.delete(
Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
Options.delete()
@@ -76,7 +148,16 @@ public class StateContextImpl implements StateContext {
}
@Override
- public CompletableFuture<ByteBuffer> get(String key) {
+    public void delete(String key) {
+        // blocking variant: waits for deleteAsync to complete
+        try {
+            result(deleteAsync(key));
+        } catch (Exception e) {
+            // attach the cause (as incrCounter and get do) so the real failure is not lost
+            throw new RuntimeException("Failed to delete the state value for key '" + key + "'", e);
+        }
+    }
+
+ @Override
+ public CompletableFuture<ByteBuffer> getAsync(String key) {
return table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
data -> {
try {
@@ -98,8 +179,11 @@ public class StateContextImpl implements StateContext {
}
@Override
- public CompletableFuture<Long> getCounter(String key) {
- return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)));
+ public ByteBuffer get(String key) {
+ try {
+ return result(getAsync(key));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
+ }
}
-
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
new file mode 100644
index 0000000..88960d6
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.exceptions.ClientException;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
+import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
+import org.apache.bookkeeper.clients.utils.ClientResources;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.StorageType;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+/**
+ * The state store provider that provides bookkeeper table backed state stores.
+ */
+@Slf4j
+public class BKStateStoreProviderImpl implements StateStoreProvider {
+
+ public static final String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl";
+
+ private String stateStorageServiceUrl;
+ private Map<String, StorageClient> clients;
+
+ @Override
+ public void init(Map<String, Object> config, FunctionDetails functionDetails) throws Exception {
+ stateStorageServiceUrl = (String) config.get(STATE_STORAGE_SERVICE_URL);
+ clients = new HashMap<>();
+ }
+
+ private StorageClient getStorageClient(String tenant, String namespace) {
+ final String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
+
+ StorageClient client = clients.get(tableNs);
+ if (null != client) {
+ return client;
+ }
+
+ StorageClientSettings settings = StorageClientSettings.newBuilder()
+ .serviceUri(stateStorageServiceUrl)
+ .clientName("function-" + tableNs)
+ // configure a maximum 2 minutes jitter backoff for accessing table service
+ .backoffPolicy(Jitter.of(
+ Type.EXPONENTIAL,
+ 100,
+ 2000,
+ 60
+ ))
+ .build();
+
+ StorageClient storageClient = StorageClientBuilder.newBuilder()
+ .withSettings(settings)
+ .withNamespace(tableNs)
+ .build();
+
+ clients.put(tableNs, storageClient);
+ return storageClient;
+ }
+
+ private void createStateTable(String stateStorageServiceUrl,
+ String tenant,
+ String namespace,
+ String name) throws Exception {
+ final String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
+ final String tableName = name;
+
+ try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
+ StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).build(),
+ ClientResources.create().scheduler())){
+ StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setInitialNumRanges(4)
+ .setMinNumRanges(4)
+ .setStorageType(StorageType.TABLE)
+ .build();
+ Stopwatch elapsedWatch = Stopwatch.createStarted();
+ while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
+ try {
+ result(storageAdminClient.getStream(tableNs, tableName));
+ return;
+ } catch (NamespaceNotFoundException nnfe) {
+ try {
+ result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
+ .setDefaultStreamConf(streamConf)
+ .build()));
+ result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+ } catch (Exception e) {
+ // there might be two clients conflicting at creating table, so let's retrieve the table again
+ // to make sure the table is created.
+ }
+ } catch (StreamNotFoundException snfe) {
+ try {
+ result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+ } catch (Exception e) {
+ // there might be two client conflicting at creating table, so let's retrieve it to make
+ // sure the table is created.
+ }
+ } catch (ClientException ce) {
+ log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
+ ce.getMessage());
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
+ }
+ }
+
+ private Table<ByteBuf, ByteBuf> openStateTable(String tenant,
+ String namespace,
+ String name) throws Exception {
+ StorageClient client = getStorageClient(tenant, namespace);
+
+ log.info("Opening state table for function {}/{}/{}", tenant, namespace, name);
+ // NOTE: this is a workaround until we bump bk version to 4.9.0
+ // table might just be created above, so it might not be ready for serving traffic
+ Stopwatch openSw = Stopwatch.createStarted();
+ while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
+ try {
+ return result(client.openTable(name));
+ } catch (InternalServerException ise) {
+ log.warn("Encountered internal server on opening state table '{}/{}/{}', re-attempt in 100 milliseconds : {}",
+ tenant, namespace, name, ise.getMessage());
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
+ throw new IOException("Failed to open state table for function " + tenant + "/" + namespace + "/" + name);
+ }
+
+ @Override
+ public <S extends StateStore> S getStateStore(String tenant, String namespace, String name) throws Exception {
+ // we defer creation of the state table until a java instance is running here.
+ createStateTable(stateStorageServiceUrl, tenant, namespace, name);
+ Table<ByteBuf, ByteBuf> table = openStateTable(tenant, namespace, name);
+ return (S) new BKStateStoreImpl(tenant, namespace, name, table);
+ }
+
+ @Override
+ public void close() {
+ clients.forEach((name, client) -> client.closeAsync()
+ .exceptionally(cause -> {
+ log.warn("Failed to close state storage client", cause);
+ return null;
+ })
+ );
+ clients.clear();
+ }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/DefaultStateStore.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/DefaultStateStore.java
new file mode 100644
index 0000000..0f0d8ad
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/DefaultStateStore.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import org.apache.pulsar.functions.api.state.ByteBufferStateStore;
+import org.apache.pulsar.functions.api.state.CounterStateStore;
+
+/**
+ * The default state store interface.
+ *
+ * <p>Declares no members of its own; it only combines {@link ByteBufferStateStore}
+ * and {@link CounterStateStore} into a single type.
+ */
+public interface DefaultStateStore extends ByteBufferStateStore, CounterStateStore {
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/InstanceStateManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/InstanceStateManager.java
new file mode 100644
index 0000000..a125953
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/InstanceStateManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+/**
+ * Keeps track of the state stores registered by a running function instance,
+ * keyed by each store's fully qualified name, in registration order.
+ */
+@Slf4j
+public class InstanceStateManager implements StateManager {
+
+    private final Map<String, StateStore> stores = new LinkedHashMap<>();
+
+    /** Reports whether no store is currently registered. */
+    @VisibleForTesting
+    boolean isEmpty() {
+        return stores.isEmpty();
+    }
+
+    /**
+     * Registers a store under its fully qualified name.
+     *
+     * @param store the store to register
+     * @throws IllegalArgumentException if a store with the same fully qualified name is already registered
+     */
+    @Override
+    public void registerStore(StateStore store) {
+        final String fqsn = store.fqsn();
+        checkArgument(!stores.containsKey(fqsn), "Store %s has already been registered.", fqsn);
+        stores.put(fqsn, store);
+    }
+
+    /**
+     * Looks up a registered store by tenant, namespace and name.
+     *
+     * @return the registered store, or {@code null} when none is registered under that name
+     */
+    @Override
+    public StateStore getStore(String tenant, String namespace, String name) {
+        return stores.get(FunctionCommon.getFullyQualifiedName(tenant, namespace, name));
+    }
+
+    /**
+     * Closes every registered store and forgets them all. A store that fails to close does not
+     * stop the remaining ones from being closed; the first failure is rethrown at the end.
+     */
+    @Override
+    public void close() {
+        RuntimeException failure = null;
+        for (StateStore store : stores.values()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Closing store {}", store.fqsn());
+            }
+            try {
+                store.close();
+            } catch (RuntimeException e) {
+                // remember only the first failure, but log every one
+                failure = (failure != null) ? failure : e;
+                log.error("Failed to close state store {}: ", store.fqsn(), e);
+            }
+        }
+        stores.clear();
+        if (failure != null) {
+            throw failure;
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
deleted file mode 100644
index bfdb118..0000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.state;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A state context per function.
- */
-public interface StateContext {
-
- /**
- * Increment the given <i>key</i> by the given <i>amount</i>.
- *
- * @param key key to increment
- * @param amount the amount incremented
- */
- CompletableFuture<Void> incrCounter(String key, long amount) throws Exception;
-
- /**
- * Update the given <i>key</i> to the provide <i>value</i>.
- *
- * <p>NOTE: the put operation might or might not be applied directly to the global state until
- * the state is flushed via {@link #flush()} at the completion of function execution.
- *
- * <p>The behavior of `PUT` is non-deterministic, if two function instances attempt to update
- * same key around the same time, there is no guarantee which update will be the final result.
- * That says, if you attempt to get amount via {@link #getAmount(String)}, increment the amount
- * based on the function computation logic, and update the computed amount back. one update will
- * overwrite the other update. For this case, you are encouraged to use {@link #incr(String, long)}
- * instead.
- *
- * @param key key to update.
- * @param value value to update; if null the key is deleted
- */
- CompletableFuture<Void> put(String key, ByteBuffer value) throws Exception;
-
- /**
- * Deletes the <i>value</i> at the given <i>key</i>
- *
- * @param key to delete
- */
- CompletableFuture<Void> delete(String key);
-
- /**
- * Get the value of a given <i>key</i>.
- *
- * @param key key to retrieve
- * @return a completable future representing the retrieve result.
- */
- CompletableFuture<ByteBuffer> get(String key) throws Exception;
-
- /**
- * Get the amount of a given <i>key</i>.
- *
- * @param key key to retrieve
- * @return a completable future representing the retrieve result.
- */
- CompletableFuture<Long> getCounter(String key) throws Exception;
-
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateManager.java
new file mode 100644
index 0000000..fddf92b
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateManager.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import org.apache.pulsar.functions.api.StateStore;
+
+/**
+ * A state manager that manages multiple state stores.
+ */
+public interface StateManager extends AutoCloseable {
+
+    /**
+     * Register the state store.
+     *
+     * @param store the state store to register.
+     */
+    void registerStore(StateStore store);
+
+    /**
+     * Get the state store with the given name.
+     *
+     * @param tenant the state store tenant.
+     * @param namespace the state store namespace.
+     * @param name the state store name.
+     * @return the state store with the given name.
+     */
+    StateStore getStore(String tenant, String namespace, String name);
+
+    /**
+     * Close the state manager.
+     *
+     * <p>Redeclares {@link AutoCloseable#close()} without a {@code throws} clause, so callers
+     * do not have to handle a checked exception.
+     */
+    @Override
+    void close();
+
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreContextImpl.java
new file mode 100644
index 0000000..b374a3f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreContextImpl.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import org.apache.pulsar.functions.api.StateStoreContext;
+
+/**
+ * Default implementation of {@link StateStoreContext}.
+ *
+ * <p>Declares no fields or methods of its own.
+ */
+public class StateStoreContextImpl implements StateStoreContext {
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
new file mode 100644
index 0000000..db3c6b3
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import java.util.Map;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+
+/**
+ * The State Store Provider provides the state stores for a function.
+ */
+public interface StateStoreProvider extends AutoCloseable {
+
+ /**
+ * A provider whose {@code getStateStore} always returns {@code null} and whose
+ * {@code close} does nothing.
+ */
+ StateStoreProvider NULL = new StateStoreProvider() {
+ @Override
+ public <S extends StateStore> S getStateStore(String tenant, String namespace, String name) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ };
+
+ /**
+ * Initialize the state store provider. The default implementation does nothing.
+ *
+ * @param config the config to init the state store provider.
+ * @param functionDetails the function details.
+ * @throws Exception when failed to init the state store provider.
+ */
+ default void init(Map<String, Object> config, FunctionDetails functionDetails) throws Exception {}
+
+ /**
+ * Get the state store with the provided store name.
+ *
+ * @param tenant the tenant that owns this state store
+ * @param namespace the namespace that owns this state store
+ * @param name the state store name
+ * @param <S> the type of interface of the store to return
+ * @return the state store instance.
+ *
+ * @throws ClassCastException if the return type isn't a type
+ * or interface of the actual returned store.
+ * @throws Exception when the state store cannot be provided.
+ */
+ <S extends StateStore> S getStateStore(String tenant, String namespace, String name) throws Exception;
+
+ /**
+ * Close the provider. Redeclared without a {@code throws} clause, so callers do not
+ * have to handle the checked exception declared by {@link AutoCloseable#close()}.
+ */
+ @Override
+ void close();
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 26c3249..0305c2f 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -45,7 +45,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.state.BKStateStoreImpl;
+import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.slf4j.Logger;
@@ -86,7 +87,7 @@ public class ContextImplTest {
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
- FunctionDetails.ComponentType.FUNCTION, null, null);
+ FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager());
context.setCurrentMessageContext((Record<String>) () -> null);
}
@@ -117,39 +118,39 @@ public class ContextImplTest {
@Test
public void testIncrCounterStateEnabled() throws Exception {
- context.stateContext = mock(StateContextImpl.class);
+ context.defaultStateStore = mock(BKStateStoreImpl.class);
context.incrCounterAsync("test-key", 10L);
- verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
+ verify(context.defaultStateStore, times(1)).incrCounterAsync(eq("test-key"), eq(10L));
}
@Test
public void testGetCounterStateEnabled() throws Exception {
- context.stateContext = mock(StateContextImpl.class);
+ context.defaultStateStore = mock(BKStateStoreImpl.class);
context.getCounterAsync("test-key");
- verify(context.stateContext, times(1)).getCounter(eq("test-key"));
+ verify(context.defaultStateStore, times(1)).getCounterAsync(eq("test-key"));
}
@Test
public void testPutStateStateEnabled() throws Exception {
- context.stateContext = mock(StateContextImpl.class);
+ context.defaultStateStore = mock(BKStateStoreImpl.class);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
context.putStateAsync("test-key", buffer);
- verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
+ verify(context.defaultStateStore, times(1)).putAsync(eq("test-key"), same(buffer));
}
@Test
public void testDeleteStateStateEnabled() throws Exception {
- context.stateContext = mock(StateContextImpl.class);
+ context.defaultStateStore = mock(BKStateStoreImpl.class);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
context.deleteStateAsync("test-key");
- verify(context.stateContext, times(1)).delete(eq("test-key"));
+ verify(context.defaultStateStore, times(1)).deleteAsync(eq("test-key"));
}
@Test
public void testGetStateStateEnabled() throws Exception {
- context.stateContext = mock(StateContextImpl.class);
+ context.defaultStateStore = mock(BKStateStoreImpl.class);
context.getStateAsync("test-key");
- verify(context.stateContext, times(1)).get(eq("test-key"));
+ verify(context.defaultStateStore, times(1)).getAsync(eq("test-key"));
}
@Test
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
similarity index 80%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
index 88490dd..8b0e385 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
@@ -42,24 +42,39 @@ import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
/**
- * Unit test {@link StateContextImpl}.
+ * Unit test {@link BKStateStoreImpl}.
*/
-public class StateContextImplTest {
+public class BKStateStoreImplTest {
+
+ private final String TENANT = "test-tenant";
+ private final String NS = "test-ns";
+ private final String NAME = "test-name";
+ private final String FQSN = "test-tenant/test-ns/test-name";
private Table<ByteBuf, ByteBuf> mockTable;
- private StateContextImpl stateContext;
+ private BKStateStoreImpl stateContext;
@BeforeMethod
public void setup() {
+ // Rebuilt before every test method: a mocked table wrapped by a store created with the test tenant/namespace/name.
this.mockTable = mock(Table.class);
- this.stateContext = new StateContextImpl(mockTable);
+ this.stateContext = new BKStateStoreImpl(
+ TENANT, NS, NAME,
+ mockTable);
+ }
+
+ /**
+ * The store must report the tenant, namespace and name it was constructed with,
+ * together with the matching fqsn.
+ */
+ @Test
+ public void testGetter() {
+ // AssertJUnit.assertEquals takes (expected, actual); keep the expected constant first
+ // so that a failure message reports the values the right way round.
+ assertEquals(TENANT, stateContext.tenant());
+ assertEquals(NS, stateContext.namespace());
+ assertEquals(NAME, stateContext.name());
+ assertEquals(FQSN, stateContext.fqsn());
}
@Test
public void testIncr() throws Exception {
when(mockTable.increment(any(ByteBuf.class), anyLong()))
.thenReturn(FutureUtils.Void());
- stateContext.incrCounter("test-key", 10L).get();
+ stateContext.incrCounter("test-key", 10L);
verify(mockTable, times(1)).increment(
eq(Unpooled.copiedBuffer("test-key", UTF_8)),
eq(10L)
@@ -70,7 +85,7 @@ public class StateContextImplTest {
public void testPut() throws Exception {
when(mockTable.put(any(ByteBuf.class), any(ByteBuf.class)))
.thenReturn(FutureUtils.Void());
- stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))).get();
+ stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8)));
verify(mockTable, times(1)).put(
eq(Unpooled.copiedBuffer("test-key", UTF_8)),
eq(Unpooled.copiedBuffer("test-value", UTF_8))
@@ -94,7 +109,7 @@ public class StateContextImplTest {
ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
when(mockTable.get(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(returnedValue));
- ByteBuffer result = stateContext.get("test-key").get();
+ ByteBuffer result = stateContext.get("test-key");
assertEquals("test-value", new String(result.array(), UTF_8));
verify(mockTable, times(1)).get(
eq(Unpooled.copiedBuffer("test-key", UTF_8))
@@ -105,7 +120,7 @@ public class StateContextImplTest {
public void testGetAmount() throws Exception {
when(mockTable.getNumber(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(10L));
- assertEquals((Long)10L, stateContext.getCounter("test-key").get());
+ assertEquals(10L, stateContext.getCounter("test-key"));
verify(mockTable, times(1)).getNumber(
eq(Unpooled.copiedBuffer("test-key", UTF_8))
);
@@ -115,7 +130,7 @@ public class StateContextImplTest {
public void testGetKeyNotPresent() throws Exception {
when(mockTable.get(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(null));
- CompletableFuture<ByteBuffer> result = stateContext.get("test-key");
+ CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
assertTrue(result != null);
assertEquals(result.get(), null);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/InstanceStateManagerTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/InstanceStateManagerTest.java
new file mode 100644
index 0000000..825f00f
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/InstanceStateManagerTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertNull;
+import static org.testng.AssertJUnit.assertSame;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+import org.apache.pulsar.functions.api.StateStore;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link InstanceStateManager}.
+ */
+public class InstanceStateManagerTest {
+
+ private InstanceStateManager stateManager;
+
+ /** Creates a fresh manager before each test method. */
+ @BeforeMethod
+ public void setup() {
+ this.stateManager = new InstanceStateManager();
+ }
+
+ /** Looking up a store that was never registered yields {@code null}. */
+ @Test
+ public void testGetStoreNull() {
+ StateStore getStore = stateManager.getStore("t", "ns", "store");
+ assertNull(getStore);
+ }
+
+ /** A registered store is returned by a lookup with the matching tenant, namespace and name. */
+ @Test
+ public void testRegisterStore() {
+ final String fqsn = "t/ns/store";
+ StateStore store = mock(StateStore.class);
+ when(store.fqsn()).thenReturn(fqsn);
+ this.stateManager.registerStore(store);
+ StateStore getStore = stateManager.getStore("t", "ns", "store");
+ // AssertJUnit.assertSame takes (expected, actual).
+ assertSame(store, getStore);
+ }
+
+ /** Registering the same store a second time is expected to fail with an {@link IllegalArgumentException}. */
+ @Test
+ public void testRegisterStoreTwice() {
+ final String fqsn = "t/ns/store";
+ StateStore store = mock(StateStore.class);
+ when(store.fqsn()).thenReturn(fqsn);
+ this.stateManager.registerStore(store);
+ try {
+ this.stateManager.registerStore(store);
+ fail("Should fail to register a store twice");
+ } catch (IllegalArgumentException iae) {
+ // expected
+ }
+ }
+
+ /** Closing the manager closes every registered store exactly once. */
+ @Test
+ public void testClose() {
+ final String fqsn1 = "t/ns/store-1";
+ StateStore store1 = mock(StateStore.class);
+ when(store1.fqsn()).thenReturn(fqsn1);
+ final String fqsn2 = "t/ns/store-2";
+ StateStore store2 = mock(StateStore.class);
+ when(store2.fqsn()).thenReturn(fqsn2);
+
+ this.stateManager.registerStore(store1);
+ this.stateManager.registerStore(store2);
+
+ this.stateManager.close();
+
+ verify(store1, times(1)).close();
+ verify(store2, times(1)).close();
+ }
+
+ /**
+ * Both stores throw on close; the test expects the exception from store2, which is
+ * registered first, to surface, and the manager to be empty afterwards.
+ * NOTE(review): presumably the first failure encountered wins - confirm in InstanceStateManager.
+ */
+ @Test
+ public void testCloseException() {
+ final String fqsn1 = "t/ns/store-1";
+ StateStore store1 = mock(StateStore.class);
+ when(store1.fqsn()).thenReturn(fqsn1);
+ RuntimeException exception1 = new RuntimeException("exception 1");
+ doThrow(exception1).when(store1).close();
+ final String fqsn2 = "t/ns/store-2";
+ StateStore store2 = mock(StateStore.class);
+ when(store2.fqsn()).thenReturn(fqsn2);
+ RuntimeException exception2 = new RuntimeException("exception 2");
+ doThrow(exception2).when(store2).close();
+
+ // Note the order: store2 is registered before store1.
+ this.stateManager.registerStore(store2);
+ this.stateManager.registerStore(store1);
+
+ try {
+ this.stateManager.close();
+ fail("Should fail to close the state manager");
+ } catch (RuntimeException re) {
+ assertSame(exception2, re);
+ }
+
+ assertTrue(this.stateManager.isEmpty());
+ }
+
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
index 6be0f14..21f0e9d 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.functions.api.StateStore;
import org.slf4j.Logger;
/**
@@ -82,6 +83,20 @@ public interface ConnectorContext {
String getSecret(String secretName);
/**
+ * Get the state store with the provided store name.
+ *
+ * @param name the state store name
+ * @param <S> the type of interface of the store to return
+ * @return the state store instance.
+ *
+ * @throws ClassCastException if the requested type {@code S} isn't a type
+ * or interface of the actual returned store.
+ * @throws UnsupportedOperationException if the implementation does not override
+ * this default method.
+ */
+ default <S extends StateStore> S getStateStore(String name) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key