You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/23 11:14:58 UTC

[GitHub] [ignite-3] sanpwc commented on a change in pull request #105: IGNITE-14406 introduced in-memory vault implementation

sanpwc commented on a change in pull request #105:
URL: https://github.com/apache/ignite-3/pull/105#discussion_r619093214



##########
File path: modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A class for handling byte array.
+ */
+public final class ByteArray implements Comparable<ByteArray> {

Review comment:
       What about using` org.apache.ignite.metastorage.common.Key` instead of `ByteArray`? It also means that Key should be moved to the internal part of ignite-core: `org.apache.ingnite.internal.core....`

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -17,18 +17,80 @@
 
 package org.apache.ignite.internal.vault;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
  */
 public class VaultManager {
+    private VaultService vaultService;
+
+    /**
+     * Default constructor.
+     */
+    public VaultManager() {
+        this.vaultService = new VaultServiceImpl();
+    }
 
     /**
      * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
      * either from PDS or from user initial bootstrap configuration.
+     *
+     * TODO: implement when IGNITE-14408 will be ready
      */
     public boolean bootstrapped() {
         return false;

Review comment:
       Lets either rename `bootspapped` to `bootstrappedWithPDS` or rename to `initialized()` and change logic in a following way:
   - returns true if was deployed atop PDS with some configuration included;
   - returns true if it was bootstrapped with user bootstrap configuration;
   - returns true if there was no user bootstrap configuration and default one was used.
   I believe that second option is better.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.io.Serializable;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Representation of vault entry.
+ */
+public class VaultEntry implements Entry, Serializable {

Review comment:
       Let's make it unmodifiable, adding final to key, val, etc.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Vault watcher.
+ *
+ * Watches for vault entries updates.
+ */
+public interface Watcher {
+    /**
+     * Registers watch for vault entries updates.
+     *
+     * @param vaultWatch Vault watch.
+     * @return UUID of registered watch.
+     */
+    CompletableFuture<IgniteUuid> register(@NotNull VaultWatch vaultWatch);

Review comment:
       Cause Vault watches are always local we should probably use Long instead of IgniteUuid.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ */
+public class WatcherImpl implements Watcher {
+    /** Queue for changed vault entries. */
+    private final BlockingQueue<VaultEntry> queue = new LinkedBlockingQueue<>();
+
+    /** Registered vault watches. */
+    private final Map<IgniteUuid, VaultWatch> watches = new HashMap<>();
+
+    /** Flag for indicating if watcher is stopped. */
+    private volatile boolean stop;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Execution service which runs thread for processing changed vault entries. */
+    private final ExecutorService exec;
+
+    /**
+     * Default constructor.
+     */
+    public WatcherImpl() {
+        exec = Executors.newFixedThreadPool(1);
+
+        exec.execute(new WatcherWorker());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<IgniteUuid> register(@NotNull VaultWatch vaultWatch) {

Review comment:
       Why do we need Future here?

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -17,18 +17,80 @@
 
 package org.apache.ignite.internal.vault;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
  */
 public class VaultManager {
+    private VaultService vaultService;
+
+    /**
+     * Default constructor.
+     */
+    public VaultManager() {
+        this.vaultService = new VaultServiceImpl();

Review comment:
       Let's move VaultService to the constructor parameter in order to be consistent with other modules.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+/**
+ * Vault storage listener for changes.
+ */
+@FunctionalInterface
+public interface VaultListener {

Review comment:
       Semantically it should be the same as `org.apache.ignite.metastorage.common.WatchListener,` why not to reuse it?

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -17,18 +17,80 @@
 
 package org.apache.ignite.internal.vault;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
  */
 public class VaultManager {
+    private VaultService vaultService;
+
+    /**
+     * Default constructor.
+     */
+    public VaultManager() {
+        this.vaultService = new VaultServiceImpl();
+    }
 
     /**
      * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
      * either from PDS or from user initial bootstrap configuration.
+     *
+     * TODO: implement when IGNITE-14408 will be ready
      */
     public boolean bootstrapped() {
         return false;
     }
 
-    // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+    /**
+     * See {@link VaultService#get(ByteArray)}
+     */
+    public CompletableFuture<VaultEntry> get(ByteArray key) {
+        return vaultService.get(key);
+    }
+
+    /**
+     * See {@link VaultService#put(ByteArray, byte[])}
+     */
+    public CompletableFuture<Void> put(ByteArray key, byte[] val) {
+        return vaultService.put(key, val);
+    }
+
+    /**
+     * See {@link VaultService#remove(ByteArray)}
+     */
+    public CompletableFuture<Void> remove(ByteArray key) {
+        return vaultService.remove(key);
+    }
+
+    /**
+     * See {@link VaultService#range(ByteArray, ByteArray)}
+     */
+    public Iterator<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
+        return vaultService.range(fromKey, toKey);
+    }
+
+    /**
+     * See {@link VaultService#putAll}
+     */
+    @NotNull
+    public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals, long revision) {

Review comment:
       revision -> appliedRevision

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -17,18 +17,80 @@
 
 package org.apache.ignite.internal.vault;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
  */
 public class VaultManager {
+    private VaultService vaultService;
+
+    /**
+     * Default constructor.
+     */
+    public VaultManager() {
+        this.vaultService = new VaultServiceImpl();
+    }
 
     /**
      * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
      * either from PDS or from user initial bootstrap configuration.
+     *
+     * TODO: implement when IGNITE-14408 will be ready
      */
     public boolean bootstrapped() {
         return false;
     }
 
-    // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+    /**
+     * See {@link VaultService#get(ByteArray)}
+     */
+    public CompletableFuture<VaultEntry> get(ByteArray key) {
+        return vaultService.get(key);
+    }
+
+    /**
+     * See {@link VaultService#put(ByteArray, byte[])}
+     */
+    public CompletableFuture<Void> put(ByteArray key, byte[] val) {
+        return vaultService.put(key, val);
+    }
+
+    /**
+     * See {@link VaultService#remove(ByteArray)}
+     */
+    public CompletableFuture<Void> remove(ByteArray key) {
+        return vaultService.remove(key);
+    }
+
+    /**
+     * See {@link VaultService#range(ByteArray, ByteArray)}
+     */
+    public Iterator<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
+        return vaultService.range(fromKey, toKey);
+    }
+
+    /**
+     * See {@link VaultService#putAll}
+     */
+    @NotNull
+    public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals, long revision) {
+        return vaultService.putAll(vals, revision);
+    }
+
+    /**
+     * See {@link VaultService#appliedRevision()}
+     */
+    @NotNull
+    public CompletableFuture<Long> appliedRevision() {
+        return vaultService.appliedRevision();
+    }

Review comment:
       What about watch() and stopWatch() methods?

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -17,18 +17,80 @@
 
 package org.apache.ignite.internal.vault;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
  */
 public class VaultManager {
+    private VaultService vaultService;
+
+    /**
+     * Default constructor.
+     */
+    public VaultManager() {
+        this.vaultService = new VaultServiceImpl();
+    }
 
     /**
      * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
      * either from PDS or from user initial bootstrap configuration.
+     *
+     * TODO: implement when IGNITE-14408 will be ready
      */
     public boolean bootstrapped() {
         return false;
     }
 
-    // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+    /**
+     * See {@link VaultService#get(ByteArray)}
+     */
+    public CompletableFuture<VaultEntry> get(ByteArray key) {

Review comment:
       Why we need Futures here? In my opinion cause all vault methods are local, have no dependencies on other components and actually sync there's no sense in Futures. Do you expect any chaining logic?

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.Comparator;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Watch for vault entries.
+ * Could be specified by range of keys.
+ * If value of key in range is changed, then corresponding listener will be triggered.
+ */
+public class VaultWatch {

Review comment:
       Let's make it unmodified, final private fields, etc.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a vault unit as entry with key and value, where
+ * <ul>
+ *     <li>key - an unique entry's key. Keys are comparable in lexicographic manner and represented as an {@link ByteArray}.</li>
+ *     <li>value - a data which is associated with a key and represented as an array of bytes.</ul>
+ * </ul>
+ */
+public interface Entry {

Review comment:
       Cause given Entry is the same as `org.apache.ignite.metastorage.common.Entry ` what about moving it to some common module like it supposed to be done with Key?

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.common.*;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Simple in-memory representation of vault. Only for test purposes.
+ */
+public class VaultServiceImpl implements VaultService {
+    /** Map to store values. */
+    private TreeMap<ByteArray, byte[]> storage = new TreeMap<>();
+
+    /**
+     * Special key for vault where applied revision for {@code putAll} operation is stored.
+     */
+    private static ByteArray APPLIED_REV = ByteArray.fromString("applied_revision");
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    private final WatcherImpl watcher;
+
+    public VaultServiceImpl() {
+        this.watcher = new WatcherImpl();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<VaultEntry> get(ByteArray key) {
+        synchronized (mux) {
+            return CompletableFuture.completedFuture(new VaultEntry(key, storage.get(key)));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public CompletableFuture<Long> appliedRevision() {
+        synchronized (mux) {
+            return CompletableFuture.completedFuture(IgniteUtils.bytesToLong(storage.get(APPLIED_REV), 0));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> put(ByteArray key, byte[] val) {
+        synchronized (mux) {
+            storage.put(key, val);
+
+            watcher.notify(new VaultEntry(key, val));
+
+            return CompletableFuture.allOf();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> remove(ByteArray key) {
+        synchronized (mux) {
+            storage.remove(key);
+
+            return CompletableFuture.allOf();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
+        synchronized (mux) {
+            return new ArrayList<>(storage.subMap(fromKey, toKey).entrySet())
+                .stream()
+                .map(e -> new VaultEntry(e.getKey(), e.getValue()))
+                .iterator();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull VaultWatch vaultWatch) {

Review comment:
       In order to be consistent with MetaStorageService contract, I'd rather use methods similar to
   watch(@Nullable Key keyFrom, @Nullable Key keyTo, @NotNull WatchListener lsnr);
   and
   watch(@NotNull Key key, l@NotNull WatchListener lsnr);
   within VaultService and VaultManager.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ */
+public class WatcherImpl implements Watcher {
+    /** Queue for changed vault entries. */
+    private final BlockingQueue<VaultEntry> queue = new LinkedBlockingQueue<>();
+
+    /** Registered vault watches. */
+    private final Map<IgniteUuid, VaultWatch> watches = new HashMap<>();
+
+    /** Flag for indicating if watcher is stopped. */
+    private volatile boolean stop;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Execution service which runs thread for processing changed vault entries. */
+    private final ExecutorService exec;
+
+    /**
+     * Default constructor.
+     */
+    public WatcherImpl() {
+        exec = Executors.newFixedThreadPool(1);
+
+        exec.execute(new WatcherWorker());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<IgniteUuid> register(@NotNull VaultWatch vaultWatch) {
+        synchronized (mux) {
+            IgniteUuid key = new IgniteUuid(UUID.randomUUID(), 0);

Review comment:
       Let's use AtomicLong instead of `UUID.randomUUID()`.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Vault watcher.
+ *
+ * Watches for vault entries updates.
+ */
+public interface Watcher {

Review comment:
       Do we really need given interface? Seems that it's an inner detail of VaultServiceImpl. Besides that seems that we will have only one implementation of Wather.

##########
File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ */
+public class WatcherImpl implements Watcher {
+    /** Queue for changed vault entries. */
+    private final BlockingQueue<VaultEntry> queue = new LinkedBlockingQueue<>();
+
+    /** Registered vault watches. */
+    private final Map<IgniteUuid, VaultWatch> watches = new HashMap<>();
+
+    /** Flag for indicating if watcher is stopped. */
+    private volatile boolean stop;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Execution service which runs thread for processing changed vault entries. */
+    private final ExecutorService exec;
+
+    /**
+     * Default constructor.
+     */
+    public WatcherImpl() {
+        exec = Executors.newFixedThreadPool(1);
+
+        exec.execute(new WatcherWorker());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<IgniteUuid> register(@NotNull VaultWatch vaultWatch) {
+        synchronized (mux) {

Review comment:
       What about synchronize methods instead of `synchronized (mux)` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org