You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/22 11:45:48 UTC
[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-466] add
local storage interfaces and implements
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
new fd9ec2f [TUBEMQ-466] add local storage interfaces and implements
fd9ec2f is described below
commit fd9ec2f3e6e54c441d3dffcef904052f5addf1ae
Author: yuanbo <yu...@apache.org>
AuthorDate: Tue Dec 22 11:16:58 2020 +0800
[TUBEMQ-466] add local storage interfaces and implements
---
.../tubemq/agent/constants/AgentConstants.java | 15 +-
.../org/apache/tubemq/agent/db/BerkeleyDBImp.java | 190 +++++++++++++++++++++
.../main/java/org/apache/tubemq/agent/db/DB.java | 83 +++++++++
.../org/apache/tubemq/agent/db/KeyValueEntity.java | 68 ++++++++
.../org/apache/tubemq/agent/db/RocksDBImp.java | 102 +++++++++++
.../org/apache/tubemq/agent/db/StateSearchKey.java | 28 +++
.../apache/tubemq/agent/AgentBaseTestsHelper.java | 63 +++++++
.../apache/tubemq/agent/db/TestBerkeleyDBImp.java | 101 +++++++++++
8 files changed, 647 insertions(+), 3 deletions(-)
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java
index 40c48e3..1b2c00b 100644
--- a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java
@@ -13,14 +13,26 @@
*/
package org.apache.tubemq.agent.constants;
+import org.apache.tubemq.agent.utils.AgentUtils;
+
public class AgentConstants {
+ public static final String AGENT_HOME = "agent.home";
+ public static final String DEFAULT_AGENT_HOME = System.getProperty("agent.home");
+
+ public static final String AGENT_LOCAL_CACHE = "agent.local.cache";
+ public static final String DEFAULT_AGENT_LOCAL_CACHE = ".local";
+
public static final String AGENT_LOCAL_STORE_PATH = "agent.localStore.path";
public static final String DEFAULT_AGENT_LOCAL_STORE_PATH = ".bdb";
public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";
+ public static final String AGENT_UNIQ_ID = "agent.uniq.id";
+ // default use local ip as uniq id for agent.
+ public static final String DEFAULT_AGENT_UNIQ_ID = AgentUtils.getLocalIP();
+
public static final String AGENT_DB_INSTANCE_NAME = "agent.db.instance.name";
public static final String DEFAULT_AGENT_DB_INSTANCE_NAME = "agent";
@@ -30,9 +42,6 @@ public class AgentConstants {
// default is empty.
public static final String AGENT_FETCHER_CLASSNAME = "agent.fetcher.classname";
- public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
- public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;
-
public static final String AGENT_CONF_PARENT = "agent.conf.parent";
public static final String DEFAULT_AGENT_CONF_PARENT = "conf";
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/BerkeleyDBImp.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/BerkeleyDBImp.java
new file mode 100644
index 0000000..9201ae4
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/BerkeleyDBImp.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_DB_INSTANCE_NAME;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_LOCK_TIMEOUT;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_NO_SYNC_VOID;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_PATH;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_READONLY;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_TRANSACTIONAL;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_DB_INSTANCE_NAME;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_PATH;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.SecondaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DB} implementation backed by a Berkeley DB entity store.
+ *
+ * <p>Entities are reachable through a primary index on their string key and a
+ * secondary index on their {@link StateSearchKey}.
+ */
+public class BerkeleyDBImp implements DB {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BerkeleyDBImp.class);
+
+    // kept as a field so close() can release it together with the store
+    private final Environment environment;
+    private final EntityStore store;
+    private final PrimaryIndex<String, KeyValueEntity> primaryIndex;
+    private final SecondaryIndex<StateSearchKey, String, KeyValueEntity> secondaryIndex;
+    private final AgentConfiguration agentConf;
+
+    /**
+     * Opens (creating it when allowed by the configuration) the environment and
+     * entity store described by the agent configuration.
+     */
+    public BerkeleyDBImp() {
+        this.agentConf = AgentConfiguration.getAgentConf();
+        StoreConfig storeConfig = initStoreConfig();
+        this.environment = initEnv();
+        String instanceName = agentConf.get(AGENT_DB_INSTANCE_NAME, DEFAULT_AGENT_DB_INSTANCE_NAME);
+        this.store = new EntityStore(environment, instanceName, storeConfig);
+        primaryIndex = this.store.getPrimaryIndex(String.class, KeyValueEntity.class);
+        secondaryIndex = this.store.getSecondaryIndex(primaryIndex, StateSearchKey.class,
+            "stateSearchKey");
+    }
+
+    /**
+     * Build the store config from the agent configuration.
+     *
+     * @return store config
+     */
+    private StoreConfig initStoreConfig() {
+        return new StoreConfig()
+            .setReadOnly(agentConf.getBoolean(
+                AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+            // creation is only allowed when the store is writable
+            .setAllowCreate(!agentConf.getBoolean(
+                AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+            .setTransactional(agentConf.getBoolean(
+                AGENT_LOCAL_STORE_TRANSACTIONAL, DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL));
+    }
+
+    /**
+     * Resolve the local bdb directory (store path under agent home) and try to
+     * create it.
+     *
+     * @return local path.
+     */
+    private File tryToInitAndGetPath() {
+        String storePath = agentConf.get(AGENT_LOCAL_STORE_PATH, DEFAULT_AGENT_LOCAL_STORE_PATH);
+        String parentPath = agentConf.get(AGENT_HOME, DEFAULT_AGENT_HOME);
+        File finalPath = new File(parentPath, storePath);
+        try {
+            // result is false both when the directory already exists and when creation failed
+            boolean result = finalPath.mkdirs();
+            LOGGER.info("try to create local path {}, result is {}", finalPath, result);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return finalPath;
+    }
+
+    /**
+     * Build the environment from the agent configuration.
+     *
+     * @return opened environment
+     */
+    private Environment initEnv() {
+        EnvironmentConfig envConfig = new EnvironmentConfig()
+            .setReadOnly(agentConf.getBoolean(
+                AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+            .setAllowCreate(!agentConf.getBoolean(
+                AGENT_LOCAL_STORE_READONLY, DEFAULT_AGENT_LOCAL_STORE_READONLY))
+            .setTransactional(agentConf.getBoolean(
+                AGENT_LOCAL_STORE_TRANSACTIONAL, DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL))
+            .setLockTimeout(
+                agentConf.getInt(AGENT_LOCAL_STORE_LOCK_TIMEOUT,
+                    DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT),
+                TimeUnit.MILLISECONDS);
+        envConfig.setTxnNoSyncVoid(agentConf.getBoolean(AGENT_LOCAL_STORE_NO_SYNC_VOID,
+            DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID));
+        envConfig.setTxnWriteNoSyncVoid(agentConf.getBoolean(AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID,
+            DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID));
+        return new Environment(tryToInitAndGetPath(), envConfig);
+    }
+
+    /**
+     * @param key - key
+     * @return the entity stored under the key
+     * @throws NullPointerException if key is null
+     */
+    @Override
+    public KeyValueEntity get(String key) {
+        requireNonNull(key);
+        return primaryIndex.get(key);
+    }
+
+    /**
+     * Store a new entity; an existing key is never overwritten.
+     *
+     * @param entity - key/value
+     * @throws NullPointerException if entity is null
+     * @throws javax.management.openmbean.KeyAlreadyExistsException if the key is already stored
+     */
+    @Override
+    public void set(KeyValueEntity entity) {
+        requireNonNull(entity);
+        // putNoOverwrite reports false when the key is already present
+        if (!primaryIndex.putNoOverwrite(entity)) {
+            throw new javax.management.openmbean.KeyAlreadyExistsException(
+                "key already exists: " + entity.getKey());
+        }
+    }
+
+    /**
+     * Store the entity, overwriting any entity with the same key.
+     *
+     * @param entity - key/value
+     * @return null or the old entity which is overwritten
+     * @throws NullPointerException if entity is null
+     */
+    @Override
+    public KeyValueEntity put(KeyValueEntity entity) {
+        requireNonNull(entity);
+        return primaryIndex.put(entity);
+    }
+
+    /**
+     * Delete the entity stored under the key.
+     *
+     * @param key - key
+     * @return the entity read just before the delete
+     * @throws NullPointerException if key is null
+     */
+    @Override
+    public KeyValueEntity remove(String key) {
+        requireNonNull(key);
+        KeyValueEntity entity = primaryIndex.get(key);
+        primaryIndex.delete(key);
+        return entity;
+    }
+
+    /**
+     * @param searchKey - state to look up in the secondary index
+     * @return all entities carrying the given state
+     * @throws NullPointerException if searchKey is null
+     */
+    @Override
+    public List<KeyValueEntity> search(StateSearchKey searchKey) {
+        requireNonNull(searchKey);
+        List<KeyValueEntity> ret = new ArrayList<>();
+        // the cursor must be closed, hence try-with-resources
+        try (EntityCursor<KeyValueEntity> children = secondaryIndex.subIndex(searchKey)
+            .entities()) {
+            for (KeyValueEntity entity : children) {
+                ret.add(entity);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * @param searchKey - state to look up in the secondary index
+     * @return null or one entity carrying the given state
+     * @throws NullPointerException if searchKey is null
+     */
+    @Override
+    public KeyValueEntity searchOne(StateSearchKey searchKey) {
+        requireNonNull(searchKey);
+        return secondaryIndex.get(searchKey);
+    }
+
+    /**
+     * Scan every stored entity and keep those whose key starts with the prefix.
+     *
+     * @param prefix - prefix string
+     * @return list of matching entities
+     * @throws NullPointerException if prefix is null
+     */
+    @Override
+    public List<KeyValueEntity> findAll(String prefix) {
+        requireNonNull(prefix);
+        List<KeyValueEntity> ret = new ArrayList<>();
+        try (EntityCursor<KeyValueEntity> children = primaryIndex.entities()) {
+            for (KeyValueEntity entity : children) {
+                if (entity.getKey().startsWith(prefix)) {
+                    ret.add(entity);
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Close the entity store and then the environment it was opened in.
+     */
+    @Override
+    public void close() {
+        try {
+            store.close();
+        } finally {
+            // release the environment even when closing the store fails
+            environment.close();
+        }
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/DB.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/DB.java
new file mode 100644
index 0000000..d7fa421
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/DB.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import java.io.Closeable;
+import java.util.List;
+import javax.management.openmbean.KeyAlreadyExistsException;
+
+/**
+ * Local storage for key/value entities.
+ */
+public interface DB extends Closeable {
+
+ /**
+ * Get the key/value entity stored under the given key.
+ *
+ * @param key - key
+ * @return key/value
+ * @throws NullPointerException key should not be null.
+ */
+ KeyValueEntity get(String key);
+
+ /**
+ * Store a key/value entity; if the key already exists, throw an exception.
+ *
+ * @param entity - key/value
+ * @throws NullPointerException key should not be null
+ * @throws KeyAlreadyExistsException key already exists
+ */
+ void set(KeyValueEntity entity);
+
+ /**
+ * Store a key/value entity; if the key already exists, overwrite it.
+ *
+ * @param entity - key/value
+ * @return null or the old value which is overwritten.
+ * @throws NullPointerException key should not be null.
+ */
+ KeyValueEntity put(KeyValueEntity entity);
+
+ /**
+ * Remove the key/value entity stored under the given key.
+ *
+ * @param key - key
+ * @return key/value
+ * @throws NullPointerException key should not be null.
+ */
+ KeyValueEntity remove(String key);
+
+ /**
+ * Search the list of key/value entities matching the search key.
+ *
+ * @param searchKey - search key.
+ * @return key/value list
+ * @throws NullPointerException search key should not be null.
+ */
+ List<KeyValueEntity> search(StateSearchKey searchKey);
+
+ /**
+ * Search one key/value entity matching the search key.
+ * @param searchKey - search key
+ * @return null or keyValue
+ */
+ KeyValueEntity searchOne(StateSearchKey searchKey);
+
+ /**
+ * Find all key/value entities by key prefix.
+ * @param prefix - prefix string
+ * @return list of k/v
+ */
+ List<KeyValueEntity> findAll(String prefix);
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
new file mode 100644
index 0000000..cd87f2f
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import com.sleepycat.persist.model.Entity;
+import com.sleepycat.persist.model.PrimaryKey;
+import com.sleepycat.persist.model.Relationship;
+import com.sleepycat.persist.model.SecondaryKey;
+
+/**
+ * Persistent key/value record: a string primary key, a JSON string payload and
+ * a {@link StateSearchKey} used as secondary key.
+ */
+@Entity(version = 1)
+public class KeyValueEntity {
+
+    @PrimaryKey
+    private String key;
+
+    @SecondaryKey(relate = Relationship.MANY_TO_ONE)
+    private StateSearchKey stateSearchKey;
+
+    // payload, kept as a json string
+    private String jsonValue;
+
+    /**
+     * No-arg constructor; not meant for callers, hence private.
+     */
+    private KeyValueEntity() {
+    }
+
+    /**
+     * Create an entity whose state starts as {@link StateSearchKey#ACCEPTED}.
+     *
+     * @param key - primary key
+     * @param jsonValue - json payload
+     */
+    public KeyValueEntity(String key, String jsonValue) {
+        this.stateSearchKey = StateSearchKey.ACCEPTED;
+        this.jsonValue = jsonValue;
+        this.key = key;
+    }
+
+    /**
+     * @return primary key
+     */
+    public String getKey() {
+        return this.key;
+    }
+
+    /**
+     * @return current state
+     */
+    public StateSearchKey getStateSearchKey() {
+        return this.stateSearchKey;
+    }
+
+    /**
+     * @param stateSearchKey - new state
+     * @return this entity, to allow chaining
+     */
+    public KeyValueEntity setStateSearchKey(StateSearchKey stateSearchKey) {
+        this.stateSearchKey = stateSearchKey;
+        return this;
+    }
+
+    /**
+     * @return json payload
+     */
+    public String getJsonValue() {
+        return this.jsonValue;
+    }
+
+    /**
+     * @param jsonValue - new json payload
+     * @return this entity, to allow chaining
+     */
+    public KeyValueEntity setJsonValue(String jsonValue) {
+        this.jsonValue = jsonValue;
+        return this;
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/RocksDBImp.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/RocksDBImp.java
new file mode 100644
index 0000000..42a247f
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/RocksDBImp.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_ROCKS_DB_PATH;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_HOME;
+import static org.apache.tubemq.agent.constants.AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DB} implementation based on rocks db.
+ *
+ * <p>TODO: this is low priority. Only opening and closing the database is
+ * implemented; every data operation is still a stub that stores nothing and
+ * returns null.
+ */
+public class RocksDBImp implements DB {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBImp.class);
+
+    private final AgentConfiguration conf;
+    // native handle, kept so it can be released in close()
+    private final Options options;
+    private final RocksDB db;
+
+    /**
+     * Loads the native library and opens the database under agent home.
+     */
+    public RocksDBImp() {
+        // init rocks db
+        this.conf = AgentConfiguration.getAgentConf();
+        // the native library has to be loaded before any native object is created
+        RocksDB.loadLibrary();
+        this.options = new Options();
+        this.db = initEnv();
+    }
+
+    /**
+     * Create the storage directory if needed and open the database in it.
+     *
+     * @return opened database
+     * @throws RuntimeException if the directory or database cannot be opened
+     */
+    private RocksDB initEnv() {
+        String storePath = conf.get(AGENT_ROCKS_DB_PATH, DEFAULT_AGENT_ROCKS_DB_PATH);
+        String parentPath = conf.get(AGENT_HOME, DEFAULT_AGENT_HOME);
+        File finalPath = new File(parentPath, storePath);
+        options.setCreateIfMissing(true);
+        try {
+            boolean result = finalPath.mkdirs();
+            LOGGER.info("create directory {}, result is {}", finalPath, result);
+            return RocksDB.open(options, finalPath.getAbsolutePath());
+        } catch (Exception ex) {
+            // cannot create local path, stop running; release the native options first.
+            options.close();
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Not implemented yet; always returns null. */
+    @Override
+    public KeyValueEntity get(String key) {
+        return null;
+    }
+
+    /** Not implemented yet; does nothing. */
+    @Override
+    public void set(KeyValueEntity entity) {
+
+    }
+
+    /** Not implemented yet; always returns null. */
+    @Override
+    public KeyValueEntity put(KeyValueEntity entity) {
+        return null;
+    }
+
+    /** Not implemented yet; always returns null. */
+    @Override
+    public KeyValueEntity remove(String key) {
+        return null;
+    }
+
+    /** Not implemented yet; always returns null. */
+    @Override
+    public List<KeyValueEntity> search(StateSearchKey searchKey) {
+        return null;
+    }
+
+    /** Not implemented yet; always returns null. */
+    @Override
+    public KeyValueEntity searchOne(StateSearchKey searchKey) {
+        return null;
+    }
+
+    /** Not implemented yet; always returns null. */
+    @Override
+    public List<KeyValueEntity> findAll(String prefix) {
+        return null;
+    }
+
+    /**
+     * Close the database and then the options it was opened with.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            db.close();
+        } finally {
+            // release the options even when closing the database fails
+            options.close();
+        }
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/StateSearchKey.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/StateSearchKey.java
new file mode 100644
index 0000000..84740bd
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/StateSearchKey.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+/**
+ * State values used as the secondary search key of a KeyValueEntity.
+ */
+public enum StateSearchKey {
+ // success state
+ SUCCESS,
+ // failed state
+ FAILED,
+ // accepted state; the state a newly constructed KeyValueEntity starts in
+ ACCEPTED,
+ // running state
+ RUNNING
+}
diff --git a/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/AgentBaseTestsHelper.java b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/AgentBaseTestsHelper.java
new file mode 100644
index 0000000..3bd3540
--- /dev/null
+++ b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/AgentBaseTestsHelper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent;
+
+import static org.apache.tubemq.agent.constants.AgentConstants.AGENT_HOME;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common environment set-up for test cases: gives each test class its own
+ * agent home directory and removes it afterwards.
+ */
+public class AgentBaseTestsHelper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AgentBaseTestsHelper.class);
+
+    // source of unique directory names
+    private static final AtomicLong fileIndex = new AtomicLong();
+    // test class name -> directory currently registered for it
+    private static final ConcurrentHashMap<String, Path> localPath = new ConcurrentHashMap<>();
+
+    /**
+     * Create a fresh, empty agent home directory for the given test class and
+     * point the agent configuration at it.
+     *
+     * @param className - name of the test class owning the directory
+     */
+    public static void setupAgentHome(String className) {
+        long uniqId = fileIndex.incrementAndGet();
+        Path testRootDir = Paths
+            .get("/tmp", AgentBaseTestsHelper.class.getSimpleName(), String.valueOf(uniqId));
+        // remove the directory a previous setup registered for this class, if any
+        teardownAgentHome(className);
+        // always register the new directory; putIfAbsent would keep a stale mapping
+        // and make getTestRootDir/teardownAgentHome point at the wrong directory
+        localPath.put(className, testRootDir);
+        // clear leftovers of an earlier run at the new location
+        teardownAgentHome(className);
+        boolean result = testRootDir.toFile().mkdirs();
+        LOGGER.info("try to create {}, result is {}", testRootDir, result);
+
+        AgentConfiguration.getAgentConf().set(AGENT_HOME, testRootDir.toString());
+    }
+
+    /**
+     * @param className - name of the test class
+     * @return directory registered for the class, or null when none was set up
+     */
+    public static Path getTestRootDir(String className) {
+        return localPath.get(className);
+    }
+
+    /**
+     * Delete the directory registered for the given test class; best effort,
+     * a failure is logged and never fails the test.
+     *
+     * @param className - name of the test class
+     */
+    public static void teardownAgentHome(String className) {
+        Path testRootDir = localPath.get(className);
+        if (testRootDir != null) {
+            try {
+                FileUtils.deleteDirectory(testRootDir.toFile());
+            } catch (Exception ex) {
+                LOGGER.warn("failed to delete test directory {}", testRootDir, ex);
+            }
+        }
+    }
+}
diff --git a/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/db/TestBerkeleyDBImp.java b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/db/TestBerkeleyDBImp.java
new file mode 100644
index 0000000..e7e9102
--- /dev/null
+++ b/tubemq-agent/agent-common/src/test/java/org/apache/tubemq/agent/db/TestBerkeleyDBImp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import java.util.List;
+import org.apache.tubemq.agent.AgentBaseTestsHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestBerkeleyDBImp {
+
+    private static final String className = TestBerkeleyDBImp.class.getName();
+    private static BerkeleyDBImp db;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        AgentBaseTestsHelper.setupAgentHome(className);
+        db = new BerkeleyDBImp();
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        db.close();
+        AgentBaseTestsHelper.teardownAgentHome(className);
+    }
+
+    /**
+     * put/get/remove round trip, state search and overwrite through put.
+     */
+    @Test
+    public void testDB() {
+        KeyValueEntity first = new KeyValueEntity("test1", "testA");
+        db.put(first);
+        KeyValueEntity fetched = db.get("test1");
+        Assert.assertEquals("test1", fetched.getKey());
+        Assert.assertEquals("testA", fetched.getJsonValue());
+
+        // a removed key is no longer readable
+        db.remove("test1");
+        fetched = db.get("test1");
+        Assert.assertNull(fetched);
+
+        StateSearchKey state = StateSearchKey.SUCCESS;
+        KeyValueEntity second = new KeyValueEntity("test2", "testA");
+        first.setStateSearchKey(state);
+        second.setStateSearchKey(state);
+
+        db.set(first);
+        db.set(second);
+
+        // both entities are found through their state
+        List<KeyValueEntity> matches = db.search(state);
+        for (KeyValueEntity match : matches) {
+            Assert.assertEquals(StateSearchKey.SUCCESS, match.getStateSearchKey());
+        }
+        Assert.assertEquals(2, matches.size());
+
+        // put hands back the value it replaces
+        first.setJsonValue("testC");
+        KeyValueEntity replaced = db.put(first);
+        Assert.assertEquals("testA", replaced.getJsonValue());
+
+        KeyValueEntity current = db.get("test1");
+        Assert.assertEquals("testC", current.getJsonValue());
+    }
+
+    /**
+     * lookups through the state index follow state changes.
+     */
+    @Test
+    public void testSecondaryIndex() {
+        KeyValueEntity first = new KeyValueEntity("searchKey1", "searchResult1");
+        db.put(first);
+        KeyValueEntity second = new KeyValueEntity("searchKey2", "searchResult2");
+        db.put(second);
+
+        KeyValueEntity found = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey1", found.getKey());
+
+        // repeating the lookup gives the same entity
+        found = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey1", found.getKey());
+
+        // once the first entity leaves ACCEPTED, the second one is returned
+        found.setStateSearchKey(StateSearchKey.RUNNING);
+        db.put(found);
+
+        found = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey2", found.getKey());
+
+        List<KeyValueEntity> matches = db.search(StateSearchKey.ACCEPTED);
+        Assert.assertEquals(1, matches.size());
+
+        matches = db.search(StateSearchKey.FAILED);
+        Assert.assertEquals(0, matches.size());
+    }
+
+}