You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "Pochatkin (via GitHub)" <gi...@apache.org> on 2023/04/13 19:40:49 UTC

[GitHub] [ignite-3] Pochatkin commented on a diff in pull request #1929: IGNITE-19152 Use schema information in LocalFileConfigurationStorage

Pochatkin commented on code in PR #1929:
URL: https://github.com/apache/ignite-3/pull/1929#discussion_r1165933231


##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -40,114 +53,187 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.internal.configuration.NodeConfigCreateException;
 import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import org.apache.ignite.internal.configuration.RootInnerNode;
+import org.apache.ignite.internal.configuration.SuperRoot;
+import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
+import org.apache.ignite.internal.configuration.hocon.HoconConverter;
+import org.apache.ignite.internal.configuration.tree.ConverterToMapVisitor;
+import org.apache.ignite.internal.configuration.tree.InnerNode;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;

Review Comment:
   Please don't use NotNull annotation



##########
modules/configuration/src/main/java/org/apache/ignite/internal/configuration/hocon/HoconConverter.java:
##########
@@ -47,6 +53,26 @@ public static ConfigValue represent(
         return ConfigImpl.fromAnyRef(res, null);
     }
 
+    /**

Review Comment:
   TBD



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -234,4 +339,26 @@ private void checkAndRestoreConfigFile() {
             }
         }
     }
+
+    private <T> CompletableFuture<T> readLockAsync(Supplier<T> supplier) {
+        lock.readLock().lock();
+        try {
+            CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, workerThreadPool);
+            futureTracker.registerFuture(future);
+            return future;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private <T> CompletableFuture<T> writeLockAsync(Supplier<T> supplier) {
+        lock.writeLock().lock();
+        try {
+            CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, workerThreadPool);
+            futureTracker.registerFuture(future);
+            return future;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }

Review Comment:
   As I see all RW operations which you made in supplier has no locking.  So, I think this is not your fault and was previously. I think it would be better to move locking to suppliers implementation 



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -40,114 +53,187 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.internal.configuration.NodeConfigCreateException;
 import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import org.apache.ignite.internal.configuration.RootInnerNode;
+import org.apache.ignite.internal.configuration.SuperRoot;
+import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
+import org.apache.ignite.internal.configuration.hocon.HoconConverter;
+import org.apache.ignite.internal.configuration.tree.ConverterToMapVisitor;
+import org.apache.ignite.internal.configuration.tree.InnerNode;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Implementation of {@link ConfigurationStorage} based on local file configuration storage.
  */
 public class LocalFileConfigurationStorage implements ConfigurationStorage {
     private static final IgniteLogger LOG = Loggers.forClass(LocalFileConfigurationStorage.class);
 
-    /**
-     * Path to config file.
-     */
+    /** Path to config file. */
     private final Path configPath;
 
-    /**
-     * Path to temporary configuration storage.
-     */
+    /** Path to temporary configuration storage. */
     private final Path tempConfigPath;
 
+    /** R/W lock to guard the latest configuration and config file. */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    /**
-     * Latest state of last applied configuration.
-     */
+    /** Latest state of last applied configuration. */
     private final Map<String, Serializable> latest = new ConcurrentHashMap<>();
 
-    /**
-     *  Configuration changes listener.
-     *  */
+    /** Configuration nodes generator. */
+    private final ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
+
+    /** Map of root keys that are needed to generate configuration tree. */
+    private final Map<String, RootKey<?, ?>> rootKeys;
+
+    /** Configuration changes listener. */
     private final AtomicReference<ConfigurationStorageListener> lsnrRef = new AtomicReference<>();
 
-    private final ExecutorService threadPool = Executors.newFixedThreadPool(2, new NamedThreadFactory("loc-cfg-file", LOG));
+    /** Thread pool for configuration updates notifications. */
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(
+            2, new NamedThreadFactory("loc-cfg-file", LOG)
+    );
+
+    /** Thread pool for configuration updates notifications. */
+    private final ExecutorService workerThreadPool = Executors.newFixedThreadPool(
+            2, new NamedThreadFactory("cfg-file-worker", LOG)
+    );

Review Comment:
   What the reason to have two thread pools?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -234,4 +339,26 @@ private void checkAndRestoreConfigFile() {
             }
         }
     }
+
+    private <T> CompletableFuture<T> readLockAsync(Supplier<T> supplier) {
+        lock.readLock().lock();
+        try {
+            CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, workerThreadPool);
+            futureTracker.registerFuture(future);
+            return future;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private <T> CompletableFuture<T> writeLockAsync(Supplier<T> supplier) {
+        lock.writeLock().lock();
+        try {
+            CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier, workerThreadPool);
+            futureTracker.registerFuture(future);
+            return future;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }

Review Comment:
   As I see all RW operations which you made in supplier has no locking.  So, I think this is not your fault and was previously. I think it would be better to move locking to suppliers implementation 



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -40,114 +53,187 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.internal.configuration.NodeConfigCreateException;
 import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import org.apache.ignite.internal.configuration.RootInnerNode;
+import org.apache.ignite.internal.configuration.SuperRoot;
+import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
+import org.apache.ignite.internal.configuration.hocon.HoconConverter;
+import org.apache.ignite.internal.configuration.tree.ConverterToMapVisitor;
+import org.apache.ignite.internal.configuration.tree.InnerNode;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Implementation of {@link ConfigurationStorage} based on local file configuration storage.
  */
 public class LocalFileConfigurationStorage implements ConfigurationStorage {
     private static final IgniteLogger LOG = Loggers.forClass(LocalFileConfigurationStorage.class);
 
-    /**
-     * Path to config file.
-     */
+    /** Path to config file. */
     private final Path configPath;
 
-    /**
-     * Path to temporary configuration storage.
-     */
+    /** Path to temporary configuration storage. */
     private final Path tempConfigPath;
 
+    /** R/W lock to guard the latest configuration and config file. */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    /**
-     * Latest state of last applied configuration.
-     */
+    /** Latest state of last applied configuration. */
     private final Map<String, Serializable> latest = new ConcurrentHashMap<>();
 
-    /**
-     *  Configuration changes listener.
-     *  */
+    /** Configuration nodes generator. */
+    private final ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
+
+    /** Map of root keys that are needed to generate configuration tree. */
+    private final Map<String, RootKey<?, ?>> rootKeys;
+
+    /** Configuration changes listener. */
     private final AtomicReference<ConfigurationStorageListener> lsnrRef = new AtomicReference<>();
 
-    private final ExecutorService threadPool = Executors.newFixedThreadPool(2, new NamedThreadFactory("loc-cfg-file", LOG));
+    /** Thread pool for configuration updates notifications. */
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(
+            2, new NamedThreadFactory("loc-cfg-file", LOG)
+    );
+
+    /** Thread pool for configuration updates notifications. */
+    private final ExecutorService workerThreadPool = Executors.newFixedThreadPool(
+            2, new NamedThreadFactory("cfg-file-worker", LOG)
+    );
 
+    /** Tracks all running futures. */
     private final InFlightFutures futureTracker = new InFlightFutures();
 
+    /** Last revision for configuration. */
     private long lastRevision = 0L;
 
+    /**
+     * Constructor without configuration extensions.
+     *
+     * @param configPath Path to node bootstrap configuration file.
+     * @param rootKeys Configuration root keys.
+     */
+    public LocalFileConfigurationStorage(Path configPath, Collection<RootKey<?, ?>> rootKeys) {
+        this(configPath, rootKeys, Collections.emptyList(), Collections.emptyList());
+    }
+
     /**
      * Constructor.
      *
      * @param configPath Path to node bootstrap configuration file.
+     * @param rootKeys Configuration root keys.
+     * @param internalSchemaExtensions Internal schema extensions.
+     * @param polymorphicSchemaExtensions Polymorphic schema extensions.
      */
-    public LocalFileConfigurationStorage(Path configPath) {
+    public LocalFileConfigurationStorage(Path configPath, Collection<RootKey<?, ?>> rootKeys,
+            Collection<Class<?>> internalSchemaExtensions, Collection<Class<?>> polymorphicSchemaExtensions) {
         this.configPath = configPath;
-        tempConfigPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
+        this.rootKeys = rootKeys.stream().collect(toMap(RootKey::key, identity()));
+        this.tempConfigPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
+
+        Map<Class<?>, Set<Class<?>>> internalExtensions = internalSchemaExtensions(internalSchemaExtensions);
+        Map<Class<?>, Set<Class<?>>> polymorphicExtensions = polymorphicSchemaExtensions(polymorphicSchemaExtensions);
+
+        rootKeys.forEach(key -> cgen.compileRootSchema(key.schemaClass(), internalExtensions, polymorphicExtensions));
+
         checkAndRestoreConfigFile();
     }
 
+    private static void setValues(SuperRoot target, Config source) {
+        HoconConverter.hoconSource(source.root()).descend(target);
+    }

Review Comment:
   I think this method may be inlined



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -40,114 +53,187 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.configuration.RootKey;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.internal.configuration.NodeConfigCreateException;
 import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import org.apache.ignite.internal.configuration.RootInnerNode;
+import org.apache.ignite.internal.configuration.SuperRoot;
+import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
+import org.apache.ignite.internal.configuration.hocon.HoconConverter;
+import org.apache.ignite.internal.configuration.tree.ConverterToMapVisitor;
+import org.apache.ignite.internal.configuration.tree.InnerNode;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Implementation of {@link ConfigurationStorage} based on local file configuration storage.
  */
 public class LocalFileConfigurationStorage implements ConfigurationStorage {
     private static final IgniteLogger LOG = Loggers.forClass(LocalFileConfigurationStorage.class);
 
-    /**
-     * Path to config file.
-     */
+    /** Path to config file. */
     private final Path configPath;
 
-    /**
-     * Path to temporary configuration storage.
-     */
+    /** Path to temporary configuration storage. */
     private final Path tempConfigPath;
 
+    /** R/W lock to guard the latest configuration and config file. */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    /**
-     * Latest state of last applied configuration.
-     */
+    /** Latest state of last applied configuration. */
     private final Map<String, Serializable> latest = new ConcurrentHashMap<>();
 
-    /**
-     *  Configuration changes listener.
-     *  */
+    /** Configuration nodes generator. */
+    private final ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
+
+    /** Map of root keys that are needed to generate configuration tree. */
+    private final Map<String, RootKey<?, ?>> rootKeys;
+
+    /** Configuration changes listener. */
     private final AtomicReference<ConfigurationStorageListener> lsnrRef = new AtomicReference<>();
 
-    private final ExecutorService threadPool = Executors.newFixedThreadPool(2, new NamedThreadFactory("loc-cfg-file", LOG));
+    /** Thread pool for configuration updates notifications. */
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(
+            2, new NamedThreadFactory("loc-cfg-file", LOG)
+    );
+
+    /** Thread pool for configuration updates notifications. */
+    private final ExecutorService workerThreadPool = Executors.newFixedThreadPool(
+            2, new NamedThreadFactory("cfg-file-worker", LOG)
+    );
 
+    /** Tracks all running futures. */
     private final InFlightFutures futureTracker = new InFlightFutures();
 
+    /** Last revision for configuration. */
     private long lastRevision = 0L;
 
+    /**
+     * Constructor without configuration extensions.
+     *
+     * @param configPath Path to node bootstrap configuration file.
+     * @param rootKeys Configuration root keys.
+     */
+    public LocalFileConfigurationStorage(Path configPath, Collection<RootKey<?, ?>> rootKeys) {
+        this(configPath, rootKeys, Collections.emptyList(), Collections.emptyList());
+    }
+
     /**
      * Constructor.
      *
      * @param configPath Path to node bootstrap configuration file.
+     * @param rootKeys Configuration root keys.
+     * @param internalSchemaExtensions Internal schema extensions.
+     * @param polymorphicSchemaExtensions Polymorphic schema extensions.
      */
-    public LocalFileConfigurationStorage(Path configPath) {
+    public LocalFileConfigurationStorage(Path configPath, Collection<RootKey<?, ?>> rootKeys,
+            Collection<Class<?>> internalSchemaExtensions, Collection<Class<?>> polymorphicSchemaExtensions) {
         this.configPath = configPath;
-        tempConfigPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
+        this.rootKeys = rootKeys.stream().collect(toMap(RootKey::key, identity()));
+        this.tempConfigPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
+
+        Map<Class<?>, Set<Class<?>>> internalExtensions = internalSchemaExtensions(internalSchemaExtensions);
+        Map<Class<?>, Set<Class<?>>> polymorphicExtensions = polymorphicSchemaExtensions(polymorphicSchemaExtensions);
+
+        rootKeys.forEach(key -> cgen.compileRootSchema(key.schemaClass(), internalExtensions, polymorphicExtensions));
+
         checkAndRestoreConfigFile();
     }

Review Comment:
   I think that you can introduce RootKeysProvider abstraction and encapsulate all logic about root keys. This constructor should have only provider here as parameter



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java:
##########
@@ -175,47 +261,66 @@ public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, lon
 
     @Override
     public void close() {
+        IgniteUtils.shutdownAndAwaitTermination(workerThreadPool, 10, TimeUnit.SECONDS);
         IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
 
         futureTracker.cancelInFlightFutures();
     }
 
-    private void saveValues(Map<String, ? extends Serializable> values) {
+    private void saveConfigFile() {
         try {
-            Files.write(tempConfigPath, renderHoconString(values).getBytes(StandardCharsets.UTF_8),
-                    StandardOpenOption.SYNC, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
-            Files.move(tempConfigPath, configPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+            Files.write(
+                    tempConfigPath,
+                    renderHoconString().getBytes(StandardCharsets.UTF_8),
+                    StandardOpenOption.SYNC, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING
+            );
+
+            Files.move(
+                    tempConfigPath,
+                    configPath,
+                    StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING
+            );
         } catch (IOException e) {
             throw new NodeConfigWriteException(
-                    "Failed to write values " + values + " to config file.", e);
+                    "Failed to write values to config file.", e);
         }
     }
 
     /**
      * Convert provided map to Hocon String representation.
      *
-     * @param values Values of configuration.
      * @return Configuration file string representation in HOCON format.
      */
-    private String renderHoconString(Map<String, ? extends Serializable> values) {
-        Map<String, Object> map = values.entrySet().stream().collect(Collectors.toMap(Entry::getKey, stringEntry -> {
-            Serializable value = stringEntry.getValue();
-            if (value.getClass().isArray()) {
-                return Arrays.asList((Object[]) value);
-            }
-            return value;
-        }));
-        Config other = ConfigFactory.parseMap(map);
-        Config newConfig = other.withFallback(parseConfigOptions()).resolve();
+    private String renderHoconString() {
+        // Super root that'll be filled from the storage data.
+        SuperRoot rootNode = new SuperRoot(rootCreator());
+
+        fillFromPrefixMap(rootNode, toPrefixMap(latest));
+
+        addDefaults(rootNode);
+
+        ConfigValue conf = ConfigImpl.fromAnyRef(HoconConverter.represent(rootNode, new ConverterToMapVisitor(false)), null);
+
+        Config newConfig = ((ConfigObject) conf).toConfig().resolve();
         return newConfig.isEmpty()
                 ? ""
                 : newConfig.root().render(ConfigRenderOptions.concise().setFormatted(true).setJson(false));
     }
 
+    private Function<String, RootInnerNode> rootCreator() {
+        return key -> {
+            RootKey<?, ?> rootKey = rootKeys.get(key);
+
+            return rootKey == null ? null : new RootInnerNode(rootKey, createRootNode(rootKey));
+        };
+    }
+
+    private InnerNode createRootNode(RootKey<?, ?> rootKey) {
+        return cgen.instantiateNode(rootKey.schemaClass());
+    }

Review Comment:
   As I understood correct this is copypasted code from ConfigurationRegistry. May you extract it to separate abstraction layer and avoid code duplicate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org