You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/21 01:39:03 UTC
[incubator-inlong] branch master updated: [INLONG-3045][Agent] Add rocksDb implementation (#3249)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new df6c870 [INLONG-3045][Agent] Add rocksDb implementation (#3249)
df6c870 is described below
commit df6c870256f8008b5cff36c56ba09ebc4dcdf99b
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Mar 21 09:38:58 2022 +0800
[INLONG-3045][Agent] Add rocksDb implementation (#3249)
---
.../org/apache/inlong/agent/db/KeyValueEntity.java | 4 +
.../org/apache/inlong/agent/db/RocksDbImp.java | 216 +++++++++++++++++++--
.../org/apache/inlong/agent/db/TestRocksDbImp.java | 106 ++++++++++
3 files changed, 311 insertions(+), 15 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index ab0ca19..930f8de 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -59,6 +59,10 @@ public class KeyValueEntity {
return key;
}
+    /**
+     * Returns the value of the {@code fileName} field.
+     *
+     * @return the file name held by this entity
+     */
+    public String getFileName() {
+        return this.fileName;
+    }
+
public StateSearchKey getStateSearchKey() {
return stateSearchKey;
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index 617667f..0384835 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -17,108 +17,294 @@
package org.apache.inlong.agent.db;
+import static java.util.Objects.requireNonNull;
+
+import com.google.gson.Gson;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.common.db.CommandEntity;
+import org.rocksdb.AbstractImmutableNativeReference;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DB implement based on rocks db.
- * TODO: this is low priority.
*/
public class RocksDbImp implements Db {
private static final Logger LOGGER = LoggerFactory.getLogger(RocksDbImp.class);
+ private static final Gson GSON = new Gson();
private final AgentConfiguration conf;
private final RocksDB db;
+ private ConcurrentHashMap<String, ColumnFamilyHandle> columnHandlesMap;
+ private ConcurrentHashMap<String, ColumnFamilyDescriptor> columnDescriptorMap;
+ private final String commandFamilyName = "command";
+ private final String defaultFamilyName = "default";
+ private String storePath;
+    /**
+     * Loads the agent configuration, opens the RocksDB store and makes sure the
+     * command column family is registered.
+     */
public RocksDbImp() {
// init rocks db
this.conf = AgentConfiguration.getAgentConf();
this.db = initEnv();
+        // commands live in their own column family; it is created here unless already registered
+        addColumnFamily(commandFamilyName);
}
+    /**
+     * Opens the RocksDB store located under the configured agent home (creating the directory
+     * and the database when missing) and registers a handle and a descriptor for every column
+     * family the store is opened with.
+     *
+     * @return the opened database
+     * @throws RuntimeException if the store cannot be opened; the cause is preserved
+     */
private RocksDB initEnv() {
- String storePath = conf.get(
+        String configPath = conf.get(
AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
String parentPath = conf.get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
- File finalPath = new File(parentPath, storePath);
+        File finalPath = new File(parentPath, configPath);
+        storePath = finalPath.getAbsolutePath();
RocksDB.loadLibrary();
- final Options options = new Options();
- options.setCreateIfMissing(true);
try {
boolean result = finalPath.mkdirs();
LOGGER.info("create directory {}, result is {}", finalPath, result);
- return RocksDB.open(options, finalPath.getAbsolutePath());
+
+            columnHandlesMap = new ConcurrentHashMap<>();
+            columnDescriptorMap = new ConcurrentHashMap<>();
+
+            // The standalone Options object is no longer needed: DBOptions carries the
+            // create-if-missing flag, and an unused Options would only leak native memory.
+            final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
+                    .setWalDir(storePath).setStatistics(new Statistics());
+
+            final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
+            final List<ColumnFamilyHandle> managedHandles = new ArrayList<>();
+
+            RocksDB rocksDB = RocksDB.open(dbOptions, storePath, managedColumnFamilies, managedHandles);
+
+            // open() fills managedHandles in the same order as the descriptors passed in
+            for (int index = 0; index < managedHandles.size(); index++) {
+                ColumnFamilyHandle handle = managedHandles.get(index);
+                ColumnFamilyDescriptor descriptor = managedColumnFamilies.get(index);
+                // decode names with a fixed charset so map keys do not depend on the platform default
+                columnHandlesMap.put(new String(handle.getName(), StandardCharsets.UTF_8), handle);
+                columnDescriptorMap.put(new String(descriptor.getName(), StandardCharsets.UTF_8), descriptor);
+            }
+            return rocksDB;
} catch (Exception ex) {
- // cannot create local path, stop running.
+            // db is vital.
+            LOGGER.error("init rocksdb error, please check", ex);
throw new RuntimeException(ex);
}
}
+    /**
+     * Builds the descriptors used to open the store: one per column family already present at
+     * {@code storePath}, or only the default column family when none is listed.
+     *
+     * @param dbOptions database options used while listing the existing column families
+     * @return the descriptors to open the database with
+     * @throws RocksDBException if the existing column families cannot be listed
+     */
+    private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException {
+        final List<byte[]> existing;
+        // Both option objects wrap native memory and are only needed for the listing call,
+        // so release them as soon as it returns.
+        try (ColumnFamilyOptions listingFamilyOptions = new ColumnFamilyOptions();
+                Options listingOptions = new Options(dbOptions, listingFamilyOptions)) {
+            existing = RocksDB.listColumnFamilies(listingOptions, storePath);
+        }
+
+        final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>();
+        if (existing.isEmpty()) {
+            LOGGER.info("no previous column family found, use default");
+            managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
+        } else {
+            List<String> familyNames = existing.stream()
+                    .map(name -> new String(name, StandardCharsets.UTF_8))
+                    .collect(Collectors.toList());
+            LOGGER.info("loading column families :{}", familyNames);
+            for (byte[] name : existing) {
+                managedColumnFamilies.add(getColumnFamilyDescriptor(name));
+            }
+        }
+        return managedColumnFamilies;
+    }
+
+    /**
+     * Wraps a raw column family name in a descriptor backed by a fresh {@link ColumnFamilyOptions}.
+     *
+     * @param columnFamilyName column family name as bytes
+     * @return a new descriptor for that name
+     */
+    private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFamilyName) {
+        final ColumnFamilyOptions familyOptions = new ColumnFamilyOptions();
+        return new ColumnFamilyDescriptor(columnFamilyName, familyOptions);
+    }
+
+    /**
+     * Creates the named column family unless a descriptor for it is already registered, and
+     * records both its handle and its descriptor.
+     *
+     * @param columnFamilyName name of the column family to add
+     * @throws RuntimeException wrapping the {@link RocksDBException} when creation fails
+     */
+    public void addColumnFamily(String columnFamilyName) {
+        columnDescriptorMap.computeIfAbsent(columnFamilyName, colFamilyName -> {
+            try {
+                // the name is persisted in the store, so encode it with a fixed charset
+                ColumnFamilyDescriptor descriptor =
+                        getColumnFamilyDescriptor(colFamilyName.getBytes(StandardCharsets.UTF_8));
+                ColumnFamilyHandle handle = db.createColumnFamily(descriptor);
+                columnHandlesMap.put(colFamilyName, handle);
+                return descriptor;
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Reads the entity stored under {@code key} in the default column family.
+     *
+     * @param key entity key, must not be null
+     * @return the deserialized entity, or {@code null} when nothing is stored under the key
+     * @throws NullPointerException if {@code key} is null
+     * @throws RuntimeException if the read or the JSON deserialization fails
+     */
@Override
public KeyValueEntity get(String key) {
- return null;
+        requireNonNull(key);
+        try {
+            // keys and values are persisted, so use a fixed charset rather than the platform default
+            byte[] bytes = db.get(columnHandlesMap.get(defaultFamilyName),
+                    key.getBytes(StandardCharsets.UTF_8));
+            if (bytes == null) {
+                return null;
+            }
+            return GSON.fromJson(new String(bytes, StandardCharsets.UTF_8), KeyValueEntity.class);
+        } catch (Exception e) {
+            throw new RuntimeException("get key value entity error", e);
+        }
}
+    /**
+     * Reads the command stored under {@code commandId} in the command column family.
+     *
+     * @param commandId command id, must not be null
+     * @return the deserialized command, or {@code null} when nothing is stored under the id
+     * @throws NullPointerException if {@code commandId} is null
+     * @throws RuntimeException if the read or the JSON deserialization fails
+     */
@Override
public CommandEntity getCommand(String commandId) {
- return null;
+        // validate up front, like the other accessors of this class
+        requireNonNull(commandId);
+        try {
+            byte[] bytes = db.get(columnHandlesMap.get(commandFamilyName),
+                    commandId.getBytes(StandardCharsets.UTF_8));
+            if (bytes == null) {
+                return null;
+            }
+            return GSON.fromJson(new String(bytes, StandardCharsets.UTF_8), CommandEntity.class);
+        } catch (Exception e) {
+            throw new RuntimeException("get command value error", e);
+        }
}
+    /**
+     * Stores the command as JSON in the command column family, keyed by its id.
+     *
+     * @param entity command to store, must not be null
+     * @return the entity that was passed in
+     * @throws NullPointerException if {@code entity} is null
+     * @throws RuntimeException if serialization or the write fails
+     */
@Override
public CommandEntity putCommand(CommandEntity entity) {
- return null;
+        requireNonNull(entity);
+        try {
+            // fixed charset: the bytes are persisted and read back by getCommand/searchCommands
+            byte[] id = entity.getId().getBytes(StandardCharsets.UTF_8);
+            byte[] json = GSON.toJson(entity).getBytes(StandardCharsets.UTF_8);
+            db.put(columnHandlesMap.get(commandFamilyName), id, json);
+        } catch (Exception e) {
+            throw new RuntimeException("put value to rocks db error", e);
+        }
+        return entity;
}
+    /**
+     * Stores the entity by delegating to {@code put}; the returned value is discarded.
+     *
+     * @param entity entity to store, must not be null
+     */
@Override
public void set(KeyValueEntity entity) {
-
+        put(requireNonNull(entity));
}
+    /**
+     * Stores the entity as JSON in the default column family, keyed by its key.
+     *
+     * @param entity entity to store, must not be null
+     * @return the entity that was passed in
+     * @throws NullPointerException if {@code entity} is null
+     * @throws RuntimeException if serialization or the write fails
+     */
@Override
public KeyValueEntity put(KeyValueEntity entity) {
- return null;
+        requireNonNull(entity);
+        try {
+            // fixed charset: the bytes are persisted and must decode the same way on every host
+            byte[] key = entity.getKey().getBytes(StandardCharsets.UTF_8);
+            byte[] json = GSON.toJson(entity).getBytes(StandardCharsets.UTF_8);
+            db.put(columnHandlesMap.get(defaultFamilyName), key, json);
+        } catch (Exception e) {
+            throw new RuntimeException("put value to rocks db error", e);
+        }
+        return entity;
}
+    /**
+     * Deletes the entity stored under {@code key} in the default column family.
+     *
+     * @param key entity key, must not be null
+     * @return the entity that was stored before deletion, or {@code null} when the key is absent
+     * @throws NullPointerException if {@code key} is null
+     * @throws RuntimeException if the lookup or the delete fails
+     */
@Override
public KeyValueEntity remove(String key) {
- return null;
+        requireNonNull(key);
+        // read first so the removed value can be handed back to the caller
+        KeyValueEntity keyValueEntity = get(key);
+        if (keyValueEntity == null) {
+            LOGGER.warn("no key {} exist in rocksdb", key);
+            return null;
+        }
+        try {
+            db.delete(columnHandlesMap.get(defaultFamilyName), key.getBytes(StandardCharsets.UTF_8));
+            return keyValueEntity;
+        } catch (Exception e) {
+            throw new RuntimeException("remove value from rocks db error", e);
+        }
}
+    /**
+     * Scans the whole default column family and collects every entity whose state search key
+     * equals {@code searchKey}.
+     *
+     * @param searchKey state to match
+     * @return the matching entities, empty when none match
+     */
@Override
public List<KeyValueEntity> search(StateSearchKey searchKey) {
- return null;
+        List<KeyValueEntity> results = new ArrayList<>();
+        try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
+            for (it.seekToFirst(); it.isValid(); it.next()) {
+                // decode with a fixed charset instead of the platform default
+                KeyValueEntity keyValueItem = GSON.fromJson(
+                        new String(it.value(), StandardCharsets.UTF_8), KeyValueEntity.class);
+                if (keyValueItem.getStateSearchKey().equals(searchKey)) {
+                    results.add(keyValueItem);
+                }
+            }
+        }
+        return results;
}
+    /**
+     * Scans the whole command column family and collects every command whose acked flag equals
+     * {@code isAcked}.
+     *
+     * @param isAcked acked flag to match
+     * @return the matching commands, empty when none match
+     */
@Override
public List<CommandEntity> searchCommands(boolean isAcked) {
- return null;
+        List<CommandEntity> results = new ArrayList<>();
+        try (final RocksIterator it = db.newIterator(columnHandlesMap.get(commandFamilyName))) {
+            for (it.seekToFirst(); it.isValid(); it.next()) {
+                // decode with a fixed charset instead of the platform default
+                CommandEntity commandEntity = GSON.fromJson(
+                        new String(it.value(), StandardCharsets.UTF_8), CommandEntity.class);
+                if (commandEntity.isAcked() == isAcked) {
+                    results.add(commandEntity);
+                }
+            }
+        }
+        return results;
}
+    /**
+     * Scans the default column family and returns the first entity whose state search key
+     * equals {@code searchKey}.
+     *
+     * @param searchKey state to match
+     * @return the first matching entity in iteration order, or {@code null} when none matches
+     */
@Override
public KeyValueEntity searchOne(StateSearchKey searchKey) {
+        try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
+            for (it.seekToFirst(); it.isValid(); it.next()) {
+                // decode with a fixed charset instead of the platform default
+                KeyValueEntity keyValueItem = GSON.fromJson(
+                        new String(it.value(), StandardCharsets.UTF_8), KeyValueEntity.class);
+                if (keyValueItem.getStateSearchKey().equals(searchKey)) {
+                    return keyValueItem;
+                }
+            }
+        }
return null;
}
+    /**
+     * Scans the default column family and returns the first entity whose file name equals
+     * {@code fileName}. Entities stored without a file name are skipped.
+     *
+     * @param fileName file name to match
+     * @return the first matching entity in iteration order, or {@code null} when none matches
+     */
@Override
public KeyValueEntity searchOne(String fileName) {
+        try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
+            for (it.seekToFirst(); it.isValid(); it.next()) {
+                KeyValueEntity keyValueItem = GSON.fromJson(
+                        new String(it.value(), StandardCharsets.UTF_8), KeyValueEntity.class);
+                // null-safe: one entity without a file name must not abort the whole scan with an NPE
+                String storedFileName = keyValueItem.getFileName();
+                if (storedFileName != null && storedFileName.equals(fileName)) {
+                    return keyValueItem;
+                }
+            }
+        }
return null;
}
+    /**
+     * Scans the default column family and collects the entities whose key starts with
+     * {@code prefix}.
+     *
+     * @param prefix key prefix to match; {@code null} matches every entity
+     * @return the matching entities, empty when none match
+     */
@Override
public List<KeyValueEntity> findAll(String prefix) {
- return null;
+        List<KeyValueEntity> results = new ArrayList<>();
+        try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
+            for (it.seekToFirst(); it.isValid(); it.next()) {
+                KeyValueEntity keyValueItem = GSON.fromJson(
+                        new String(it.value(), StandardCharsets.UTF_8), KeyValueEntity.class);
+                // the prefix used to be ignored, so every stored entity was returned
+                if (prefix == null || keyValueItem.getKey().startsWith(prefix)) {
+                    results.add(keyValueItem);
+                }
+            }
+        }
+        return results;
}
+    /**
+     * Releases the column family handles and then closes the database.
+     *
+     * @throws IOException declared by the interface; not thrown by this implementation
+     */
@Override
public void close() throws IOException {
-
+        // Column family handles belong to the database and must be released before it is closed.
+        columnHandlesMap.values().forEach(AbstractImmutableNativeReference::close);
+        columnHandlesMap.clear();
+        columnDescriptorMap.clear();
+        db.close();
}
+
}
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
new file mode 100644
index 0000000..3bd7821
--- /dev/null
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.db;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.inlong.agent.AgentBaseTestsHelper;
+import org.apache.inlong.common.db.CommandEntity;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRocksDbImp {
+
+ private static RocksDbImp db;
+ private static AgentBaseTestsHelper helper;
+
+    /**
+     * Prepares the agent home for this test class and opens the store under test.
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        final String testName = TestRocksDbImp.class.getName();
+        helper = new AgentBaseTestsHelper(testName).setupAgentHome();
+        db = new RocksDbImp();
+    }
+
+    /**
+     * Covers put/get/remove round trips, search by state and overwriting an existing key.
+     */
+    @Test
+    public void testKeyValueDB() {
+        // write, then read back
+        KeyValueEntity first = new KeyValueEntity("test1", "testA", "test");
+        db.put(first);
+        KeyValueEntity fetched = db.get("test1");
+        Assert.assertEquals("test1", fetched.getKey());
+        Assert.assertEquals("testA", fetched.getJsonValue());
+
+        // a removed key is no longer readable
+        db.remove("test1");
+        fetched = db.get("test1");
+        Assert.assertNull(fetched);
+
+        // two entities in the same state are both found by search
+        StateSearchKey state = StateSearchKey.SUCCESS;
+        KeyValueEntity second = new KeyValueEntity("test2", "testA", "test");
+        first.setStateSearchKey(state);
+        second.setStateSearchKey(state);
+        db.set(first);
+        db.set(second);
+
+        List<KeyValueEntity> found = db.search(state);
+        for (KeyValueEntity item : found) {
+            Assert.assertEquals(StateSearchKey.SUCCESS, item.getStateSearchKey());
+        }
+        Assert.assertEquals(2, found.size());
+
+        // putting an existing key overwrites its value
+        first.setJsonValue("testC");
+        db.put(first);
+        KeyValueEntity overwritten = db.get("test1");
+        Assert.assertEquals("testC", overwritten.getJsonValue());
+    }
+
+    /**
+     * A stored command can be read back by id and is returned by the un-acked search.
+     */
+    @Test
+    public void testCommandDb() {
+        CommandEntity stored = new CommandEntity("1", 0, false, 1, "1");
+        db.putCommand(stored);
+
+        CommandEntity byId = db.getCommand("1");
+        Assert.assertEquals("1", byId.getId());
+
+        List<CommandEntity> unacked = db.searchCommands(false);
+        Assert.assertEquals("1", unacked.get(0).getId());
+    }
+
+    /**
+     * After an entity is removed, the ACCEPTED state lookup returns nothing.
+     */
+    @Test
+    public void testDeleteEntity() {
+        KeyValueEntity removable = new KeyValueEntity("searchKey1", "searchResult1", "test");
+        db.put(removable);
+        db.remove("searchKey1");
+
+        KeyValueEntity afterRemoval = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertNull(afterRemoval);
+    }
+
+    /**
+     * A stored entity can be found through the file-name overload of {@code searchOne} as well
+     * as through the state overload.
+     */
+    @Test
+    public void testFileNameSearch() {
+        // NOTE(review): assumes the third constructor argument is the file name - confirm in KeyValueEntity.
+        // A file name no other test uses keeps the lookup independent of test execution order.
+        KeyValueEntity entity = new KeyValueEntity("searchKey1", "searchResult1", "fileNameSearch.txt");
+        db.put(entity);
+
+        // previously only the state overload was called, so the file-name lookup was never exercised
+        KeyValueEntity byFileName = db.searchOne("fileNameSearch.txt");
+        Assert.assertNotNull(byFileName);
+        Assert.assertEquals("searchKey1", byFileName.getKey());
+
+        KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
+        Assert.assertEquals("searchKey1", entityResult.getKey());
+    }
+
+    /**
+     * Closes the store first, then lets the helper tear down the agent home.
+     */
+    @AfterClass
+    public static void teardown() throws IOException {
+        // close the database before the helper tears down the agent home
+        db.close();
+        helper.teardownAgentHome();
+    }
+
+}