You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/25 10:07:40 UTC
[incubator-inlong] branch master updated: [INLONG-3298][Agent] Remove dbd implementation (#3358)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d52ea13 [INLONG-3298][Agent] Remove dbd implementation (#3358)
d52ea13 is described below
commit d52ea13cda5b86215b374fd743e64a2e63047e3d
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Mar 25 18:07:33 2022 +0800
[INLONG-3298][Agent] Remove dbd implementation (#3358)
---
inlong-agent/agent-common/pom.xml | 4 -
.../org/apache/inlong/agent/db/BerkeleyDbImp.java | 227 ---------------------
.../org/apache/inlong/agent/db/KeyValueEntity.java | 8 -
.../apache/inlong/agent/db/TestBerkeleyDBImp.java | 139 -------------
inlong-agent/pom.xml | 20 --
inlong-common/pom.xml | 5 -
.../org/apache/inlong/common/db/CommandEntity.java | 7 -
pom.xml | 20 --
8 files changed, 430 deletions(-)
diff --git a/inlong-agent/agent-common/pom.xml b/inlong-agent/agent-common/pom.xml
index 2001f08..6516cfe 100755
--- a/inlong-agent/agent-common/pom.xml
+++ b/inlong-agent/agent-common/pom.xml
@@ -40,10 +40,6 @@
<groupId>commons-dbutils</groupId>
</dependency>
<dependency>
- <artifactId>je</artifactId>
- <groupId>com.sleepycat</groupId>
- </dependency>
- <dependency>
<artifactId>commons-lang3</artifactId>
<groupId>org.apache.commons</groupId>
</dependency>
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
deleted file mode 100644
index e4f7bec..0000000
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.db;
-
-import static java.util.Objects.requireNonNull;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.persist.EntityCursor;
-import com.sleepycat.persist.EntityStore;
-import com.sleepycat.persist.PrimaryIndex;
-import com.sleepycat.persist.SecondaryIndex;
-import com.sleepycat.persist.StoreConfig;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.common.db.CommandEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DB implement based on berkeley db.
- */
-public class BerkeleyDbImp implements Db {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BerkeleyDbImp.class);
-
- private final EntityStore jobStore;
- private final EntityStore commandStore;
- private final PrimaryIndex<String, KeyValueEntity> primaryIndex;
- private final SecondaryIndex<StateSearchKey, String, KeyValueEntity> secondaryIndex;
- private final PrimaryIndex<String, CommandEntity> commandPrimaryIndex;
- private final SecondaryIndex<String, String, KeyValueEntity> fileNameSecondaryIndex;
- private final SecondaryIndex<Boolean, String, CommandEntity> commandSecondaryIndex;
-
- private final AgentConfiguration agentConf;
-
- public BerkeleyDbImp() {
- this.agentConf = AgentConfiguration.getAgentConf();
- StoreConfig storeConfig = initStoreConfig();
- Environment environment = initEnv();
- String instanceName = agentConf.get(
- AgentConstants.AGENT_DB_INSTANCE_NAME, AgentConstants.DEFAULT_AGENT_DB_INSTANCE_NAME);
- this.jobStore = new EntityStore(environment, instanceName, storeConfig);
- this.commandStore = new EntityStore(environment, CommonConstants.COMMAND_STORE_INSTANCE_NAME, storeConfig);
- commandPrimaryIndex = this.commandStore.getPrimaryIndex(String.class, CommandEntity.class);
- commandSecondaryIndex = commandStore.getSecondaryIndex(
- commandPrimaryIndex, Boolean.class, "isAcked");
- primaryIndex = this.jobStore.getPrimaryIndex(String.class, KeyValueEntity.class);
- secondaryIndex = this.jobStore.getSecondaryIndex(primaryIndex, StateSearchKey.class,
- "stateSearchKey");
- fileNameSecondaryIndex = this.jobStore.getSecondaryIndex(primaryIndex,
- String.class, "fileName");
- }
-
- /**
- * init store by config
- *
- * @return store config
- */
- private StoreConfig initStoreConfig() {
- return new StoreConfig()
- .setReadOnly(agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_READONLY,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
- .setAllowCreate(!agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_READONLY,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
- .setTransactional(agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_TRANSACTIONAL,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL));
- }
-
- /**
- * init local bdb path and get it.
- * @return local path.
- */
- private File tryToInitAndGetPath() {
- String storePath = agentConf.get(
- AgentConstants.AGENT_LOCAL_STORE_PATH, AgentConstants.DEFAULT_AGENT_LOCAL_STORE_PATH);
- String parentPath = agentConf.get(
- AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
- return AgentUtils.makeDirsIfNotExist(storePath, parentPath);
- }
-
- /**
- * init env by config
- *
- * @return env config
- */
- private Environment initEnv() {
- EnvironmentConfig envConfig = new EnvironmentConfig()
- .setReadOnly(agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_READONLY, AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
- .setAllowCreate(!agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_READONLY, AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
- .setTransactional(agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_TRANSACTIONAL,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL))
- .setLockTimeout(
- agentConf.getInt(AgentConstants.AGENT_LOCAL_STORE_LOCK_TIMEOUT,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT),
- TimeUnit.MILLISECONDS);
- envConfig.setTxnNoSyncVoid(agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_NO_SYNC_VOID,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID));
- envConfig.setTxnWriteNoSyncVoid(agentConf.getBoolean(
- AgentConstants.AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID,
- AgentConstants.DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID));
- return new Environment(tryToInitAndGetPath(), envConfig);
- }
-
- @Override
- public KeyValueEntity get(String key) {
- requireNonNull(key);
- return primaryIndex.get(key);
- }
-
- @Override
- public CommandEntity getCommand(String commandId) {
- requireNonNull(commandId);
- return commandPrimaryIndex.get(commandId);
- }
-
- @Override
- public CommandEntity putCommand(CommandEntity entity) {
- requireNonNull(entity);
- return commandPrimaryIndex.put(entity);
- }
-
- @Override
- public void set(KeyValueEntity entity) {
- requireNonNull(entity);
- primaryIndex.put(entity);
- }
-
- @Override
- public KeyValueEntity put(KeyValueEntity entity) {
- requireNonNull(entity);
- return primaryIndex.put(entity);
- }
-
- @Override
- public KeyValueEntity remove(String key) {
- requireNonNull(key);
- KeyValueEntity entity = primaryIndex.get(key);
- primaryIndex.delete(key);
- return entity;
- }
-
- @Override
- public List<KeyValueEntity> search(StateSearchKey searchKey) {
- requireNonNull(searchKey);
- List<KeyValueEntity> ret = new ArrayList<>();
- try (EntityCursor<KeyValueEntity> children = secondaryIndex.subIndex(searchKey)
- .entities()) {
- for (KeyValueEntity entity : children) {
- ret.add(entity);
- }
- }
- return ret;
- }
-
- @Override
- public List<CommandEntity> searchCommands(boolean isAcked) {
- requireNonNull(isAcked);
- List<CommandEntity> ret = new ArrayList<>();
- try (EntityCursor<CommandEntity> children = commandSecondaryIndex.subIndex(isAcked)
- .entities()) {
- for (CommandEntity entity : children) {
- ret.add(entity);
- }
- }
- return ret;
- }
-
- @Override
- public KeyValueEntity searchOne(StateSearchKey searchKey) {
- requireNonNull(searchKey);
- return secondaryIndex.get(searchKey);
- }
-
- @Override
- public KeyValueEntity searchOne(String fileName) {
- requireNonNull(fileName);
- return fileNameSecondaryIndex.get(fileName);
- }
-
- @Override
- public List<KeyValueEntity> findAll(String prefix) {
- requireNonNull(prefix);
- List<KeyValueEntity> ret = new ArrayList<>();
- try (EntityCursor<KeyValueEntity> children = primaryIndex.entities()) {
- for (KeyValueEntity entity : children) {
- if (entity.getKey().startsWith(prefix)) {
- ret.add(entity);
- }
- }
- }
- return ret;
- }
-
- @Override
- public void close() {
- jobStore.close();
- }
-}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index 930f8de..8c65599 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -17,29 +17,21 @@
package org.apache.inlong.agent.db;
-import com.sleepycat.persist.model.Entity;
-import com.sleepycat.persist.model.PrimaryKey;
-import com.sleepycat.persist.model.Relationship;
-import com.sleepycat.persist.model.SecondaryKey;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
/**
* key value entity. key is string and value is a json
*/
-@Entity(version = 1)
public class KeyValueEntity {
- @PrimaryKey
private String key;
- @SecondaryKey(relate = Relationship.MANY_TO_ONE)
private StateSearchKey stateSearchKey;
/**
* stores the file name that the jsonValue refers
*/
- @SecondaryKey(relate = Relationship.MANY_TO_ONE)
private String fileName;
private String jsonValue;
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
deleted file mode 100755
index fa282f3..0000000
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.db;
-
-import org.apache.inlong.agent.AgentBaseTestsHelper;
-import org.apache.inlong.common.db.CommandEntity;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-
-public class TestBerkeleyDBImp {
-
- private static BerkeleyDbImp db;
- private static AgentBaseTestsHelper helper;
-
- @BeforeClass
- public static void setup() throws Exception {
- helper = new AgentBaseTestsHelper(TestBerkeleyDBImp.class.getName()).setupAgentHome();
- db = new BerkeleyDbImp();
- }
-
- @AfterClass
- public static void teardown() {
- db.close();
- helper.teardownAgentHome();
- }
-
- @Test
- public void testKeyValueDB() {
- KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
- db.put(entity);
- KeyValueEntity ret = db.get("test1");
- Assert.assertEquals("test1", ret.getKey());
- Assert.assertEquals("testA", ret.getJsonValue());
-
- db.remove("test1");
- ret = db.get("test1");
- Assert.assertNull(ret);
-
- StateSearchKey keys = StateSearchKey.SUCCESS;
- KeyValueEntity entity1 = new KeyValueEntity("test2", "testA", "test");
- entity.setStateSearchKey(keys);
- entity1.setStateSearchKey(keys);
-
- db.set(entity);
- db.set(entity1);
-
- List<KeyValueEntity> entityList = db.search(keys);
- for (KeyValueEntity keyValueEntity : entityList) {
- Assert.assertEquals(StateSearchKey.SUCCESS, keyValueEntity.getStateSearchKey());
- }
- Assert.assertEquals(2, entityList.size());
-
- entity.setJsonValue("testC");
- KeyValueEntity oldEntity = db.put(entity);
- Assert.assertEquals("testA", oldEntity.getJsonValue());
-
- KeyValueEntity newEntity = db.get("test1");
- Assert.assertEquals("testC", newEntity.getJsonValue());
-
- }
-
- @Test
- public void testCommandDb() {
- CommandEntity commandEntity = new CommandEntity();
- commandEntity.setId("1");
- commandEntity.setCommandResult(0);
- commandEntity.setAcked(false);
- commandEntity.setTaskId(1);
- commandEntity.setVersion(1);
- db.putCommand(commandEntity);
- CommandEntity command = db.getCommand("1");
- Assert.assertEquals("1", command.getId());
- List<CommandEntity> commandEntities = db.searchCommands(false);
- Assert.assertEquals("1", commandEntities.get(0).getId());
- }
-
- @Test
- public void testSecondaryIndex() {
- KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
- db.put(entity);
- KeyValueEntity entity1 = new KeyValueEntity("searchKey2", "searchResult2", "test");
- db.put(entity1);
- KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
- Assert.assertEquals("searchKey1", entityResult.getKey());
-
- entityResult = db.searchOne(StateSearchKey.ACCEPTED);
- Assert.assertEquals("searchKey1", entityResult.getKey());
-
- entityResult.setStateSearchKey(StateSearchKey.RUNNING);
- db.put(entityResult);
-
- entityResult = db.searchOne(StateSearchKey.ACCEPTED);
- Assert.assertEquals("searchKey2", entityResult.getKey());
-
- List<KeyValueEntity> entityList = db.search(StateSearchKey.ACCEPTED);
- Assert.assertEquals(1, entityList.size());
-
- entityList = db.search(StateSearchKey.FAILED);
- Assert.assertEquals(0, entityList.size());
- }
-
- @Test
- public void testDeleteItem() {
- KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
- db.put(entity);
- KeyValueEntity entityResult1 = db.remove("searchKey1");
- KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
- Assert.assertEquals("searchKey1", entityResult1.getKey());
- Assert.assertNull(entityResult);
- }
-
- @Test
- public void testFileNameSearch() {
- KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
- db.put(entity);
- KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
- Assert.assertEquals("searchKey1", entityResult.getKey());
- }
-
-}
diff --git a/inlong-agent/pom.xml b/inlong-agent/pom.xml
index d073823..235c175 100644
--- a/inlong-agent/pom.xml
+++ b/inlong-agent/pom.xml
@@ -63,7 +63,6 @@
<protobuf.version>2.5.0</protobuf.version>
<httpclient.version>4.5.13</httpclient.version>
<fastjson.version>1.2.68</fastjson.version>
- <sleepycat.version>7.3.7</sleepycat.version>
<hippoclient.version>2.0.5</hippoclient.version>
<jetty.version>9.4.44.v20210927</jetty.version>
<rocksdb.version>6.14.6</rocksdb.version>
@@ -76,20 +75,6 @@
<springcontext.version>5.3.13</springcontext.version>
</properties>
- <repositories>
- <repository>
- <id>berkeleydb-je</id>
- <name>berkeleydb-je</name>
- <url>https://download.dcache.org/nexus/repository/berkeleydb-je/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
<dependencyManagement>
<dependencies>
<dependency>
@@ -197,11 +182,6 @@
<version>${oro.version}</version>
</dependency>
<dependency>
- <groupId>com.sleepycat</groupId>
- <artifactId>je</artifactId>
- <version>${sleepycat.version}</version>
- </dependency>
- <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.8.2</version>
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index 52a9534..06ddfa6 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -34,7 +34,6 @@
<snappy-java.version>1.1.8.4</snappy-java.version>
<jackson.version>2.13.1</jackson.version>
<slf4j-api.version>1.7.36</slf4j-api.version>
- <sleepycat.version>7.3.7</sleepycat.version>
<simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
<common.lang.version>2.4</common.lang.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
@@ -81,10 +80,6 @@
<artifactId>lombok</artifactId>
</dependency>
<dependency>
- <groupId>com.sleepycat</groupId>
- <artifactId>je</artifactId>
- </dependency>
- <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${common.lang.version}</version>
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
index 8ad1edc..c85c3af 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
@@ -17,10 +17,6 @@
package org.apache.inlong.common.db;
-import com.sleepycat.persist.model.Entity;
-import com.sleepycat.persist.model.PrimaryKey;
-import com.sleepycat.persist.model.Relationship;
-import com.sleepycat.persist.model.SecondaryKey;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -28,16 +24,13 @@ import lombok.NoArgsConstructor;
/**
* The entity of task command, used for Agent to interact with Manager and BDB.
*/
-@Entity(version = 1)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommandEntity {
- @PrimaryKey
private String id;
private int commandResult;
- @SecondaryKey(relate = Relationship.MANY_TO_ONE)
private boolean isAcked;
private Integer taskId;
/**
diff --git a/pom.xml b/pom.xml
index 08ba32b..17613e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,6 @@
<lombok.version>1.18.22</lombok.version>
<logback.version>1.2.10</logback.version>
<junit.version>4.13.2</junit.version>
- <je.version>7.3.7</je.version>
<hadoop.version>2.10.1</hadoop.version>
<postgres.version>42.2.25</postgres.version>
<netty.version>3.10.6.Final</netty.version>
@@ -150,11 +149,6 @@
<version>${logback.version}</version>
</dependency>
<dependency>
- <groupId>com.sleepycat</groupId>
- <artifactId>je</artifactId>
- <version>${je.version}</version>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
@@ -320,18 +314,4 @@
</plugins>
</build>
- <repositories>
- <repository>
- <id>berkeleydb-je</id>
- <name>berkeleydb-je</name>
- <url>https://download.dcache.org/nexus/repository/berkeleydb-je/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
-
</project>