You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/25 10:07:40 UTC

[incubator-inlong] branch master updated: [INLONG-3298][Agent] Remove dbd implementation (#3358)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d52ea13  [INLONG-3298][Agent] Remove dbd implementation (#3358)
d52ea13 is described below

commit d52ea13cda5b86215b374fd743e64a2e63047e3d
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Mar 25 18:07:33 2022 +0800

    [INLONG-3298][Agent] Remove dbd implementation (#3358)
---
 inlong-agent/agent-common/pom.xml                  |   4 -
 .../org/apache/inlong/agent/db/BerkeleyDbImp.java  | 227 ---------------------
 .../org/apache/inlong/agent/db/KeyValueEntity.java |   8 -
 .../apache/inlong/agent/db/TestBerkeleyDBImp.java  | 139 -------------
 inlong-agent/pom.xml                               |  20 --
 inlong-common/pom.xml                              |   5 -
 .../org/apache/inlong/common/db/CommandEntity.java |   7 -
 pom.xml                                            |  20 --
 8 files changed, 430 deletions(-)

diff --git a/inlong-agent/agent-common/pom.xml b/inlong-agent/agent-common/pom.xml
index 2001f08..6516cfe 100755
--- a/inlong-agent/agent-common/pom.xml
+++ b/inlong-agent/agent-common/pom.xml
@@ -40,10 +40,6 @@
             <groupId>commons-dbutils</groupId>
         </dependency>
         <dependency>
-            <artifactId>je</artifactId>
-            <groupId>com.sleepycat</groupId>
-        </dependency>
-        <dependency>
             <artifactId>commons-lang3</artifactId>
             <groupId>org.apache.commons</groupId>
         </dependency>
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
deleted file mode 100644
index e4f7bec..0000000
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.db;
-
-import static java.util.Objects.requireNonNull;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.persist.EntityCursor;
-import com.sleepycat.persist.EntityStore;
-import com.sleepycat.persist.PrimaryIndex;
-import com.sleepycat.persist.SecondaryIndex;
-import com.sleepycat.persist.StoreConfig;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.common.db.CommandEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DB implement based on berkeley db.
- */
-public class BerkeleyDbImp implements Db {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(BerkeleyDbImp.class);
-
-    private final EntityStore jobStore;
-    private final EntityStore commandStore;
-    private final PrimaryIndex<String, KeyValueEntity> primaryIndex;
-    private final SecondaryIndex<StateSearchKey, String, KeyValueEntity> secondaryIndex;
-    private final PrimaryIndex<String, CommandEntity> commandPrimaryIndex;
-    private final SecondaryIndex<String, String, KeyValueEntity> fileNameSecondaryIndex;
-    private final SecondaryIndex<Boolean, String, CommandEntity> commandSecondaryIndex;
-
-    private final AgentConfiguration agentConf;
-
-    public BerkeleyDbImp() {
-        this.agentConf = AgentConfiguration.getAgentConf();
-        StoreConfig storeConfig = initStoreConfig();
-        Environment environment = initEnv();
-        String instanceName = agentConf.get(
-            AgentConstants.AGENT_DB_INSTANCE_NAME, AgentConstants.DEFAULT_AGENT_DB_INSTANCE_NAME);
-        this.jobStore = new EntityStore(environment, instanceName, storeConfig);
-        this.commandStore = new EntityStore(environment, CommonConstants.COMMAND_STORE_INSTANCE_NAME, storeConfig);
-        commandPrimaryIndex = this.commandStore.getPrimaryIndex(String.class, CommandEntity.class);
-        commandSecondaryIndex = commandStore.getSecondaryIndex(
-            commandPrimaryIndex, Boolean.class, "isAcked");
-        primaryIndex = this.jobStore.getPrimaryIndex(String.class, KeyValueEntity.class);
-        secondaryIndex = this.jobStore.getSecondaryIndex(primaryIndex, StateSearchKey.class,
-                "stateSearchKey");
-        fileNameSecondaryIndex = this.jobStore.getSecondaryIndex(primaryIndex,
-            String.class, "fileName");
-    }
-
-    /**
-     * init store by config
-     *
-     * @return store config
-     */
-    private StoreConfig initStoreConfig() {
-        return new StoreConfig()
-                .setReadOnly(agentConf.getBoolean(
-                        AgentConstants.AGENT_LOCAL_STORE_READONLY,
-                    AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
-                .setAllowCreate(!agentConf.getBoolean(
-                        AgentConstants.AGENT_LOCAL_STORE_READONLY,
-                    AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
-                .setTransactional(agentConf.getBoolean(
-                        AgentConstants.AGENT_LOCAL_STORE_TRANSACTIONAL,
-                    AgentConstants.DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL));
-    }
-
-    /**
-     * init local bdb path and get it.
-     * @return local path.
-     */
-    private File tryToInitAndGetPath() {
-        String storePath = agentConf.get(
-            AgentConstants.AGENT_LOCAL_STORE_PATH, AgentConstants.DEFAULT_AGENT_LOCAL_STORE_PATH);
-        String parentPath = agentConf.get(
-            AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
-        return AgentUtils.makeDirsIfNotExist(storePath, parentPath);
-    }
-
-    /**
-     * init env by config
-     *
-     * @return env config
-     */
-    private Environment initEnv() {
-        EnvironmentConfig envConfig = new EnvironmentConfig()
-                .setReadOnly(agentConf.getBoolean(
-                        AgentConstants.AGENT_LOCAL_STORE_READONLY, AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
-                .setAllowCreate(!agentConf.getBoolean(
-                        AgentConstants.AGENT_LOCAL_STORE_READONLY, AgentConstants.DEFAULT_AGENT_LOCAL_STORE_READONLY))
-                .setTransactional(agentConf.getBoolean(
-                        AgentConstants.AGENT_LOCAL_STORE_TRANSACTIONAL,
-                    AgentConstants.DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL))
-                .setLockTimeout(
-                        agentConf.getInt(AgentConstants.AGENT_LOCAL_STORE_LOCK_TIMEOUT,
-                                AgentConstants.DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT),
-                        TimeUnit.MILLISECONDS);
-        envConfig.setTxnNoSyncVoid(agentConf.getBoolean(
-            AgentConstants.AGENT_LOCAL_STORE_NO_SYNC_VOID,
-                AgentConstants.DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID));
-        envConfig.setTxnWriteNoSyncVoid(agentConf.getBoolean(
-            AgentConstants.AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID,
-                AgentConstants.DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID));
-        return new Environment(tryToInitAndGetPath(), envConfig);
-    }
-
-    @Override
-    public KeyValueEntity get(String key) {
-        requireNonNull(key);
-        return primaryIndex.get(key);
-    }
-
-    @Override
-    public CommandEntity getCommand(String commandId) {
-        requireNonNull(commandId);
-        return commandPrimaryIndex.get(commandId);
-    }
-
-    @Override
-    public CommandEntity putCommand(CommandEntity entity) {
-        requireNonNull(entity);
-        return commandPrimaryIndex.put(entity);
-    }
-
-    @Override
-    public void set(KeyValueEntity entity) {
-        requireNonNull(entity);
-        primaryIndex.put(entity);
-    }
-
-    @Override
-    public KeyValueEntity put(KeyValueEntity entity) {
-        requireNonNull(entity);
-        return primaryIndex.put(entity);
-    }
-
-    @Override
-    public KeyValueEntity remove(String key) {
-        requireNonNull(key);
-        KeyValueEntity entity = primaryIndex.get(key);
-        primaryIndex.delete(key);
-        return entity;
-    }
-
-    @Override
-    public List<KeyValueEntity> search(StateSearchKey searchKey) {
-        requireNonNull(searchKey);
-        List<KeyValueEntity> ret = new ArrayList<>();
-        try (EntityCursor<KeyValueEntity> children = secondaryIndex.subIndex(searchKey)
-                .entities()) {
-            for (KeyValueEntity entity : children) {
-                ret.add(entity);
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public List<CommandEntity> searchCommands(boolean isAcked) {
-        requireNonNull(isAcked);
-        List<CommandEntity> ret = new ArrayList<>();
-        try (EntityCursor<CommandEntity> children = commandSecondaryIndex.subIndex(isAcked)
-            .entities()) {
-            for (CommandEntity entity : children) {
-                ret.add(entity);
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public KeyValueEntity searchOne(StateSearchKey searchKey) {
-        requireNonNull(searchKey);
-        return secondaryIndex.get(searchKey);
-    }
-
-    @Override
-    public KeyValueEntity searchOne(String fileName) {
-        requireNonNull(fileName);
-        return fileNameSecondaryIndex.get(fileName);
-    }
-
-    @Override
-    public List<KeyValueEntity> findAll(String prefix) {
-        requireNonNull(prefix);
-        List<KeyValueEntity> ret = new ArrayList<>();
-        try (EntityCursor<KeyValueEntity> children = primaryIndex.entities()) {
-            for (KeyValueEntity entity : children) {
-                if (entity.getKey().startsWith(prefix)) {
-                    ret.add(entity);
-                }
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public void close() {
-        jobStore.close();
-    }
-}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index 930f8de..8c65599 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -17,29 +17,21 @@
 
 package org.apache.inlong.agent.db;
 
-import com.sleepycat.persist.model.Entity;
-import com.sleepycat.persist.model.PrimaryKey;
-import com.sleepycat.persist.model.Relationship;
-import com.sleepycat.persist.model.SecondaryKey;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
 
 /**
  * key value entity. key is string and value is a json
  */
-@Entity(version = 1)
 public class KeyValueEntity {
 
-    @PrimaryKey
     private String key;
 
-    @SecondaryKey(relate = Relationship.MANY_TO_ONE)
     private StateSearchKey stateSearchKey;
 
     /**
      * stores the file name that the jsonValue refers
      */
-    @SecondaryKey(relate = Relationship.MANY_TO_ONE)
     private String fileName;
 
     private String jsonValue;
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
deleted file mode 100755
index fa282f3..0000000
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.db;
-
-import org.apache.inlong.agent.AgentBaseTestsHelper;
-import org.apache.inlong.common.db.CommandEntity;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-
-public class TestBerkeleyDBImp {
-
-    private static BerkeleyDbImp db;
-    private static AgentBaseTestsHelper helper;
-
-    @BeforeClass
-    public static void setup() throws Exception {
-        helper = new AgentBaseTestsHelper(TestBerkeleyDBImp.class.getName()).setupAgentHome();
-        db = new BerkeleyDbImp();
-    }
-
-    @AfterClass
-    public static void teardown() {
-        db.close();
-        helper.teardownAgentHome();
-    }
-
-    @Test
-    public void testKeyValueDB() {
-        KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
-        db.put(entity);
-        KeyValueEntity ret = db.get("test1");
-        Assert.assertEquals("test1", ret.getKey());
-        Assert.assertEquals("testA", ret.getJsonValue());
-
-        db.remove("test1");
-        ret = db.get("test1");
-        Assert.assertNull(ret);
-
-        StateSearchKey keys = StateSearchKey.SUCCESS;
-        KeyValueEntity entity1 = new KeyValueEntity("test2", "testA", "test");
-        entity.setStateSearchKey(keys);
-        entity1.setStateSearchKey(keys);
-
-        db.set(entity);
-        db.set(entity1);
-
-        List<KeyValueEntity> entityList = db.search(keys);
-        for (KeyValueEntity keyValueEntity : entityList) {
-            Assert.assertEquals(StateSearchKey.SUCCESS, keyValueEntity.getStateSearchKey());
-        }
-        Assert.assertEquals(2, entityList.size());
-
-        entity.setJsonValue("testC");
-        KeyValueEntity oldEntity = db.put(entity);
-        Assert.assertEquals("testA", oldEntity.getJsonValue());
-
-        KeyValueEntity newEntity = db.get("test1");
-        Assert.assertEquals("testC", newEntity.getJsonValue());
-
-    }
-
-    @Test
-    public void testCommandDb() {
-        CommandEntity commandEntity = new CommandEntity();
-        commandEntity.setId("1");
-        commandEntity.setCommandResult(0);
-        commandEntity.setAcked(false);
-        commandEntity.setTaskId(1);
-        commandEntity.setVersion(1);
-        db.putCommand(commandEntity);
-        CommandEntity command = db.getCommand("1");
-        Assert.assertEquals("1", command.getId());
-        List<CommandEntity> commandEntities = db.searchCommands(false);
-        Assert.assertEquals("1", commandEntities.get(0).getId());
-    }
-
-    @Test
-    public void testSecondaryIndex() {
-        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
-        db.put(entity);
-        KeyValueEntity entity1 = new KeyValueEntity("searchKey2", "searchResult2", "test");
-        db.put(entity1);
-        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
-        Assert.assertEquals("searchKey1", entityResult.getKey());
-
-        entityResult = db.searchOne(StateSearchKey.ACCEPTED);
-        Assert.assertEquals("searchKey1", entityResult.getKey());
-
-        entityResult.setStateSearchKey(StateSearchKey.RUNNING);
-        db.put(entityResult);
-
-        entityResult = db.searchOne(StateSearchKey.ACCEPTED);
-        Assert.assertEquals("searchKey2", entityResult.getKey());
-
-        List<KeyValueEntity> entityList = db.search(StateSearchKey.ACCEPTED);
-        Assert.assertEquals(1, entityList.size());
-
-        entityList = db.search(StateSearchKey.FAILED);
-        Assert.assertEquals(0, entityList.size());
-    }
-
-    @Test
-    public void testDeleteItem() {
-        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
-        db.put(entity);
-        KeyValueEntity entityResult1 = db.remove("searchKey1");
-        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
-        Assert.assertEquals("searchKey1", entityResult1.getKey());
-        Assert.assertNull(entityResult);
-    }
-
-    @Test
-    public void testFileNameSearch() {
-        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
-        db.put(entity);
-        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
-        Assert.assertEquals("searchKey1", entityResult.getKey());
-    }
-
-}
diff --git a/inlong-agent/pom.xml b/inlong-agent/pom.xml
index d073823..235c175 100644
--- a/inlong-agent/pom.xml
+++ b/inlong-agent/pom.xml
@@ -63,7 +63,6 @@
         <protobuf.version>2.5.0</protobuf.version>
         <httpclient.version>4.5.13</httpclient.version>
         <fastjson.version>1.2.68</fastjson.version>
-        <sleepycat.version>7.3.7</sleepycat.version>
         <hippoclient.version>2.0.5</hippoclient.version>
         <jetty.version>9.4.44.v20210927</jetty.version>
         <rocksdb.version>6.14.6</rocksdb.version>
@@ -76,20 +75,6 @@
         <springcontext.version>5.3.13</springcontext.version>
     </properties>
 
-    <repositories>
-        <repository>
-            <id>berkeleydb-je</id>
-            <name>berkeleydb-je</name>
-            <url>https://download.dcache.org/nexus/repository/berkeleydb-je/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
-
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -197,11 +182,6 @@
                 <version>${oro.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.sleepycat</groupId>
-                <artifactId>je</artifactId>
-                <version>${sleepycat.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>joda-time</groupId>
                 <artifactId>joda-time</artifactId>
                 <version>2.8.2</version>
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index 52a9534..06ddfa6 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -34,7 +34,6 @@
         <snappy-java.version>1.1.8.4</snappy-java.version>
         <jackson.version>2.13.1</jackson.version>
         <slf4j-api.version>1.7.36</slf4j-api.version>
-        <sleepycat.version>7.3.7</sleepycat.version>
         <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
         <common.lang.version>2.4</common.lang.version>
         <commons-lang3.version>3.3.2</commons-lang3.version>
@@ -81,10 +80,6 @@
             <artifactId>lombok</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.sleepycat</groupId>
-            <artifactId>je</artifactId>
-        </dependency>
-        <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
             <version>${common.lang.version}</version>
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
index 8ad1edc..c85c3af 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.common.db;
 
-import com.sleepycat.persist.model.Entity;
-import com.sleepycat.persist.model.PrimaryKey;
-import com.sleepycat.persist.model.Relationship;
-import com.sleepycat.persist.model.SecondaryKey;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -28,16 +24,13 @@ import lombok.NoArgsConstructor;
 /**
  * The entity of task command, used for Agent to interact with Manager and BDB.
  */
-@Entity(version = 1)
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 public class CommandEntity {
 
-    @PrimaryKey
     private String id;
     private int commandResult;
-    @SecondaryKey(relate = Relationship.MANY_TO_ONE)
     private boolean isAcked;
     private Integer taskId;
     /**
diff --git a/pom.xml b/pom.xml
index 08ba32b..17613e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,6 @@
         <lombok.version>1.18.22</lombok.version>
         <logback.version>1.2.10</logback.version>
         <junit.version>4.13.2</junit.version>
-        <je.version>7.3.7</je.version>
         <hadoop.version>2.10.1</hadoop.version>
         <postgres.version>42.2.25</postgres.version>
         <netty.version>3.10.6.Final</netty.version>
@@ -150,11 +149,6 @@
                 <version>${logback.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.sleepycat</groupId>
-                <artifactId>je</artifactId>
-                <version>${je.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
                 <version>${junit.version}</version>
@@ -320,18 +314,4 @@
         </plugins>
     </build>
 
-    <repositories>
-        <repository>
-            <id>berkeleydb-je</id>
-            <name>berkeleydb-je</name>
-            <url>https://download.dcache.org/nexus/repository/berkeleydb-je/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
-
 </project>