You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/21 01:39:03 UTC

[incubator-inlong] branch master updated: [INLONG-3045][Agent] Add rocksDb implementation (#3249)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new df6c870  [INLONG-3045][Agent] Add rocksDb implementation (#3249)
df6c870 is described below

commit df6c870256f8008b5cff36c56ba09ebc4dcdf99b
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Mar 21 09:38:58 2022 +0800

    [INLONG-3045][Agent] Add rocksDb implementation (#3249)
---
 .../org/apache/inlong/agent/db/KeyValueEntity.java |   4 +
 .../org/apache/inlong/agent/db/RocksDbImp.java     | 216 +++++++++++++++++++--
 .../org/apache/inlong/agent/db/TestRocksDbImp.java | 106 ++++++++++
 3 files changed, 311 insertions(+), 15 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index ab0ca19..930f8de 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -59,6 +59,10 @@ public class KeyValueEntity {
         return key;
     }
 
+    /**
+     * Returns the value of the {@code fileName} field of this entity.
+     *
+     * @return the file name held by this entity
+     */
+    public String getFileName() {
+        return fileName;
+    }
+
     public StateSearchKey getStateSearchKey() {
         return stateSearchKey;
     }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index 617667f..0384835 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -17,108 +17,294 @@
 
 package org.apache.inlong.agent.db;
 
+import static java.util.Objects.requireNonNull;
+
+import com.google.gson.Gson;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.common.db.CommandEntity;
+import org.rocksdb.AbstractImmutableNativeReference;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * DB implement based on rocks db.
- * TODO: this is low priority.
  */
 public class RocksDbImp implements Db {
     private static final Logger LOGGER = LoggerFactory.getLogger(RocksDbImp.class);
+    private static final Gson GSON = new Gson();
 
     private final AgentConfiguration conf;
     private final RocksDB db;
+    private ConcurrentHashMap<String, ColumnFamilyHandle> columnHandlesMap;
+    private ConcurrentHashMap<String, ColumnFamilyDescriptor> columnDescriptorMap;
+    private final String commandFamilyName = "command";
+    private final String defaultFamilyName = "default";
+    private String storePath;
 
+    /**
+     * Creates the store: reads the agent configuration, opens the database through
+     * {@code initEnv()} and then makes sure the {@code "command"} column family exists.
+     */
     public RocksDbImp() {
         // init rocks db
         this.conf = AgentConfiguration.getAgentConf();
         this.db = initEnv();
+        // add a command column family
+        addColumnFamily(commandFamilyName);
     }
 
     private RocksDB initEnv() {
-        String storePath = conf.get(
+        String configPath = conf.get(
             AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
         String parentPath = conf.get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
-        File finalPath = new File(parentPath, storePath);
+        File finalPath = new File(parentPath, configPath);
+        storePath = finalPath.getAbsolutePath();
         RocksDB.loadLibrary();
         final Options options = new Options();
         options.setCreateIfMissing(true);
         try {
             boolean result = finalPath.mkdirs();
             LOGGER.info("create directory {}, result is {}", finalPath, result);
-            return RocksDB.open(options, finalPath.getAbsolutePath());
+
+            columnHandlesMap = new ConcurrentHashMap<>();
+            columnDescriptorMap = new ConcurrentHashMap<>();
+
+            final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
+                .setWalDir(finalPath.getAbsolutePath()).setStatistics(new Statistics());
+
+            final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
+            final List<ColumnFamilyHandle> managedHandles = new ArrayList<>();
+
+            RocksDB rocksDB = RocksDB.open(dbOptions,
+                finalPath.getAbsolutePath(), managedColumnFamilies, managedHandles);
+
+            for (int index = 0; index < managedHandles.size(); index++) {
+                ColumnFamilyHandle handle = managedHandles.get(index);
+                ColumnFamilyDescriptor descriptor = managedColumnFamilies.get(index);
+                String familyNameFromHandle = new String(handle.getName());
+                String familyNameFromDescriptor = new String(descriptor.getName());
+
+                columnHandlesMap.put(familyNameFromHandle, handle);
+                columnDescriptorMap.put(familyNameFromDescriptor, descriptor);
+            }
+            return rocksDB;
         } catch (Exception ex) {
-            // cannot create local path, stop running.
+            // db is vital.
+            LOGGER.error("init rocksdb error, please check", ex);
             throw new RuntimeException(ex);
         }
     }
 
+    /**
+     * Builds the descriptors of the column families that have to be opened together with the
+     * database: every family already stored under {@code storePath}, or only the default family
+     * when none is listed.
+     *
+     * @param dbOptions database options used for listing the stored column families
+     * @return descriptors to pass to {@code RocksDB.open}
+     * @throws RocksDBException if the stored column families cannot be listed
+     */
+    private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException {
+        final List<byte[]> existing;
+        // Both option objects wrap native memory and are only needed for the listing call,
+        // so they are released as soon as it returns.
+        try (ColumnFamilyOptions listingFamilyOptions = new ColumnFamilyOptions();
+            Options options = new Options(dbOptions, listingFamilyOptions)) {
+            existing = RocksDB.listColumnFamilies(options, storePath);
+        }
+
+        final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>();
+        if (existing.isEmpty()) {
+            LOGGER.info("no previous column family found, use default");
+            managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
+        } else {
+            LOGGER.info("loading column families :{}",
+                existing.stream().map(String::new).collect(Collectors.toList()));
+            managedColumnFamilies
+                .addAll(existing.stream().map(RocksDbImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
+        }
+        return managedColumnFamilies;
+    }
+
+    private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFamilyName) {
+        return new ColumnFamilyDescriptor(columnFamilyName, new ColumnFamilyOptions());
+    }
+
+    /**
+     * Creates the named column family unless a descriptor for it is already registered.
+     *
+     * @param columnFamilyName name of the column family to add
+     * @throws RuntimeException wrapping the {@link RocksDBException} raised when creation fails
+     */
+    public void addColumnFamily(String columnFamilyName) {
+        columnDescriptorMap.computeIfAbsent(columnFamilyName, this::createColumnFamily);
+    }
+
+    /**
+     * Creates the column family in the database and stores its handle under the same name.
+     */
+    private ColumnFamilyDescriptor createColumnFamily(String familyName) {
+        try {
+            final ColumnFamilyDescriptor created = getColumnFamilyDescriptor(familyName.getBytes());
+            columnHandlesMap.put(familyName, db.createColumnFamily(created));
+            return created;
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public KeyValueEntity get(String key) {
-        return null;
+        requireNonNull(key);
+        try {
+            byte[] bytes =
+                db.get(columnHandlesMap.get(defaultFamilyName), key.getBytes());
+            return bytes == null ? null :
+                GSON.fromJson(new String(bytes), KeyValueEntity.class);
+        } catch (Exception e) {
+            throw new RuntimeException("get key value entity error", e);
+        }
     }
 
     @Override
     public CommandEntity getCommand(String commandId) {
-        return null;
+        try {
+            byte[] bytes = db
+                .get(columnHandlesMap.get(commandFamilyName), commandId.getBytes());
+            return bytes == null ? null :
+                GSON.fromJson(new String(bytes), CommandEntity.class);
+        } catch (Exception e) {
+            throw new RuntimeException("get command value error", e);
+        }
     }
 
     @Override
     public CommandEntity putCommand(CommandEntity entity) {
-        return null;
+        requireNonNull(entity);
+        try {
+            db.put(columnHandlesMap.get(commandFamilyName),
+                entity.getId().getBytes(), GSON.toJson(entity).getBytes());
+        } catch (Exception e) {
+            throw new RuntimeException("put value to rocks db error", e);
+        }
+        return entity;
     }
 
     @Override
     public void set(KeyValueEntity entity) {
-
+        requireNonNull(entity);
+        put(entity);
     }
 
     @Override
     public KeyValueEntity put(KeyValueEntity entity) {
-        return null;
+        requireNonNull(entity);
+        try {
+            db.put(columnHandlesMap.get(defaultFamilyName),
+                entity.getKey().getBytes(), GSON.toJson(entity).getBytes());
+        } catch (Exception e) {
+            throw new RuntimeException("put value to rocks db error", e);
+        }
+        return entity;
     }
 
     @Override
     public KeyValueEntity remove(String key) {
-        return null;
+        requireNonNull(key);
+        KeyValueEntity keyValueEntity = get(key);
+        if (keyValueEntity == null) {
+            LOGGER.warn("no key {} exist in rocksdb", key);
+            return null;
+        }
+        try {
+            db.delete(columnHandlesMap.get(defaultFamilyName),
+                key.getBytes());
+            return keyValueEntity;
+        } catch (Exception e) {
+            throw new RuntimeException("remove value from rocks db error", e);
+        }
     }
 
     @Override
     public List<KeyValueEntity> search(StateSearchKey searchKey) {
-        return null;
+        List<KeyValueEntity> results = new LinkedList<>();
+        try (final RocksIterator it = db.newIterator(
+            columnHandlesMap.get(defaultFamilyName))) {
+            it.seekToFirst();
+            while (it.isValid()) {
+                KeyValueEntity keyValueItem = GSON
+                    .fromJson(new String(it.value()), KeyValueEntity.class);
+                if (keyValueItem.getStateSearchKey().equals(searchKey)) {
+                    results.add(keyValueItem);
+                }
+                it.next();
+            }
+        }
+        return results;
     }
 
     @Override
     public List<CommandEntity> searchCommands(boolean isAcked) {
-        return null;
+        List<CommandEntity> results = new LinkedList<>();
+        try (final RocksIterator it = db.newIterator(
+            columnHandlesMap.get(commandFamilyName))) {
+            it.seekToFirst();
+            while (it.isValid()) {
+                CommandEntity commandEntity = GSON
+                    .fromJson(new String(it.value()), CommandEntity.class);
+                if (commandEntity.isAcked() == isAcked) {
+                    results.add(commandEntity);
+                }
+                it.next();
+            }
+        }
+        return results;
     }
 
     @Override
     public KeyValueEntity searchOne(StateSearchKey searchKey) {
+        try (final RocksIterator it = db.newIterator(
+            columnHandlesMap.get(defaultFamilyName))) {
+            it.seekToFirst();
+            while (it.isValid()) {
+                KeyValueEntity keyValueItem = GSON
+                    .fromJson(new String(it.value()), KeyValueEntity.class);
+                if (keyValueItem.getStateSearchKey().equals(searchKey)) {
+                    return keyValueItem;
+                }
+                it.next();
+            }
+        }
         return null;
     }
 
     @Override
     public KeyValueEntity searchOne(String fileName) {
+        try (final RocksIterator it = db.newIterator(
+            columnHandlesMap.get(defaultFamilyName))) {
+            it.seekToFirst();
+            while (it.isValid()) {
+                KeyValueEntity keyValueItem = GSON
+                    .fromJson(new String(it.value()), KeyValueEntity.class);
+                if (keyValueItem.getFileName().equals(fileName)) {
+                    return keyValueItem;
+                }
+                it.next();
+            }
+        }
         return null;
     }
 
     @Override
     public List<KeyValueEntity> findAll(String prefix) {
-        return null;
+        List<KeyValueEntity> results = new LinkedList<>();
+        try (final RocksIterator it = db.newIterator(
+            columnHandlesMap.get(defaultFamilyName))) {
+            it.seekToFirst();
+            while (it.isValid()) {
+                KeyValueEntity keyValueItem = GSON
+                    .fromJson(new String(it.value()), KeyValueEntity.class);
+                results.add(keyValueItem);
+                it.next();
+            }
+        }
+        return results;
     }
 
     @Override
     public void close() throws IOException {
-
+        db.close();
+        columnHandlesMap.values().forEach(AbstractImmutableNativeReference::close);
+        columnHandlesMap.clear();
+        columnDescriptorMap.clear();
     }
+
 }
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
new file mode 100644
index 0000000..3bd7821
--- /dev/null
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.db;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.inlong.agent.AgentBaseTestsHelper;
+import org.apache.inlong.common.db.CommandEntity;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RocksDbImp}. All tests share one static database instance that is created
+ * once in {@link #setup()} and closed in {@link #teardown()}, so entries written by one test
+ * stay visible to the others.
+ */
+public class TestRocksDbImp {
+
+    private static RocksDbImp db;
+    private static AgentBaseTestsHelper helper;
+
+    /**
+     * Prepares the agent home through the test helper and opens the database under test.
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        helper = new AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome();
+        db = new RocksDbImp();
+    }
+
+    /**
+     * Covers put/get/remove, searching by state and overwriting an existing key.
+     */
+    @Test
+    public void testKeyValueDB() {
+        KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
+        db.put(entity);
+        KeyValueEntity ret = db.get("test1");
+        Assert.assertEquals("test1", ret.getKey());
+        Assert.assertEquals("testA", ret.getJsonValue());
+
+        db.remove("test1");
+        ret = db.get("test1");
+        Assert.assertNull(ret);
+
+        StateSearchKey keys = StateSearchKey.SUCCESS;
+        KeyValueEntity entity1 = new KeyValueEntity("test2", "testA", "test");
+        entity.setStateSearchKey(keys);
+        entity1.setStateSearchKey(keys);
+
+        db.set(entity);
+        db.set(entity1);
+
+        // the exact count below holds only while no other test stores SUCCESS entities
+        // in the shared database
+        List<KeyValueEntity> entityList = db.search(keys);
+        for (KeyValueEntity keyValueEntity : entityList) {
+            Assert.assertEquals(StateSearchKey.SUCCESS, keyValueEntity.getStateSearchKey());
+        }
+        Assert.assertEquals(2, entityList.size());
+
+        // writing the same key again must replace the stored value
+        entity.setJsonValue("testC");
+        db.put(entity);
+        KeyValueEntity newEntity = db.get("test1");
+        Assert.assertEquals("testC", newEntity.getJsonValue());
+
+    }
+
+    /**
+     * Stores one un-acked command and reads it back by id and through the acked-flag search.
+     */
+    @Test
+    public void testCommandDb() {
+        CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "1");
+        db.putCommand(commandEntity);
+        CommandEntity command = db.getCommand("1");
+        Assert.assertEquals("1", command.getId());
+        List<CommandEntity> commandEntities = db.searchCommands(false);
+        Assert.assertEquals("1", commandEntities.get(0).getId());
+    }
+
+    /**
+     * Stores and removes an entity, then expects that no ACCEPTED entity is left.
+     */
+    @Test
+    public void testDeleteEntity() {
+        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
+        db.put(entity);
+        db.remove("searchKey1");
+        // presumably a new KeyValueEntity starts in the ACCEPTED state - confirm against KeyValueEntity
+        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertNull(entityResult);
+    }
+
+    /**
+     * Stores an entity and looks it up again through {@code searchOne}.
+     */
+    @Test
+    public void testFileNameSearch() {
+        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "test");
+        db.put(entity);
+        // NOTE(review): despite the test name this calls the StateSearchKey overload;
+        // searchOne(String fileName) is not exercised anywhere in this class
+        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey1", entityResult.getKey());
+    }
+
+    /**
+     * Closes the shared database and removes the temporary agent home.
+     */
+    @AfterClass
+    public static void teardown() throws IOException {
+        db.close();
+        helper.teardownAgentHome();
+    }
+
+}