You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/22 11:45:48 UTC

[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-466] add local storage interfaces and implements

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
     new fd9ec2f  [TUBEMQ-466] add local storage interfaces and implements
fd9ec2f is described below

commit fd9ec2f3e6e54c441d3dffcef904052f5addf1ae
Author: yuanbo <yu...@apache.org>
AuthorDate: Tue Dec 22 11:16:58 2020 +0800

    [TUBEMQ-466] add local storage interfaces and implements
---
 .../tubemq/agent/constants/AgentConstants.java     |  15 +-
 .../org/apache/tubemq/agent/db/BerkeleyDBImp.java  | 190 +++++++++++++++++++++
 .../main/java/org/apache/tubemq/agent/db/DB.java   |  83 +++++++++
 .../org/apache/tubemq/agent/db/KeyValueEntity.java |  68 ++++++++
 .../org/apache/tubemq/agent/db/RocksDBImp.java     | 102 +++++++++++
 .../org/apache/tubemq/agent/db/StateSearchKey.java |  28 +++
 .../apache/tubemq/agent/AgentBaseTestsHelper.java  |  63 +++++++
 .../apache/tubemq/agent/db/TestBerkeleyDBImp.java  | 101 +++++++++++
 8 files changed, 647 insertions(+), 3 deletions(-)

diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java
index 40c48e3..1b2c00b 100644
--- a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java
@@ -13,14 +13,26 @@
  */
 package org.apache.tubemq.agent.constants;
 
+import org.apache.tubemq.agent.utils.AgentUtils;
+
 public class AgentConstants {
 
+    public static final String AGENT_HOME = "agent.home";
+    public static final String DEFAULT_AGENT_HOME = System.getProperty("agent.home");
+
+    public static final String AGENT_LOCAL_CACHE = "agent.local.cache";
+    public static final String DEFAULT_AGENT_LOCAL_CACHE = ".local";
+
     public static final String AGENT_LOCAL_STORE_PATH = "agent.localStore.path";
     public static final String DEFAULT_AGENT_LOCAL_STORE_PATH = ".bdb";
 
     public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
     public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";
 
+    public static final String AGENT_UNIQ_ID = "agent.uniq.id";
+    // default use local ip as uniq id for agent.
+    public static final String DEFAULT_AGENT_UNIQ_ID = AgentUtils.getLocalIP();
+
     public static final String AGENT_DB_INSTANCE_NAME = "agent.db.instance.name";
     public static final String DEFAULT_AGENT_DB_INSTANCE_NAME = "agent";
 
@@ -30,9 +42,6 @@ public class AgentConstants {
     // default is empty.
     public static final String AGENT_FETCHER_CLASSNAME = "agent.fetcher.classname";
 
-    public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
-    public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;
-
     public static final String AGENT_CONF_PARENT = "agent.conf.parent";
     public static final String DEFAULT_AGENT_CONF_PARENT = "conf";
 
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/BerkeleyDBImp.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/BerkeleyDBImp.java
new file mode 100644
index 0000000..9201ae4
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/BerkeleyDBImp.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_DB_INSTANCE_NAME;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_LOCK_TIMEOUT;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_NO_SYNC_VOID;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_PATH;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_READONLY;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_TRANSACTIONAL;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_DB_INSTANCE_NAME;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_PATH;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.SecondaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DB implement based on berkeley db.
+ */
+public class BerkeleyDBImp implements DB {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BerkeleyDBImp.class);
+
+    private final EntityStore store;
+    private final PrimaryIndex<String, KeyValueEntity> primaryIndex;
+    private final SecondaryIndex<StateSearchKey, String, KeyValueEntity> secondaryIndex;
+    private final AgentConfiguration agentConf;
+
+    public BerkeleyDBImp() {
+        this.agentConf = AgentConfiguration.getAgentConf();
+        StoreConfig storeConfig = initStoreConfig();
+        Environment environment = initEnv();
+        String instanceName = agentConf.get(AGENT_DB_INSTANCE_NAME, DEFAULT_AGENT_DB_INSTANCE_NAME);
+        this.store = new EntityStore(environment, instanceName, storeConfig);
+        primaryIndex = this.store.getPrimaryIndex(String.class, KeyValueEntity.class);
+        secondaryIndex = this.store.getSecondaryIndex(primaryIndex, StateSearchKey.class,
+                "stateSearchKey");
+    }
+
+    /**
+     * init store by config
+     *
+     * @return store config
+     */
+    private StoreConfig initStoreConfig() {
+        return new StoreConfig()
+                .setReadOnly(agentConf.getBoolean(
+                        AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+                .setAllowCreate(!agentConf.getBoolean(
+                        AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+                .setTransactional(agentConf.getBoolean(
+                        AGENT_LOCAL_STORE_TRANSACTIONAL, DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL));
+    }
+
+    /**
+     * init local bdb path and get it.
+     * @return local path.
+     */
+    private File tryToInitAndGetPath() {
+        String storePath = agentConf.get(AGENT_LOCAL_STORE_PATH, DEFAULT_AGENT_LOCAL_STORE_PATH);
+        String parentPath = agentConf.get(AGENT_HOME, DEFAULT_AGENT_HOME);
+        File finalPath = new File(parentPath, storePath);
+        try {
+            boolean result = finalPath.mkdirs();
+            LOGGER.info("try to create local path {}, result is {}", finalPath, result);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return finalPath;
+    }
+
+    /**
+     * init env by config
+     *
+     * @return env config
+     */
+    private Environment initEnv() {
+        EnvironmentConfig envConfig = new EnvironmentConfig()
+                .setReadOnly(agentConf.getBoolean(
+                        AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+                .setAllowCreate(!agentConf.getBoolean(
+                        AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+                .setTransactional(agentConf.getBoolean(
+                        AGENT_LOCAL_STORE_TRANSACTIONAL, DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL))
+                .setLockTimeout(
+                        agentConf.getInt(AGENT_LOCAL_STORE_LOCK_TIMEOUT,
+                                DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT),
+                        TimeUnit.MILLISECONDS);
+        envConfig.setTxnNoSyncVoid(agentConf.getBoolean(AGENT_LOCAL_STORE_NO_SYNC_VOID,
+                DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID));
+        envConfig.setTxnWriteNoSyncVoid(agentConf.getBoolean(AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID,
+                DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID));
+        return new Environment(tryToInitAndGetPath(), envConfig);
+    }
+
+    @Override
+    public KeyValueEntity get(String key) {
+        requireNonNull(key);
+        return primaryIndex.get(key);
+    }
+
+    @Override
+    public void set(KeyValueEntity entity) {
+        requireNonNull(entity);
+        primaryIndex.put(entity);
+    }
+
+    @Override
+    public KeyValueEntity put(KeyValueEntity entity) {
+        requireNonNull(entity);
+        return primaryIndex.put(entity);
+    }
+
+    @Override
+    public KeyValueEntity remove(String key) {
+        requireNonNull(key);
+        KeyValueEntity entity = primaryIndex.get(key);
+        primaryIndex.delete(key);
+        return entity;
+    }
+
+    @Override
+    public List<KeyValueEntity> search(StateSearchKey searchKey) {
+        requireNonNull(searchKey);
+        List<KeyValueEntity> ret = new ArrayList<>();
+        try (EntityCursor<KeyValueEntity> children = secondaryIndex.subIndex(searchKey)
+                .entities()) {
+            for (KeyValueEntity entity : children) {
+                ret.add(entity);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public KeyValueEntity searchOne(StateSearchKey searchKey) {
+        requireNonNull(searchKey);
+        return secondaryIndex.get(searchKey);
+    }
+
+    @Override
+    public List<KeyValueEntity> findAll(String prefix) {
+        requireNonNull(prefix);
+        List<KeyValueEntity> ret = new ArrayList<>();
+        try (EntityCursor<KeyValueEntity> children = primaryIndex.entities()) {
+            for (KeyValueEntity entity : children) {
+                if (entity.getKey().startsWith(prefix)) {
+                    ret.add(entity);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/DB.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/DB.java
new file mode 100644
index 0000000..d7fa421
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/DB.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import java.io.Closeable;
+import java.util.List;
+import javax.management.openmbean.KeyAlreadyExistsException;
+
+/**
+ * local storage for key/value.
+ */
+public interface DB extends Closeable {
+
+    /**
+     * get keyValue by key
+     *
+     * @param key - key
+     * @return key/value
+     * @throws NullPointerException key should not be null.
+     */
+    KeyValueEntity get(String key);
+
+    /**
+     * store keyValue, if key has exists, throw exception.
+     *
+     * @param entity - key/value
+     * @throws NullPointerException key should not be null
+     * @throws KeyAlreadyExistsException key already exists
+     */
+    void set(KeyValueEntity entity);
+
+    /**
+     * store keyValue, if key has exists, overwrite it.
+     *
+     * @param entity - key/value
+     * @return null or old value which is overwritten.
+     * @throws NullPointerException key should not be null.
+     */
+    KeyValueEntity put(KeyValueEntity entity);
+
+    /**
+     * remove keyValue by key.
+     *
+     * @param key - key
+     * @return key/value
+     * @throws NullPointerException key should not be null.
+     */
+    KeyValueEntity remove(String key);
+
+    /**
+     * search keyValue list by search key.
+     *
+     * @param searchKey - search keys.
+     * @return key/value list
+     * @throws NullPointerException search key should not be null.
+     */
+    List<KeyValueEntity> search(StateSearchKey searchKey);
+
+    /**
+     * search one keyValue by search key
+     * @param searchKey - search key
+     * @return null or keyValue
+     */
+    KeyValueEntity searchOne(StateSearchKey searchKey);
+
+    /**
+     * find all by prefix key.
+     * @param prefix - prefix string
+     * @return list of k/v
+     */
+    List<KeyValueEntity> findAll(String prefix);
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
new file mode 100644
index 0000000..cd87f2f
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import com.sleepycat.persist.model.Entity;
+import com.sleepycat.persist.model.PrimaryKey;
+import com.sleepycat.persist.model.Relationship;
+import com.sleepycat.persist.model.SecondaryKey;
+
+/**
+ * key value entity. key is string and value is a json
+ */
+@Entity(version = 1)
+public class KeyValueEntity {
+
+    @PrimaryKey
+    private String key;
+
+
+    @SecondaryKey(relate = Relationship.MANY_TO_ONE)
+    private StateSearchKey stateSearchKey;
+
+    // string json
+    private String jsonValue;
+
+    private KeyValueEntity() {
+
+    }
+
+    public KeyValueEntity(String key, String jsonValue) {
+        this.key = key;
+        this.jsonValue = jsonValue;
+        this.stateSearchKey = StateSearchKey.ACCEPTED;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public StateSearchKey getStateSearchKey() {
+        return stateSearchKey;
+    }
+
+    public KeyValueEntity setStateSearchKey(StateSearchKey stateSearchKey) {
+        this.stateSearchKey = stateSearchKey;
+        return this;
+    }
+
+    public String getJsonValue() {
+        return jsonValue;
+    }
+
+    public KeyValueEntity setJsonValue(String jsonValue) {
+        this.jsonValue = jsonValue;
+        return this;
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/RocksDBImp.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/RocksDBImp.java
new file mode 100644
index 0000000..42a247f
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/RocksDBImp.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_ROCKS_DB_PATH;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DB implement based on rocks db.
+ * TODO: this is low priority.
+ */
+public class RocksDBImp implements DB {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBImp.class);
+
+    private final AgentConfiguration conf;
+    private final RocksDB db;
+
+    public RocksDBImp() {
+        // init rocks db
+        this.conf = AgentConfiguration.getAgentConf();
+        this.db = initEnv();
+    }
+
+    private RocksDB initEnv() {
+        String storePath = conf.get(AGENT_ROCKS_DB_PATH, DEFAULT_AGENT_ROCKS_DB_PATH);
+        String parentPath = conf.get(AGENT_HOME, DEFAULT_AGENT_HOME);
+        File finalPath = new File(parentPath, storePath);
+        RocksDB.loadLibrary();
+        final Options options = new Options();
+        options.setCreateIfMissing(true);
+        try {
+            boolean result = finalPath.mkdirs();
+            LOGGER.info("create directory {}, result is {}", finalPath, result);
+            return RocksDB.open(options, finalPath.getAbsolutePath());
+        } catch (Exception ex) {
+            // cannot create local path, stop running.
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public KeyValueEntity get(String key) {
+        return null;
+    }
+
+    @Override
+    public void set(KeyValueEntity entity) {
+
+    }
+
+    @Override
+    public KeyValueEntity put(KeyValueEntity entity) {
+        return null;
+    }
+
+    @Override
+    public KeyValueEntity remove(String key) {
+        return null;
+    }
+
+    @Override
+    public List<KeyValueEntity> search(StateSearchKey searchKey) {
+        return null;
+    }
+
+    @Override
+    public KeyValueEntity searchOne(StateSearchKey searchKey) {
+        return null;
+    }
+
+    @Override
+    public List<KeyValueEntity> findAll(String prefix) {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/StateSearchKey.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/StateSearchKey.java
new file mode 100644
index 0000000..84740bd
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/StateSearchKey.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+/**
+ * search key for state.
+ */
+public enum StateSearchKey {
+    // success state
+    SUCCESS,
+    // fail state
+    FAILED,
+    // accepted state
+    ACCEPTED,
+    // running state
+    RUNNING
+}
diff --git a/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/AgentBaseTestsHelper.java b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/AgentBaseTestsHelper.java
new file mode 100644
index 0000000..3bd3540
--- /dev/null
+++ b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/AgentBaseTestsHelper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent;
+
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_HOME;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * common environment setting up for test cases.
+ */
+public class AgentBaseTestsHelper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AgentBaseTestsHelper.class);
+
+    private static final AtomicLong fileIndex = new AtomicLong();
+    private static final ConcurrentHashMap<String, Path> localPath = new ConcurrentHashMap<>();
+
+    public static void setupAgentHome(String className) {
+        long uniqId = fileIndex.incrementAndGet();
+        Path testRootDir = Paths
+            .get("/tmp", AgentBaseTestsHelper.class.getSimpleName(), String.valueOf(uniqId));
+        localPath.putIfAbsent(className, testRootDir);
+        teardownAgentHome(className);
+        boolean result = testRootDir.toFile().mkdirs();
+        LOGGER.info("try to create {}, result is {}", testRootDir, result);
+
+        AgentConfiguration.getAgentConf().set(AGENT_HOME, testRootDir.toString());
+    }
+
+    public static Path getTestRootDir(String className) {
+        return localPath.get(className);
+    }
+
+    public static void teardownAgentHome(String className) {
+        Path testRootDir = localPath.get(className);
+        if (testRootDir != null) {
+            try {
+                FileUtils.deleteDirectory(testRootDir.toFile());
+            } catch (Exception ignored) {
+
+            }
+        }
+    }
+}
diff --git a/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/db/TestBerkeleyDBImp.java b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/db/TestBerkeleyDBImp.java
new file mode 100644
index 0000000..e7e9102
--- /dev/null
+++ b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/db/TestBerkeleyDBImp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import java.util.List;
+import org.apache.tubemq.agent.AgentBaseTestsHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestBerkeleyDBImp {
+
+    private static BerkeleyDBImp db;
+    private static final String className = TestBerkeleyDBImp.class.getName();
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        AgentBaseTestsHelper.setupAgentHome(className);
+        db = new BerkeleyDBImp();
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        db.close();
+        AgentBaseTestsHelper.teardownAgentHome(className);
+    }
+
+    @Test
+    public void testDB() {
+        KeyValueEntity entity = new KeyValueEntity("test1", "testA");
+        db.put(entity);
+        KeyValueEntity ret = db.get("test1");
+        Assert.assertEquals("test1", ret.getKey());
+        Assert.assertEquals("testA", ret.getJsonValue());
+
+        db.remove("test1");
+        ret = db.get("test1");
+        Assert.assertNull(ret);
+
+        StateSearchKey keys = StateSearchKey.SUCCESS;
+        KeyValueEntity entity1 = new KeyValueEntity("test2", "testA");
+        entity.setStateSearchKey(keys);
+        entity1.setStateSearchKey(keys);
+
+        db.set(entity);
+        db.set(entity1);
+
+        List<KeyValueEntity> entityList = db.search(keys);
+        for (KeyValueEntity keyValueEntity : entityList) {
+            Assert.assertEquals(StateSearchKey.SUCCESS, keyValueEntity.getStateSearchKey());
+        }
+        Assert.assertEquals(2, entityList.size());
+
+        entity.setJsonValue("testC");
+        KeyValueEntity oldEntity = db.put(entity);
+        Assert.assertEquals("testA", oldEntity.getJsonValue());
+
+        KeyValueEntity newEntity = db.get("test1");
+        Assert.assertEquals("testC", newEntity.getJsonValue());
+
+    }
+
+    @Test
+    public void testSecondaryIndex() {
+        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1");
+        db.put(entity);
+        KeyValueEntity entity1 = new KeyValueEntity("searchKey2", "searchResult2");
+        db.put(entity1);
+        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey1", entityResult.getKey());
+
+        entityResult = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey1", entityResult.getKey());
+
+        entityResult.setStateSearchKey(StateSearchKey.RUNNING);
+        db.put(entityResult);
+
+        entityResult = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey2", entityResult.getKey());
+
+        List<KeyValueEntity> entityList = db.search(StateSearchKey.ACCEPTED);
+        Assert.assertEquals(1, entityList.size());
+
+        entityList = db.search(StateSearchKey.FAILED);
+        Assert.assertEquals(0, entityList.size());
+    }
+
+}