You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/11/13 21:25:57 UTC

[pulsar] branch master updated: [Functions] Refactor Context and State API to allow plugging different state store implementations (#8537)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aaa1cc  [Functions] Refactor Context and State API to allow plugging different state store implementations (#8537)
9aaa1cc is described below

commit 9aaa1ccefb3ee1c543e2f0daeb23094cf8773b6a
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Nov 13 14:24:18 2020 -0700

    [Functions] Refactor Context and State API to allow plugging different state store implementations (#8537)
    
    *Motivation*
    
    Currently, the state API is tied to bookkeeper table implementation. For users who already run a database, it
    might be useful to allow users to configure to use a different state store backend.
    
    This change refactors the context and state API to allow plugging different state store implementations
---
 .../org/apache/pulsar/client/api/Consumer.java     |   3 +-
 .../org/apache/pulsar/client/api/Producer.java     |   3 +-
 .../org/apache/pulsar/functions/api/Context.java   |  37 ++++-
 .../apache/pulsar/functions/api/StateStore.java    |  66 ++++++++
 .../pulsar/functions/api/StateStoreContext.java    |  30 ++++
 .../functions/api/state/ByteBufferStateStore.java  |  76 +++++++++
 .../functions/api/state/CounterStateStore.java     |  63 +++++++
 .../pulsar/functions/api/state/package-info.java   |  22 +++
 .../pulsar/functions/instance/ContextImpl.java     | 112 +++++++------
 .../functions/instance/JavaInstanceRunnable.java   | 179 +++++---------------
 ...StateContextImpl.java => BKStateStoreImpl.java} | 108 ++++++++++--
 .../instance/state/BKStateStoreProviderImpl.java   | 182 +++++++++++++++++++++
 .../instance/state/DefaultStateStore.java          |  28 ++++
 .../instance/state/InstanceStateManager.java       |  81 +++++++++
 .../functions/instance/state/StateContext.java     |  78 ---------
 .../functions/instance/state/StateManager.java     |  47 ++++++
 .../instance/state/StateStoreContextImpl.java      |  27 +++
 .../instance/state/StateStoreProvider.java         |  70 ++++++++
 .../pulsar/functions/instance/ContextImplTest.java |  25 +--
 ...textImplTest.java => BKStateStoreImplTest.java} |  33 +++-
 .../instance/state/InstanceStateManagerTest.java   | 122 ++++++++++++++
 .../apache/pulsar/io/core/ConnectorContext.java    |  15 ++
 22 files changed, 1097 insertions(+), 310 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index f1c4f0e..e21f653 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -22,11 +22,10 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
-import org.apache.pulsar.client.api.transaction.Transaction;
-
 /**
  * An interface that abstracts behavior of Pulsar's consumer.
  *
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
index 883cf2a..b94a6fe 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -20,11 +20,10 @@ package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
-import org.apache.pulsar.client.api.transaction.Transaction;
-
 /**
  * Producer is used to publish messages on a topic.
  *
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 6fd4dd7..db986a1 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.functions.api;
 
 import java.nio.ByteBuffer;
-
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -28,11 +31,6 @@ import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.slf4j.Logger;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * Context provides contextual information to the executing function.
  * Features like which message id we are handling, whats the topic name of the
@@ -127,6 +125,32 @@ public interface Context {
     Logger getLogger();
 
     /**
+     * Get the state store with the provided store name in the function tenant & namespace.
+     *
+     * @param name the state store name
+     * @param <S> the type of interface of the store to return
+     * @return the state store instance.
+     *
+     * @throws ClassCastException if the return type isn't a type
+     * or interface of the actual returned store.
+     */
+    <S extends StateStore> S getStateStore(String name);
+
+    /**
+     * Get the state store with the provided store name.
+     *
+     * @param tenant the state tenant name
+     * @param ns the state namespace name
+     * @param name the state store name
+     * @param <S> the type of interface of the store to return
+     * @return the state store instance.
+     *
+     * @throws ClassCastException if the return type isn't a type
+     * or interface of the actual returned store.
+     */
+    <S extends StateStore> S getStateStore(String tenant, String ns, String name);
+
+    /**
      * Increment the builtin distributed counter referred by key.
      *
      * @param key    The name of the key
@@ -134,7 +158,6 @@ public interface Context {
      */
     void incrCounter(String key, long amount);
 
-
     /**
      * Increment the builtin distributed counter referred by key
      * but dont wait for the completion of the increment operation
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStore.java
new file mode 100644
index 0000000..bbc39b2
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience.Public;
+import org.apache.pulsar.common.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public interface StateStore extends AutoCloseable {
+
+    /**
+     * The tenant of this store.
+     *
+     * @return the state store tenant.
+     */
+    String tenant();
+
+    /**
+     * The namespace of this store.
+     *
+     * @return the state store namespace.
+     */
+    String namespace();
+
+    /**
+     * The name of this store.
+     *
+     * @return the state store name.
+     */
+    String name();
+
+    /**
+     * The fully qualified state store name.
+     *
+     * @return the fully qualified state store name.
+     */
+    String fqsn();
+
+    /**
+     * Initialize the state store.
+     *
+     * @param ctx
+     */
+    void init(StateStoreContext ctx);
+
+    @Override
+    void close();
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStoreContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStoreContext.java
new file mode 100644
index 0000000..6a0379f
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/StateStoreContext.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience.Public;
+import org.apache.pulsar.common.classification.InterfaceStability.Evolving;
+
+/**
+ * State Store Context Interface.
+ */
+@Public
+@Evolving
+public interface StateStoreContext {
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
new file mode 100644
index 0000000..7de1ef1
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.StateStore;
+
+/**
+ * A key-value state store that stores values in {@link ByteBuffer}.
+ */
+public interface ByteBufferStateStore extends StateStore {
+
+    /**
+     * Update the state value for the key.
+     *
+     * @param key   name of the key
+     * @param value state value of the key
+     */
+    void put(String key, ByteBuffer value);
+
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key   name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putAsync(String key, ByteBuffer value);
+
+    /**
+     * Delete the state value for the key.
+     *
+     * @param key   name of the key
+     */
+    void delete(String key);
+
+    /**
+     * Delete the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key   name of the key
+     */
+    CompletableFuture<Void> deleteAsync(String key);
+
+    /**
+     * Retrieve the state value for the key.
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    ByteBuffer get(String key);
+
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getAsync(String key);
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
new file mode 100644
index 0000000..548c60b0
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.StateStore;
+
+/**
+ * The state store supports counters.
+ */
+public interface CounterStateStore extends StateStore {
+
+    /**
+     * Increment the builtin distributed counter referred by key.
+     *
+     * @param key    The name of the key
+     * @param amount The amount to be incremented
+     */
+    void incrCounter(String key, long amount);
+
+    /**
+     * Increment the builtin distributed counter referred by key
+     * but dont wait for the completion of the increment operation
+     *
+     * @param key    The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+    /**
+     * Retrieve the counter value for the key.
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    long getCounter(String key);
+
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
new file mode 100644
index 0000000..1059832
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * State Store API.
+ */
+package org.apache.pulsar.functions.api.state;
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 2be3e64..1a7da8f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -21,19 +21,37 @@ package org.apache.pulsar.functions.instance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.bookkeeper.api.kv.Table;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.instance.state.DefaultStateStore;
+import org.apache.pulsar.functions.instance.state.StateManager;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
 import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
@@ -47,15 +65,8 @@ import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 /**
  * This class implements the Context interface exposed to the user.
@@ -78,7 +89,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     private final Map<String, Object> secretsMap;
 
     @VisibleForTesting
-    StateContextImpl stateContext;
+    StateManager stateManager;
+    @VisibleForTesting
+    DefaultStateStore defaultStateStore;
+
     private Map<String, Object> userConfigs;
 
     private ComponentStatsManager statsManager;
@@ -100,7 +114,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
                        Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
-                       Table<ByteBuf, ByteBuf> stateTable) {
+                       StateManager stateManager) {
         this.config = config;
         this.logger = logger;
         this.client = client;
@@ -166,10 +180,12 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
                 .quantile(0.999, 0.01)
                 .register(collectorRegistry);
         this.componentType = componentType;
-
-        if (null != stateTable) {
-            this.stateContext = new StateContextImpl(stateTable);
-        }
+        this.stateManager = stateManager;
+        this.defaultStateStore = (DefaultStateStore) stateManager.getStore(
+            config.getFunctionDetails().getTenant(),
+            config.getFunctionDetails().getNamespace(),
+            config.getFunctionDetails().getName()
+        );
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -288,88 +304,84 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         }
     }
 
+    @Override
+    public <S extends StateStore> S getStateStore(String name) {
+        return getStateStore(
+            config.getFunctionDetails().getTenant(),
+            config.getFunctionDetails().getNamespace(),
+            name);
+    }
+
+    @Override
+    public <S extends StateStore> S getStateStore(String tenant, String ns, String name) {
+        return (S) stateManager.getStore(tenant, ns, name);
+    }
+
     private void ensureStateEnabled() {
-        checkState(null != stateContext, "State is not enabled.");
+        checkState(null != defaultStateStore, "State %s/%s/%s is not enabled.",
+            config.getFunctionDetails().getTenant(),
+            config.getFunctionDetails().getNamespace(),
+            config.getFunctionDetails().getName());
     }
 
     @Override
     public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
         ensureStateEnabled();
-        return stateContext.incrCounter(key, amount);
+        return defaultStateStore.incrCounterAsync(key, amount);
     }
 
     @Override
     public void incrCounter(String key, long amount) {
         ensureStateEnabled();
-        try {
-            result(stateContext.incrCounter(key, amount));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e);
-        }
+        defaultStateStore.incrCounter(key, amount);
     }
 
     @Override
     public CompletableFuture<Long> getCounterAsync(String key) {
         ensureStateEnabled();
-        return stateContext.getCounter(key);
+        return defaultStateStore.getCounterAsync(key);
     }
 
     @Override
     public long getCounter(String key) {
         ensureStateEnabled();
-        try {
-            return result(stateContext.getCounter(key));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to retrieve counter from key '" + key + "'");
-        }
+        return defaultStateStore.getCounter(key);
     }
 
     @Override
     public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
         ensureStateEnabled();
-        return stateContext.put(key, value);
+        return defaultStateStore.putAsync(key, value);
     }
 
     @Override
     public void putState(String key, ByteBuffer value) {
         ensureStateEnabled();
-        try {
-            result(stateContext.put(key, value));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to update the state value for key '" + key + "'");
-        }
+        defaultStateStore.put(key, value);
     }
 
     @Override
     public CompletableFuture<Void> deleteStateAsync(String key) {
         ensureStateEnabled();
-        return stateContext.delete(key);
+        return defaultStateStore.deleteAsync(key);
     }
 
     @Override
     public void deleteState(String key) {
         ensureStateEnabled();
-        try {
-            result(stateContext.delete(key));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to delete the state value for key '" + key + "'");
-        }
+        defaultStateStore.delete(key);
     }
 
     @Override
     public CompletableFuture<ByteBuffer> getStateAsync(String key) {
         ensureStateEnabled();
-        return stateContext.get(key);
+        return defaultStateStore.getAsync(key);
     }
 
     @Override
     public ByteBuffer getState(String key) {
         ensureStateEnabled();
-        try {
-            return result(stateContext.get(key));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
-        }
+        return defaultStateStore.get(key);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fd4697c..316d964 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,34 +19,19 @@
 
 package org.apache.pulsar.functions.instance;
 
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
-import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.clients.exceptions.ClientException;
-import org.apache.bookkeeper.clients.exceptions.InternalServerException;
-import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
-import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
-import org.apache.bookkeeper.clients.utils.ClientResources;
-import org.apache.bookkeeper.common.util.Backoff.Jitter;
-import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
-import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
-import org.apache.bookkeeper.stream.proto.StorageType;
-import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
@@ -62,6 +47,13 @@ import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
+import org.apache.pulsar.functions.instance.state.InstanceStateManager;
+import org.apache.pulsar.functions.instance.state.StateManager;
+import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
+import org.apache.pulsar.functions.instance.state.StateStoreProvider;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -82,16 +74,6 @@ import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * A function container implemented using java thread.
  */
@@ -109,8 +91,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     // provide tables for storing states
     private final String stateStorageServiceUrl;
-    private StorageClient storageClient;
-    private Table<ByteBuf, ByteBuf> stateTable;
+    private StateStoreProvider stateStoreProvider;
+    private StateManager stateManager;
 
     private JavaInstance javaInstance;
     @Getter
@@ -221,7 +203,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
 
         // start the state table
-        setupStateTable();
+        setupStateStore();
 
         ContextImpl contextImpl = setupContext();
 
@@ -239,7 +221,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
-                collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable);
+                collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager);
     }
 
     /**
@@ -334,99 +316,26 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         return fnClassLoader;
     }
 
-    private void createStateTableIfNotExist(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
-        try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
-          settings,
-          ClientResources.create().scheduler())) {
-            StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
-              .setInitialNumRanges(4)
-              .setMinNumRanges(4)
-              .setStorageType(StorageType.TABLE)
-              .build();
-            Stopwatch elapsedWatch = Stopwatch.createStarted();
-            Exception lastException = null;
-            while (true) {
-                try {
-                    result(storageAdminClient.getStream(tableNs, tableName), 30, TimeUnit.SECONDS);
-                    return;
-                } catch (TimeoutException e){
-                    lastException = e;
-                } catch (NamespaceNotFoundException nnfe) {
-                    try {
-                        result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
-                          .setDefaultStreamConf(streamConf)
-                          .build()));
-                        result(storageAdminClient.createStream(tableNs, tableName, streamConf));
-                    } catch (Exception e) {
-                        // there might be two clients conflicting at creating table, so let's retrieve the table again
-                        // to make sure the table is created.
-                        lastException = e;
-                    }
-                } catch (StreamNotFoundException snfe) {
-                    try {
-                        result(storageAdminClient.createStream(tableNs, tableName, streamConf));
-                    } catch (Exception e) {
-                        // there might be two client conflicting at creating table, so let's retrieve it to make
-                        // sure the table is created.
-                        lastException = e;
-                    }
-                } catch (ClientException ce) {
-                    lastException = ce;
-                    log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
-                      ce.getMessage());
-                    TimeUnit.MILLISECONDS.sleep(100);
-                }
-                if (elapsedWatch.elapsed(TimeUnit.MINUTES) > 1) {
-                    if (lastException != null) {
-                        throw new RuntimeException("Failed to get or create state table within timeout", lastException);
-                    }
-                    throw new RuntimeException("Failed to get or create state table within timeout");
-                }
-            }
-        }
-    }
+    private void setupStateStore() throws Exception {
+        this.stateManager = new InstanceStateManager();
 
-    private void setupStateTable() throws Exception {
         if (null == stateStorageServiceUrl) {
-            return;
-        }
-
-        String tableNs = FunctionCommon.getStateNamespace(
-            instanceConfig.getFunctionDetails().getTenant(),
-            instanceConfig.getFunctionDetails().getNamespace()
-        );
-        String tableName = instanceConfig.getFunctionDetails().getName();
-
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-                .serviceUri(stateStorageServiceUrl)
-                .clientName("function-" + tableNs + "/" + tableName)
-                // configure a maximum 2 minutes jitter backoff for accessing table service
-                .backoffPolicy(Jitter.of(
-                    Type.EXPONENTIAL,
-                    100,
-                    2000,
-                    60
-                ))
-                .build();
-
-        // we defer creation of the state table until a java instance is running here.
-        createStateTableIfNotExist(tableNs, tableName, settings);
-
-        log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName());
-        this.storageClient = new SimpleStorageClientImpl(tableNs, settings);
-
-        // NOTE: this is a workaround until we bump bk version to 4.9.0
-        // table might just be created above, so it might not be ready for serving traffic
-        Stopwatch openSw = Stopwatch.createStarted();
-        while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
-            try {
-                this.stateTable = result(storageClient.openTable(tableName));
-                break;
-            } catch (InternalServerException ise) {
-                log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}",
-                    tableName, ise.getMessage());
-                TimeUnit.MILLISECONDS.sleep(100);
-            }
+            stateStoreProvider = StateStoreProvider.NULL;
+        } else {
+            stateStoreProvider = new BKStateStoreProviderImpl();
+            Map<String, Object> stateStoreProviderConfig = new HashMap();
+            stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, stateStorageServiceUrl);
+            stateStoreProvider.init(stateStoreProviderConfig, instanceConfig.getFunctionDetails());
+
+            StateStore store = stateStoreProvider.getStateStore(
+                instanceConfig.getFunctionDetails().getTenant(),
+                instanceConfig.getFunctionDetails().getNamespace(),
+                instanceConfig.getFunctionDetails().getName()
+            );
+            StateStoreContext context = new StateStoreContextImpl();
+            store.init(context);
+
+            stateManager.registerStore(store);
         }
     }
 
@@ -538,18 +447,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             javaInstance = null;
         }
 
-        // kill the state table
-        if (null != stateTable) {
-            stateTable.close();
-            stateTable = null;
+        if (null != stateManager) {
+            stateManager.close();
         }
-        if (null != storageClient) {
-            storageClient.closeAsync()
-                .exceptionally(cause -> {
-                    log.warn("Failed to close state storage client", cause);
-                    return null;
-                });
-            storageClient = null;
+
+        if (null != stateStoreProvider) {
+            stateStoreProvider.close();
         }
 
         if (instanceCache != null) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
similarity index 56%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
index 1c2ca5d..8714d2c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
@@ -19,31 +19,71 @@
 package org.apache.pulsar.functions.instance.state;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
-import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.api.kv.options.Options;
-
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.options.Options;
+import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 /**
  * This class accumulates the state updates from one function.
  *
  * <p>currently it exposes incr operations. but we can expose other key/values operations if needed.
  */
-public class StateContextImpl implements StateContext {
+public class BKStateStoreImpl implements DefaultStateStore {
 
+    private final String tenant;
+    private final String namespace;
+    private final String name;
+    private final String fqsn;
     private final Table<ByteBuf, ByteBuf> table;
 
-    public StateContextImpl(Table<ByteBuf, ByteBuf> table) {
+    public BKStateStoreImpl(String tenant, String namespace, String name,
+                            Table<ByteBuf, ByteBuf> table) {
+        this.tenant = tenant;
+        this.namespace = namespace;
+        this.name = name;
         this.table = table;
+        this.fqsn = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
+    }
+
+    @Override
+    public String tenant() {
+        return tenant;
+    }
+
+    @Override
+    public String namespace() {
+        return namespace;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String fqsn() {
+        return fqsn;
+    }
+
+    @Override
+    public void init(StateStoreContext ctx) {
+    }
+
+    @Override
+    public void close() {
+        table.close();
     }
 
     @Override
-    public CompletableFuture<Void> incrCounter(String key, long amount) {
+    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
         // TODO: this can be optimized with a batch operation.
         return table.increment(
             Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
@@ -51,7 +91,30 @@ public class StateContextImpl implements StateContext {
     }
 
     @Override
-    public CompletableFuture<Void> put(String key, ByteBuffer value) {
+    public void incrCounter(String key, long amount) {
+        try {
+            result(incrCounterAsync(key, amount));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Long> getCounterAsync(String key) {
+        return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)));
+    }
+
+    @Override
+    public long getCounter(String key) {
+        try {
+            return result(getCounterAsync(key));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to retrieve counter from key '" + key + "'");
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
         if(value != null) {
             // Set position to off the buffer to the beginning.
             // If a user used an operation like ByteBuffer.allocate(4).putInt(count) to create a ByteBuffer to store to the state store
@@ -68,7 +131,16 @@ public class StateContextImpl implements StateContext {
     }
 
     @Override
-    public CompletableFuture<Void> delete(String key) {
+    public void put(String key, ByteBuffer value) {
+        try {
+            result(putAsync(key, value));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to update the state value for key '" + key + "'");
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(String key) {
         return table.delete(
                 Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
                 Options.delete()
@@ -76,7 +148,16 @@ public class StateContextImpl implements StateContext {
     }
 
     @Override
-    public CompletableFuture<ByteBuffer> get(String key) {
+    public void delete(String key) {
+        try {
+            result(deleteAsync(key));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to delete the state value for key '" + key + "'");
+        }
+    }
+
+    @Override
+    public CompletableFuture<ByteBuffer> getAsync(String key) {
         return table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
                 data -> {
                     try {
@@ -98,8 +179,11 @@ public class StateContextImpl implements StateContext {
     }
 
     @Override
-    public CompletableFuture<Long> getCounter(String key) {
-        return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)));
+    public ByteBuffer get(String key) {
+        try {
+            return result(getAsync(key));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
+        }
     }
-
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
new file mode 100644
index 0000000..88960d6
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.SimpleStorageAdminClientImpl;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.exceptions.ClientException;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
+import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
+import org.apache.bookkeeper.clients.utils.ClientResources;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.StorageType;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+/**
+ * The state store provider that provides bookkeeper table backed state stores.
+ */
+@Slf4j
+public class BKStateStoreProviderImpl implements StateStoreProvider {
+
+    public static final String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl";
+
+    private String stateStorageServiceUrl;
+    private Map<String, StorageClient> clients;
+
+    @Override
+    public void init(Map<String, Object> config, FunctionDetails functionDetails) throws Exception {
+        stateStorageServiceUrl = (String) config.get(STATE_STORAGE_SERVICE_URL);
+        clients = new HashMap<>();
+    }
+
+    private StorageClient getStorageClient(String tenant, String namespace) {
+        final String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
+
+        StorageClient client = clients.get(tableNs);
+        if (null != client) {
+            return client;
+        }
+
+        StorageClientSettings settings = StorageClientSettings.newBuilder()
+            .serviceUri(stateStorageServiceUrl)
+            .clientName("function-" + tableNs)
+            // configure a maximum 2 minutes jitter backoff for accessing table service
+            .backoffPolicy(Jitter.of(
+                Type.EXPONENTIAL,
+                100,
+                2000,
+                60
+            ))
+            .build();
+
+        StorageClient storageClient = StorageClientBuilder.newBuilder()
+                .withSettings(settings)
+                .withNamespace(tableNs)
+                .build();
+
+        clients.put(tableNs, storageClient);
+        return storageClient;
+    }
+
+    private void createStateTable(String stateStorageServiceUrl,
+                                  String tenant,
+                                  String namespace,
+                                  String name) throws Exception {
+        final String tableNs = FunctionCommon.getStateNamespace(tenant, namespace);
+        final String tableName = name;
+
+    	try (StorageAdminClient storageAdminClient = new SimpleStorageAdminClientImpl(
+             StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).build(),
+             ClientResources.create().scheduler())){
+            StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setInitialNumRanges(4)
+                .setMinNumRanges(4)
+                .setStorageType(StorageType.TABLE)
+                .build();
+            Stopwatch elapsedWatch = Stopwatch.createStarted();
+            while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
+                try {
+                    result(storageAdminClient.getStream(tableNs, tableName));
+                    return;
+                } catch (NamespaceNotFoundException nnfe) {
+                    try {
+                        result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
+                            .setDefaultStreamConf(streamConf)
+                            .build()));
+                        result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+                    } catch (Exception e) {
+                        // there might be two clients conflicting at creating table, so let's retrieve the table again
+                        // to make sure the table is created.
+                    }
+                } catch (StreamNotFoundException snfe) {
+                    try {
+                        result(storageAdminClient.createStream(tableNs, tableName, streamConf));
+                    } catch (Exception e) {
+                        // there might be two client conflicting at creating table, so let's retrieve it to make
+                        // sure the table is created.
+                    }
+                } catch (ClientException ce) {
+                    log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
+                        ce.getMessage());
+                    TimeUnit.MILLISECONDS.sleep(100);
+                }
+            }
+        }
+    }
+
+    private Table<ByteBuf, ByteBuf> openStateTable(String tenant,
+                                                   String namespace,
+                                                   String name) throws Exception {
+        StorageClient client = getStorageClient(tenant, namespace);
+
+        log.info("Opening state table for function {}/{}/{}", tenant, namespace, name);
+        // NOTE: this is a workaround until we bump bk version to 4.9.0
+        // table might just be created above, so it might not be ready for serving traffic
+        Stopwatch openSw = Stopwatch.createStarted();
+        while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
+            try {
+                return result(client.openTable(name));
+            } catch (InternalServerException ise) {
+                log.warn("Encountered internal server on opening state table '{}/{}/{}', re-attempt in 100 milliseconds : {}",
+                    tenant, namespace, name, ise.getMessage());
+                TimeUnit.MILLISECONDS.sleep(100);
+            }
+        }
+        throw new IOException("Failed to open state table for function " + tenant + "/" + namespace + "/" + name);
+    }
+
+    @Override
+    public <S extends StateStore> S getStateStore(String tenant, String namespace, String name) throws Exception {
+        // we defer creation of the state table until a java instance is running here.
+        createStateTable(stateStorageServiceUrl, tenant, namespace, name);
+        Table<ByteBuf, ByteBuf> table = openStateTable(tenant, namespace, name);
+        return (S) new BKStateStoreImpl(tenant, namespace, name, table);
+    }
+
+    @Override
+    public void close() {
+        clients.forEach((name, client) -> client.closeAsync()
+            .exceptionally(cause -> {
+                log.warn("Failed to close state storage client", cause);
+                return null;
+            })
+        );
+        clients.clear();
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/DefaultStateStore.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/DefaultStateStore.java
new file mode 100644
index 0000000..0f0d8ad
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/DefaultStateStore.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import org.apache.pulsar.functions.api.state.ByteBufferStateStore;
+import org.apache.pulsar.functions.api.state.CounterStateStore;
+
+/**
+ * The default state store interface.
+ */
+public interface DefaultStateStore extends ByteBufferStateStore, CounterStateStore {
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/InstanceStateManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/InstanceStateManager.java
new file mode 100644
index 0000000..a125953
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/InstanceStateManager.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+/**
+ * The state manager for managing state stores for a running function instance.
+ */
+@Slf4j
+public class InstanceStateManager implements StateManager {
+
+    private final Map<String, StateStore> stores = new LinkedHashMap<>();
+
+    @VisibleForTesting
+    boolean isEmpty() {
+        return stores.isEmpty();
+    }
+
+    @Override
+    public void registerStore(StateStore store) {
+        final String storeName = store.fqsn();
+
+        checkArgument(!stores.containsKey(storeName),
+            String.format("Store %s has already been registered.", storeName));
+
+        stores.put(storeName, store);
+    }
+
+    @Override
+    public StateStore getStore(String tenant, String namespace, String name) {
+        String storeName = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
+        return stores.get(storeName);
+    }
+
+    @Override
+    public void close() {
+        RuntimeException firstException = null;
+        for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
+            final StateStore store = entry.getValue();
+            if (log.isDebugEnabled()) {
+                log.debug("Closing store {}", store.fqsn());
+            }
+            try {
+                store.close();
+            } catch (RuntimeException e) {
+                if (firstException == null) {
+                    firstException = e;
+                }
+                log.error("Failed to close state store {}: ", store.fqsn(), e);
+            }
+        }
+        stores.clear();
+        if (null != firstException) {
+            throw firstException;
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
deleted file mode 100644
index bfdb118..0000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.state;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A state context per function.
- */
-public interface StateContext {
-
-    /**
-     * Increment the given <i>key</i> by the given <i>amount</i>.
-     *
-     * @param key key to increment
-     * @param amount the amount incremented
-     */
-    CompletableFuture<Void> incrCounter(String key, long amount) throws Exception;
-
-    /**
-     * Update the given <i>key</i> to the provide <i>value</i>.
-     *
-     * <p>NOTE: the put operation might or might not be applied directly to the global state until
-     * the state is flushed via {@link #flush()} at the completion of function execution.
-     *
-     * <p>The behavior of `PUT` is non-deterministic, if two function instances attempt to update
-     * same key around the same time, there is no guarantee which update will be the final result.
-     * That says, if you attempt to get amount via {@link #getAmount(String)}, increment the amount
-     * based on the function computation logic, and update the computed amount back. one update will
-     * overwrite the other update. For this case, you are encouraged to use {@link #incr(String, long)}
-     * instead.
-     *
-     * @param key key to update.
-     * @param value value to update; if null the key is deleted
-     */
-    CompletableFuture<Void> put(String key, ByteBuffer value) throws Exception;
-
-    /**
-     * Deletes the <i>value</i> at the given <i>key</i>
-     *
-     * @param key to delete
-     */
-    CompletableFuture<Void> delete(String key);
-
-    /**
-     * Get the value of a given <i>key</i>.
-     *
-     * @param key key to retrieve
-     * @return a completable future representing the retrieve result.
-     */
-    CompletableFuture<ByteBuffer> get(String key) throws Exception;
-
-    /**
-     * Get the amount of a given <i>key</i>.
-     *
-     * @param key key to retrieve
-     * @return a completable future representing the retrieve result.
-     */
-    CompletableFuture<Long> getCounter(String key) throws Exception;
-
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateManager.java
new file mode 100644
index 0000000..fddf92b
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateManager.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import org.apache.pulsar.functions.api.StateStore;
+
+/**
+ * A state manager that manages multiple state stores.
+ */
+public interface StateManager extends AutoCloseable {
+
+    /**
+     * Register the state store.
+     *
+     * @param store the state store to register.
+     */
+    void registerStore(StateStore store);
+
+    /**
+     * Get the state store with the given name.
+     *
+     * @param tenant the state store tenant.
+     * @param namespace the state store namespace.
+     * @param name the state store name.
+     * @return the state store with the given name.
+     */
+    StateStore getStore(String tenant, String namespace, String name);
+
+    void close();
+
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreContextImpl.java
new file mode 100644
index 0000000..b374a3f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreContextImpl.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import org.apache.pulsar.functions.api.StateStoreContext;
+
+/**
+ * Default implementation of {@link StateStoreContext}.
+ */
+public class StateStoreContextImpl implements StateStoreContext {
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
new file mode 100644
index 0000000..db3c6b3
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import java.util.Map;
+import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+
+/**
+ * The State Store Provider provides the state stores for a function.
+ */
+public interface StateStoreProvider extends AutoCloseable {
+
+    /**
+     * The state store provider returns `null` state stores.
+     */
+    StateStoreProvider NULL = new StateStoreProvider() {
+        @Override
+        public <S extends StateStore> S getStateStore(String tenant, String namespace, String name) {
+            return null;
+        }
+
+        @Override
+        public void close() {
+        }
+
+    };
+
+    /**
+     * Initialize the state store provider.
+     *
+     * @param config the config to init the state store provider.
+     * @param functionDetails the function details.
+     * @throws Exception when failed to init the state store provider.
+     */
+    default void init(Map<String, Object> config, FunctionDetails functionDetails) throws Exception {}
+
+    /**
+     * Get the state store with the provided store name.
+     *
+     * @param tenant the tenant that owns this state store
+     * @param namespace the namespace that owns this state store
+     * @param name the state store name
+     * @param <S> the type of interface of the store to return
+     * @return the state store instance.
+     *
+     * @throws ClassCastException if the return type isn't a type
+     * or interface of the actual returned store.
+     */
+    <S extends StateStore> S getStateStore(String tenant, String namespace, String name) throws Exception;
+
+    @Override
+    void close();
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 26c3249..0305c2f 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -45,7 +45,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.instance.state.BKStateStoreImpl;
+import org.apache.pulsar.functions.instance.state.InstanceStateManager;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
 import org.slf4j.Logger;
@@ -86,7 +87,7 @@ public class ContextImplTest {
             logger,
             client,
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
-                FunctionDetails.ComponentType.FUNCTION, null, null);
+                FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager());
         context.setCurrentMessageContext((Record<String>) () -> null);
     }
 
@@ -117,39 +118,39 @@ public class ContextImplTest {
 
     @Test
     public void testIncrCounterStateEnabled() throws Exception {
-        context.stateContext = mock(StateContextImpl.class);
+        context.defaultStateStore = mock(BKStateStoreImpl.class);
         context.incrCounterAsync("test-key", 10L);
-        verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
+        verify(context.defaultStateStore, times(1)).incrCounterAsync(eq("test-key"), eq(10L));
     }
 
     @Test
     public void testGetCounterStateEnabled() throws Exception {
-        context.stateContext = mock(StateContextImpl.class);
+        context.defaultStateStore = mock(BKStateStoreImpl.class);
         context.getCounterAsync("test-key");
-        verify(context.stateContext, times(1)).getCounter(eq("test-key"));
+        verify(context.defaultStateStore, times(1)).getCounterAsync(eq("test-key"));
     }
 
     @Test
     public void testPutStateStateEnabled() throws Exception {
-        context.stateContext = mock(StateContextImpl.class);
+        context.defaultStateStore = mock(BKStateStoreImpl.class);
         ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
         context.putStateAsync("test-key", buffer);
-        verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
+        verify(context.defaultStateStore, times(1)).putAsync(eq("test-key"), same(buffer));
     }
 
     @Test
     public void testDeleteStateStateEnabled() throws Exception {
-        context.stateContext = mock(StateContextImpl.class);
+        context.defaultStateStore = mock(BKStateStoreImpl.class);
         ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
         context.deleteStateAsync("test-key");
-        verify(context.stateContext, times(1)).delete(eq("test-key"));
+        verify(context.defaultStateStore, times(1)).deleteAsync(eq("test-key"));
     }
 
     @Test
     public void testGetStateStateEnabled() throws Exception {
-        context.stateContext = mock(StateContextImpl.class);
+        context.defaultStateStore = mock(BKStateStoreImpl.class);
         context.getStateAsync("test-key");
-        verify(context.stateContext, times(1)).get(eq("test-key"));
+        verify(context.defaultStateStore, times(1)).getAsync(eq("test-key"));
     }
 
     @Test
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
similarity index 80%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
index 88490dd..8b0e385 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
@@ -42,24 +42,39 @@ import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
 /**
- * Unit test {@link StateContextImpl}.
+ * Unit test {@link BKStateStoreImpl}.
  */
-public class StateContextImplTest {
+public class BKStateStoreImplTest {
+
+    private final String TENANT = "test-tenant";
+    private final String NS = "test-ns";
+    private final String NAME = "test-name";
+    private final String FQSN = "test-tenant/test-ns/test-name";
 
     private Table<ByteBuf, ByteBuf> mockTable;
-    private StateContextImpl stateContext;
+    private BKStateStoreImpl stateContext;
 
     @BeforeMethod
     public void setup() {
         this.mockTable = mock(Table.class);
-        this.stateContext = new StateContextImpl(mockTable);
+        this.stateContext = new BKStateStoreImpl(
+            TENANT, NS, NAME,
+            mockTable);
+    }
+
+    @Test
+    public void testGetter() {
+        assertEquals(stateContext.tenant(), TENANT);
+        assertEquals(stateContext.namespace(), NS);
+        assertEquals(stateContext.name(), NAME);
+        assertEquals(stateContext.fqsn(), FQSN);
     }
 
     @Test
     public void testIncr() throws Exception {
         when(mockTable.increment(any(ByteBuf.class), anyLong()))
             .thenReturn(FutureUtils.Void());
-        stateContext.incrCounter("test-key", 10L).get();
+        stateContext.incrCounter("test-key", 10L);
         verify(mockTable, times(1)).increment(
             eq(Unpooled.copiedBuffer("test-key", UTF_8)),
             eq(10L)
@@ -70,7 +85,7 @@ public class StateContextImplTest {
     public void testPut() throws Exception {
         when(mockTable.put(any(ByteBuf.class), any(ByteBuf.class)))
             .thenReturn(FutureUtils.Void());
-        stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))).get();
+        stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8)));
         verify(mockTable, times(1)).put(
             eq(Unpooled.copiedBuffer("test-key", UTF_8)),
             eq(Unpooled.copiedBuffer("test-value", UTF_8))
@@ -94,7 +109,7 @@ public class StateContextImplTest {
         ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
         when(mockTable.get(any(ByteBuf.class)))
             .thenReturn(FutureUtils.value(returnedValue));
-        ByteBuffer result = stateContext.get("test-key").get();
+        ByteBuffer result = stateContext.get("test-key");
         assertEquals("test-value", new String(result.array(), UTF_8));
         verify(mockTable, times(1)).get(
             eq(Unpooled.copiedBuffer("test-key", UTF_8))
@@ -105,7 +120,7 @@ public class StateContextImplTest {
     public void testGetAmount() throws Exception {
         when(mockTable.getNumber(any(ByteBuf.class)))
             .thenReturn(FutureUtils.value(10L));
-        assertEquals((Long)10L, stateContext.getCounter("test-key").get());
+        assertEquals(10L, stateContext.getCounter("test-key"));
         verify(mockTable, times(1)).getNumber(
             eq(Unpooled.copiedBuffer("test-key", UTF_8))
         );
@@ -115,7 +130,7 @@ public class StateContextImplTest {
     public void testGetKeyNotPresent() throws Exception {
         when(mockTable.get(any(ByteBuf.class)))
                 .thenReturn(FutureUtils.value(null));
-        CompletableFuture<ByteBuffer> result = stateContext.get("test-key");
+        CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
         assertTrue(result != null);
         assertEquals(result.get(), null);
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/InstanceStateManagerTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/InstanceStateManagerTest.java
new file mode 100644
index 0000000..825f00f
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/InstanceStateManagerTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertNull;
+import static org.testng.AssertJUnit.assertSame;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+import org.apache.pulsar.functions.api.StateStore;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link InstanceStateManager}.
+ */
+public class InstanceStateManagerTest {
+
+    private InstanceStateManager stateManager;
+
+    @BeforeMethod
+    public void setup() {
+        this.stateManager = new InstanceStateManager();
+    }
+
+    @Test
+    public void testGetStoreNull() {
+        final String fqsn = "t/ns/store";
+        StateStore getStore = stateManager.getStore("t", "ns", "store");
+        assertNull(getStore);
+    }
+
+    @Test
+    public void testRegisterStore() {
+        final String fqsn = "t/ns/store";
+        StateStore store = mock(StateStore.class);
+        when(store.fqsn()).thenReturn(fqsn);
+        this.stateManager.registerStore(store);
+        StateStore getStore = stateManager.getStore("t", "ns", "store");
+        assertSame(getStore, store);
+    }
+
+    @Test
+    public void testRegisterStoreTwice() {
+        final String fqsn = "t/ns/store";
+        StateStore store = mock(StateStore.class);
+        when(store.fqsn()).thenReturn(fqsn);
+        this.stateManager.registerStore(store);
+        try {
+            this.stateManager.registerStore(store);
+            fail("Should fail to register a store twice");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testClose() {
+        final String fqsn1 = "t/ns/store-1";
+        StateStore store1 = mock(StateStore.class);
+        when(store1.fqsn()).thenReturn(fqsn1);
+        final String fqsn2 = "t/ns/store-2";
+        StateStore store2 = mock(StateStore.class);
+        when(store2.fqsn()).thenReturn(fqsn2);
+
+        this.stateManager.registerStore(store1);
+        this.stateManager.registerStore(store2);
+
+        this.stateManager.close();
+
+        verify(store1, times(1)).close();
+        verify(store2, times(1)).close();
+    }
+
+    @Test
+    public void testCloseException() {
+        final String fqsn1 = "t/ns/store-1";
+        StateStore store1 = mock(StateStore.class);
+        when(store1.fqsn()).thenReturn(fqsn1);
+        RuntimeException exception1 = new RuntimeException("exception 1");
+        doThrow(exception1).when(store1).close();
+        final String fqsn2 = "t/ns/store-2";
+        StateStore store2 = mock(StateStore.class);
+        when(store2.fqsn()).thenReturn(fqsn2);
+        RuntimeException exception2 = new RuntimeException("exception 2");
+        doThrow(exception2).when(store2).close();
+
+        this.stateManager.registerStore(store2);
+        this.stateManager.registerStore(store1);
+
+        try {
+            this.stateManager.close();
+            fail("Should fail to close the state manager");
+        } catch (RuntimeException re) {
+            assertSame(re, exception2);
+        }
+
+        assertTrue(this.stateManager.isEmpty());
+    }
+
+}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
index 6be0f14..21f0e9d 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.functions.api.StateStore;
 import org.slf4j.Logger;
 
 /**
@@ -82,6 +83,20 @@ public interface ConnectorContext {
     String getSecret(String secretName);
 
     /**
+     * Get the state store with the provided store name.
+     *
+     * @param name the state store name
+     * @param <S> the type of interface of the store to return
+     * @return the state store instance.
+     *
+     * @throws ClassCastException if the return type isn't a type
+     * or interface of the actual returned store.
+     */
+    default <S extends StateStore> S getStateStore(String name) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    /**
      * Increment the builtin distributed counter referred by key.
      *
      * @param key    The name of the key