Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/28 10:10:52 UTC

[GitHub] [ignite-3] ibessonov commented on a change in pull request #174: IGNITE-14408 Implement the Vault Service on top of RocksDB

ibessonov commented on a change in pull request #174:
URL: https://github.com/apache/ignite-3/pull/174#discussion_r659627787



##########
File path: modules/api/src/main/java/org/apache/ignite/app/IgnitionManager.java
##########
@@ -63,4 +63,11 @@ public static synchronized Ignite start(@NotNull String nodeName, @Nullable Stri
 
         return ignition.start(nodeName, jsonStrBootstrapCfg);
     }
+
+    /**
+     * Closes the encapsulated {@link Ignition} instance.
+     */
+    public static synchronized void close() throws Exception {
+        ignition.close();

Review comment:
       So you're saying that the static Ignition must be explicitly closed? That looks bad; can we avoid this?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -328,14 +334,7 @@ public static ClassLoader igniteClassLoader() {
         if (ldr == null)
             ldr = igniteClassLoader;
 
-        ConcurrentMap<String, Class<?>> ldrMap = classCache.get(ldr);
-
-        if (ldrMap == null) {
-            ConcurrentMap<String, Class<?>> old = classCache.putIfAbsent(ldr, ldrMap = new ConcurrentHashMap<>());
-
-            if (old != null)
-                ldrMap = old;
-        }
+        ConcurrentMap<String, Class<?>> ldrMap = classCache.computeIfAbsent(ldr, k -> new ConcurrentHashMap<>());

Review comment:
       Is there a reason for this change other than code simplicity? Sadly, computeIfAbsent can block.
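
A minimal sketch of one way to keep the one-liner while avoiding the lock on the common path, assuming the cache is backed by a `ConcurrentHashMap` (its Javadoc notes that update attempts by other threads may be blocked while a `computeIfAbsent` computation is in progress, whereas retrievals do not lock). The class `LoaderCacheSketch` and method `mapFor` are hypothetical names used only for illustration; they are not part of `IgniteUtils`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the per-class-loader cache in the hunk above. */
final class LoaderCacheSketch {
    /** Cache keyed by class loader (assumed to be a ConcurrentHashMap). */
    private static final ConcurrentMap<ClassLoader, ConcurrentMap<String, Class<?>>> CACHE =
        new ConcurrentHashMap<>();

    /** Returns the per-loader map, creating it on first use. */
    static ConcurrentMap<String, Class<?>> mapFor(ClassLoader ldr) {
        // Fast path: a plain read, which ConcurrentHashMap serves without locking.
        ConcurrentMap<String, Class<?>> ldrMap = CACHE.get(ldr);

        // Slow path: only taken until the mapping exists, so the potentially
        // blocking call is limited to the first lookups for a given loader.
        if (ldrMap == null)
            ldrMap = CACHE.computeIfAbsent(ldr, k -> new ConcurrentHashMap<>());

        return ldrMap;
    }
}
```

Repeated calls with the same loader return the same map instance, as with the original `putIfAbsent` code.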

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
##########
@@ -184,7 +192,32 @@ else if (jsonStrBootstrapCfg != null)
 
         ackSuccessStart();
 
-        return new IgniteImpl(distributedTblMgr);
+        return new IgniteImpl(distributedTblMgr, vaultMgr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        IgniteUtils.delete(VAULT_DB_PATH);

Review comment:
       This is clearly not how it's going to work in the future. Can we add a TODO here?

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultEntry.java
##########
@@ -52,25 +53,42 @@ public Entry(@NotNull ByteArray key, byte[] val) {
      *
      * @return The {@code ByteArray}.
      */
-    @NotNull public ByteArray key() {
+    public ByteArray key() {
         return key;
     }
 
     /**
-     * Returns a value. Could be {@code null} for empty entry.
+     * Returns a value. Can be {@code null} for if the entry is empty.

Review comment:
       "for" can be removed now. Thank you for spellchecking!

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
##########
@@ -184,7 +192,32 @@ else if (jsonStrBootstrapCfg != null)
 
         ackSuccessStart();
 
-        return new IgniteImpl(distributedTblMgr);
+        return new IgniteImpl(distributedTblMgr, vaultMgr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        IgniteUtils.delete(VAULT_DB_PATH);
+    }
+
+    /**
+     * Starts the Vault component.
+     */
+    private static VaultManager createVault(String nodeName) {
+        Path vaultPath = VAULT_DB_PATH.resolve(nodeName);
+
+        try {
+            Files.createDirectories(vaultPath);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        var vaultMgr = new VaultManager(new PersistentVaultService(vaultPath));
+
+        vaultMgr.putName(nodeName).join();

Review comment:
       I find it weird that we put the name unconditionally and only then check whether the vault has been bootstrapped. Can we tweak it a little bit?

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -74,7 +76,7 @@
     public static final String DISTRIBUTED_PREFIX = "dst-cfg.";
 
     /**
-     * Special key for the vault where the applied revision for {@link MetaStorageManager#storeEntries(Collection, long)}
+     * Special key for the vault where the applied revision for {@link MetaStorageManager#storeEntries}

Review comment:
       Is this link still valid?

##########
File path: modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistentVaultServiceTest.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.internal.vault.VaultServiceTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Test suite for the {@link PersistentVaultService}.
+ */
+class PersistentVaultServiceTest extends VaultServiceTest {
+    /** */
+    private Path vaultDir;
+
+    /** */
+    @BeforeEach
+    @Override public void setUp(TestInfo testInfo) throws IOException {
+        String dirName = testInfo.getTestMethod()
+            .map(Method::getName)
+            .orElseThrow();
+
+        vaultDir = Paths.get(dirName, "vault");
+
+        Files.createDirectories(vaultDir);
+
+        super.setUp(testInfo);
+    }
+
+    /** {@inheritDoc} */
+    @AfterEach
+    @Override public void tearDown() throws Exception {
+        super.tearDown();
+
+        IgniteUtils.delete(vaultDir);

Review comment:
       Same comment

##########
File path: modules/vault/src/integrationTest/java/org/apache/ignite/internal/vault/persistence/PersistencePropertiesVaultServiceTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static org.apache.ignite.internal.vault.CompletableFutureMatcher.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Test suite for testing persistence properties of {@link PersistentVaultService}.
+ */
+class PersistencePropertiesVaultServiceTest {
+    /** */
+    private static final int TIMEOUT_SECONDS = 1;
+
+    /** */
+    private Path vaultDir;
+
+    /** */
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws IOException {
+        String dirName = testInfo.getTestMethod()
+            .map(Method::getName)
+            .orElseThrow();
+
+        vaultDir = Paths.get(dirName, "vault");
+
+        Files.createDirectories(vaultDir);
+    }
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        IgniteUtils.delete(vaultDir);

Review comment:
       Will this leave the "dirName" directory unaffected? I think we should also delete the parent here.
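
A small sketch of the concern, using only `java.nio.file`. It assumes the helper deletes just the path it is given plus that path's contents; `deleteRecursively` below is a hypothetical stand-in for `IgniteUtils.delete`, whose exact behaviour is not shown in this diff. Because `Files.createDirectories` creates both the parent and the nested `vault` directory, deleting only the nested one leaves the parent on disk.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TestDirCleanupSketch {
    /** Hypothetical stand-in for IgniteUtils.delete: removes root and everything below it. */
    static void deleteRecursively(Path root) {
        if (!Files.exists(root))
            return;

        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order puts children before their parents.
            List<Path> deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());

            for (Path p : deepestFirst)
                Files.delete(p);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Mirrors setUp/tearDown from the test and reports whether the parent directory is left behind. */
    static boolean parentSurvivesVaultDirDeletion() {
        try {
            Path base = Files.createTempDirectory("vault-sketch");
            Path vaultDir = base.resolve("testMethodName").resolve("vault");

            Files.createDirectories(vaultDir); // creates "testMethodName" and "vault"

            deleteRecursively(vaultDir); // what tearDown does today

            boolean parentLeftBehind = Files.exists(vaultDir.getParent());

            deleteRecursively(base); // clean up everything the sketch created

            return parentLeftBehind;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing `vaultDir.getParent()` to the delete call in `tearDown` would remove both levels.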

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -504,20 +493,12 @@ public synchronized void deployWatches() {
     }
 
     /**
-     * @return Applied revision for {@link VaultManager#putAll(Map, ByteArray, long)} operation.
-     * @throws IgniteInternalCheckedException If couldn't get applied revision from vault.
+     * @return Applied revision for {@link VaultManager#putAll} operation.
      */
-    private long appliedRevision() throws IgniteInternalCheckedException {
-        byte[] appliedRevision;
+    private long appliedRevision() {
+        byte[] appliedRevision = vaultMgr.get(APPLIED_REV).join().value();

Review comment:
       That's why I don't like unchecked exceptions: now this method can throw CompletionException (the unchecked counterpart of ExecutionException that join() uses) and no one expects it.
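
For reference, a sketch of how the two blocking calls on `CompletableFuture` surface a failure: `join()` reports it as the unchecked `CompletionException`, so nothing in the method signature warns callers, while `get()` reports it as the checked `ExecutionException`. The class and method names below are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class JoinVersusGetSketch {
    /** Builds a future that has already failed with the given cause. */
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> fut = new CompletableFuture<>();

        fut.completeExceptionally(cause);

        return fut;
    }

    /** Returns the simple name of the exception type that join() throws, or "nothing". */
    static String thrownByJoin(CompletableFuture<?> fut) {
        try {
            fut.join(); // declares no checked exceptions

            return "nothing";
        }
        catch (CompletionException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Returns the simple name of the exception type that get() throws, or "nothing". */
    static String thrownByGet(CompletableFuture<?> fut) {
        try {
            fut.get(); // forces callers to handle checked exceptions

            return "nothing";
        }
        catch (ExecutionException e) {
            return e.getClass().getSimpleName();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return e.getClass().getSimpleName();
        }
    }
}
```

In both cases the original failure is available through `getCause()`.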

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.persistence;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompactionPriority;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Vault Service implementation based on <a href="https://github.com/facebook/rocksdb">RocksDB</a>.
+ */
+public class PersistentVaultService implements VaultService {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** */
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
+
+    /** */
+    private final Options options = new Options();
+
+    /** */
+    private final RocksDB db;
+
+    /**
+     * Creates and starts the RocksDB instance using the recommended options on the given {@code path}.
+     *
+     * @param path base path for RocksDB
+     */
+    public PersistentVaultService(Path path) {
+        // using the recommended options from https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
+        options
+            .setCreateIfMissing(true)
+            .setCompressionType(CompressionType.LZ4_COMPRESSION)
+            .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+            .setLevelCompactionDynamicLevelBytes(true)
+            .setBytesPerSync(1048576)
+            .setCompactionPriority(CompactionPriority.MinOverlappingRatio)
+            .setTableFormatConfig(
+                new BlockBasedTableConfig()
+                    .setBlockSize(16 * 1024)
+                    .setCacheIndexAndFilterBlocks(true)
+                    .setPinL0FilterAndIndexBlocksInCache(true)
+                    .setFormatVersion(5)
+                    .setFilterPolicy(new BloomFilter(10, false))
+                    .setOptimizeFiltersForMemory(true)
+            );
+
+        try {
+            db = RocksDB.open(options, path.toString());
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws RocksDBException {
+        try (options; db) {
+            db.syncWal();
+
+            IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<VaultEntry> get(@NotNull ByteArray key) {
+        return supplyAsync(() -> db.get(key.bytes()))
+            .thenApply(v -> new VaultEntry(key, v));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte @Nullable [] val) {
+        return val == null ? remove(key) : runAsync(() -> db.put(key.bytes(), val));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
+        return runAsync(() -> db.delete(key.bytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @NotNull Cursor<VaultEntry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+        try (var readOpts = new ReadOptions()) {
+            var lowerBound = new Slice(fromKey.bytes());
+            var upperBound = new Slice(toKey.bytes());
+
+            readOpts
+                .setIterateLowerBound(lowerBound)
+                .setIterateUpperBound(upperBound);
+
+            RocksIterator it = db.newIterator(readOpts);
+            it.seekToFirst();
+
+            return new RocksIteratorAdapter(it, lowerBound, upperBound);

Review comment:
       Should you pass readOpts as well? %)
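
One reading of this comment is that the try-with-resources block closes `readOpts` as soon as `range` returns, while the returned cursor is still in use. The sketch below models that ownership problem with plain `AutoCloseable` objects instead of RocksDB types, so every class in it (`Resource`, `OwningCursor`) is a hypothetical stand-in and says nothing about how `RocksIteratorAdapter` is actually written.

```java
final class CursorOwnershipSketch {
    /** Hypothetical stand-in for a native handle such as read options or a slice. */
    static final class Resource implements AutoCloseable {
        private boolean closed;

        @Override public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Hypothetical cursor that owns the resources it depends on and closes them together. */
    static final class OwningCursor implements AutoCloseable {
        private final Resource[] owned;

        OwningCursor(Resource... owned) {
            this.owned = owned;
        }

        /** The cursor is only usable while none of its resources has been closed. */
        boolean usable() {
            for (Resource r : owned) {
                if (r.isClosed())
                    return false;
            }

            return true;
        }

        @Override public void close() {
            for (Resource r : owned)
                r.close();
        }
    }

    /** Shape of the reviewed code: the options are closed before the caller sees the cursor. */
    static OwningCursor rangeClosingOptionsEarly() {
        try (Resource readOpts = new Resource()) {
            return new OwningCursor(readOpts);
        }
    }

    /** Alternative: hand the options to the cursor, which closes them when it is closed. */
    static OwningCursor rangeTransferringOwnership() {
        Resource readOpts = new Resource();

        return new OwningCursor(readOpts);
    }
}
```

With the second shape, the caller closes the cursor (for example in its own try-with-resources) and all dependent resources are released at that point.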

##########
File path: modules/vault/src/test/java/org/apache/ignite/internal/vault/VaultServiceTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.vault.CompletableFutureMatcher.await;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+
+/**
+ * Base class for testing {@link VaultService} implementations.
+ */
+public abstract class VaultServiceTest {
+    /** */
+    private static final int TIMEOUT_SECONDS = 1;
+
+    /** Vault. */
+    private VaultService vaultService;
+
+    /** */
+    @BeforeEach
+    public void setUp(TestInfo testInfo) throws IOException {
+        vaultService = getVaultService();
+    }
+
+    /** */
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultService.close();
+    }
+
+    /**
+     * Returns the vault service that will be tested.
+     */
+    protected abstract VaultService getVaultService();
+
+    /**
+     * Tests regular behaviour of the {@link VaultService#put} method.
+     */
+    @Test
+    public void testPut() throws Exception {
+        ByteArray key = getKey(1);
+
+        assertThat(vaultService.get(key), await(is(new VaultEntry(key, null))));

Review comment:
       This is a minor complaint, but "await is" doesn't resemble a sentence. Maybe we should use something like "willBe(equalTo(...))"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
