Posted to notifications@ignite.apache.org by "valepakh (via GitHub)" <gi...@apache.org> on 2023/05/29 10:42:27 UTC

[GitHub] [ignite-3] valepakh commented on a diff in pull request #2102: IGNITE-19476 On demand deploy units API

valepakh commented on code in PR #2102:
URL: https://github.com/apache/ignite-3/pull/2102#discussion_r1209099872


##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java:
##########
@@ -103,27 +104,23 @@ public void subscribe() {
     }
 
     /**
-     * Start deployment process to all nodes from CMG group.
+     * Download deployment unit content from randomly selected node.
      *
      * @param id Deployment unit identifier.
      * @param version Deployment unit version.
-     * @param unitContent Deployment unit file names and content.
-     * @param nodeId Node consistent identifier.
-     * @return Future with deployment result.
+     * @param nodes Nodes where

Review Comment:
   `Nodes where` what? The `@param nodes` description is incomplete.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java:
##########
@@ -117,15 +136,16 @@ default CompletableFuture<Boolean> createNodeStatus(String id, Version version,
      * @param status New deployment status.
      * @return Future with {@code true} result if status updated successfully.
      */
-    CompletableFuture<Boolean> updateNodeStatus(String id, Version version, String nodeId, DeploymentStatus status);
+    CompletableFuture<Boolean> updateNodeStatus(String nodeId, String id, Version version, DeploymentStatus status);
 
     /**
-     * Returns cluster statuses of all deployment units which deployed on provided node.
+     * Returns all nodes list where deployed unit with provided identifier and version.
      *
-     * @param nodeId Node consistent identifier.
-     * @return Cluster statuses of all deployment units which deployed on provided node.
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @return All nodes list where deployed unit with provided identifier and version or empty list.

Review Comment:
   ```suggestion
        * @return A list of nodes where the unit with the provided identifier and version is deployed, or an empty list.
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore.status;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.apache.ignite.internal.deployunit.version.Version;
+import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
+
+/**
+ * Deployment unit cluster status.
+ */
+public class UnitClusterStatus extends UnitStatus {
+    private final Set<String> initialNodesToDeploy;
+
+    /**
+     * Constructor.
+     *
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @param status Unit status.
+     * @param initialNodesToDeploy Nodes required for initial deploy.
+     */
+    public UnitClusterStatus(String id, Version version, DeploymentStatus status, Set<String> initialNodesToDeploy) {
+        super(id, version, status);
+        this.initialNodesToDeploy = Collections.unmodifiableSet(initialNodesToDeploy);
+    }
+
+    public Set<String> initialNodesToDeploy() {
+        return initialNodesToDeploy;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        UnitClusterStatus status = (UnitClusterStatus) o;
+
+        return initialNodesToDeploy != null ? initialNodesToDeploy.equals(status.initialNodesToDeploy)
+                : status.initialNodesToDeploy == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (initialNodesToDeploy != null ? initialNodesToDeploy.hashCode() : 0);
+        return result;
+    }
+
+    public static byte[] serialize(UnitClusterStatus status) {
+        return SerializeUtils.serialize(status.id(), status.version(), status.status(), status.initialNodesToDeploy);
+    }
+
+    /**
+     * Deserialize method.
+     *
+     * @param value Serialized deployment unit cluster status.
+     * @return Deserialized deployment unit cluster status.
+     */
+    public static UnitClusterStatus deserialize(byte[] value) {
+        if (value == null || value.length == 0) {
+            return new UnitClusterStatus(null, null, null, null);
+        }
+
+        String[] values = SerializeUtils.deserialize(value);
+
+        String id = values.length > 0 ? SerializeUtils.decode(values[0]) : null;
+        Version version = values.length > 1 ? Version.parseVersion(SerializeUtils.decode(values[1])) : null;
+        DeploymentStatus status = values.length > 2 ? DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null;
+        Set<String> nodes = values.length > 3 ? SerializeUtils.decodeAsSet(values[3]) : Collections.emptySet();

Review Comment:
   ```suggestion
           Set<String> nodes = values.length > 3 ? SerializeUtils.decodeAsSet(values[3]) : Set.of();
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -111,10 +115,38 @@ public DeploymentManagerImpl(ClusterService clusterService,
         this.configuration = configuration;
         this.cmgManager = cmgManager;
         this.workDir = workDir;
-        this.tracker = new DeployTracker();
-        metastore = new DeploymentUnitStoreImpl(metaStorage);
+        tracker = new DeployTracker();
         deployer = new FileDeployerService();
         messaging = new DeployMessagingService(clusterService, cmgManager, deployer, tracker);
+        metastore = new DeploymentUnitStoreImpl(metaStorage,

Review Comment:
   Let's rename the `metastore` field to something like `deploymentUnitStore`.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java:
##########
@@ -113,4 +117,34 @@ public CompletableFuture<Boolean> undeploy(String id, String version) {
             }
         }, executor);
     }
+
+    /**
+     * Read from local FileSystem and returns deployment unit content.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @return Deployment unit content.
+     */
+    public CompletableFuture<UnitContent> getUnitContent(String id, String version) {
+        return CompletableFuture.supplyAsync(() -> {
+            Map<String, byte[]> result = new HashMap<>();
+            try {
+                Path unitPath = unitsFolder
+                        .resolve(id)
+                        .resolve(version);
+
+                Files.walkFileTree(unitPath, new SimpleFileVisitor<>() {
+                    @Override
+                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                        result.put(file.getFileName().toString(), Files.readAllBytes(file));
+                        return FileVisitResult.CONTINUE;
+                    }
+                });
+
+            } catch (IOException e) {
+                LOG.debug("Failed to undeploy unit " + id + ":" + version, e);

Review Comment:
   ```suggestion
                   LOG.debug("Failed to get content for unit " + id + ":" + version, e);
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployCallRequest.java:
##########
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.deployunit.message;
 
-import java.util.Map;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Deploy unit request.
  */
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_REQUEST)
-public interface DeployUnitRequest extends NetworkMessage {
+//TODO: Remove and add metastore listener.

Review Comment:
   A ticket should be created for this TODO.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java:
##########
@@ -117,15 +136,16 @@ default CompletableFuture<Boolean> createNodeStatus(String id, Version version,
      * @param status New deployment status.
      * @return Future with {@code true} result if status updated successfully.
      */
-    CompletableFuture<Boolean> updateNodeStatus(String id, Version version, String nodeId, DeploymentStatus status);
+    CompletableFuture<Boolean> updateNodeStatus(String nodeId, String id, Version version, DeploymentStatus status);
 
     /**
-     * Returns cluster statuses of all deployment units which deployed on provided node.
+     * Returns all nodes list where deployed unit with provided identifier and version.

Review Comment:
   ```suggestion
        * Returns a list of nodes where the unit with the provided identifier and version is deployed.
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java:
##########
@@ -18,131 +18,226 @@
 package org.apache.ignite.internal.deployunit.metastore;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.allUnits;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.clusterStatusKey;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.nodeStatusKey;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.nodes;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitMetaSerializer.deserialize;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitMetaSerializer.serialize;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Predicate;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.ignite.internal.deployunit.UnitStatus;
-import org.apache.ignite.internal.deployunit.UnitStatuses;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.ClusterStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.KeyAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.NodeStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
+import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.deployunit.version.Version;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.ByteArray;
 
 /**
  * Implementation of {@link DeploymentUnitStore} based on {@link MetaStorageManager}.
  */
 public class DeploymentUnitStoreImpl implements DeploymentUnitStore {
+    private static final IgniteLogger LOG = Loggers.forClass(DeploymentUnitStoreImpl.class);
+
     private final MetaStorageManager metaStorage;
 
-    public DeploymentUnitStoreImpl(MetaStorageManager metaStorage) {
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(
+            4, new NamedThreadFactory("deployment", LOG));
+
+    /**
+     * Constructor.
+     *
+     * @param metaStorage Meta storage manager.
+     * @param localNodeProvider Cluster service.
+     * @param listener Node events listener.
+     */
+    public DeploymentUnitStoreImpl(MetaStorageManager metaStorage,
+            Supplier<String> localNodeProvider,
+            NodeEventListener listener
+    ) {
         this.metaStorage = metaStorage;
-    }
 
-    @Override
-    public CompletableFuture<UnitStatus> getClusterStatus(String id, Version version) {
-        return metaStorage.get(clusterStatusKey(id, version)).thenApply(entry -> {
-            byte[] value = entry.value();
-            if (value == null) {
-                return null;
+        metaStorage.registerPrefixWatch(NodeStatusKey.builder().build().toKey(), new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                for (EntryEvent e : event.entryEvents()) {
+                    Entry entry = e.newEntry();
+
+                    byte[] key = entry.key();
+                    byte[] value = entry.value();
+
+                    NodeStatusKey nodeStatusKey = NodeStatusKey.fromKey(key);
+
+                    if (!Objects.equals(localNodeProvider.get(), nodeStatusKey.nodeId())
+                            || value == null) {
+                        continue;
+                    }
+
+                    UnitNodeStatus nodeStatus = UnitNodeStatus.deserialize(value);
+
+                    CompletableFuture.supplyAsync(() -> nodeStatus, executor)
+                            .thenCompose(status1 -> getAllNodes(status1.id(), status1.version()))

Review Comment:
   ```suggestion
                               .thenComposeAsync(status -> getAllNodes(status.id(), status.version()), executor)
   ```
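
A minimal sketch of what this suggestion changes, assuming the intent is to guarantee that the composing function runs on the store's executor. With plain `thenCompose`, the function may run on whichever thread calls `thenCompose` when the upstream future is already complete; `thenComposeAsync(fn, executor)` always submits it to the given executor. The class name, the thread name `deployment-sketch`, and the stand-in for `getAllNodes` are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ComposeAsyncSketch {
    /**
     * Hypothetical stand-in for the listener chain: the composing function
     * reports the name of the thread it was executed on.
     */
    public static CompletableFuture<String> composeOn(ExecutorService executor) {
        // The upstream future is already complete, so a plain thenCompose
        // would run the function on the calling thread instead.
        return CompletableFuture.completedFuture("unit-id")
                .thenComposeAsync(
                        id -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                        executor);
    }

    /** Runs the chain on a single named thread and returns the observed thread name. */
    public static String runningThread() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "deployment-sketch"));
        try {
            return composeOn(executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runningThread());
    }
}
```

Here the explicit executor argument, rather than the preceding `supplyAsync` hop, is what pins the work to the deployment thread pool.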



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java:
##########
@@ -18,131 +18,226 @@
 package org.apache.ignite.internal.deployunit.metastore;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.allUnits;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.clusterStatusKey;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.nodeStatusKey;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.nodes;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitMetaSerializer.deserialize;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitMetaSerializer.serialize;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Predicate;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.ignite.internal.deployunit.UnitStatus;
-import org.apache.ignite.internal.deployunit.UnitStatuses;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.ClusterStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.KeyAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.NodeStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
+import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.deployunit.version.Version;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.ByteArray;
 
 /**
  * Implementation of {@link DeploymentUnitStore} based on {@link MetaStorageManager}.
  */
 public class DeploymentUnitStoreImpl implements DeploymentUnitStore {
+    private static final IgniteLogger LOG = Loggers.forClass(DeploymentUnitStoreImpl.class);
+
     private final MetaStorageManager metaStorage;
 
-    public DeploymentUnitStoreImpl(MetaStorageManager metaStorage) {
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(
+            4, new NamedThreadFactory("deployment", LOG));
+
+    /**
+     * Constructor.
+     *
+     * @param metaStorage Meta storage manager.
+     * @param localNodeProvider Cluster service.
+     * @param listener Node events listener.
+     */
+    public DeploymentUnitStoreImpl(MetaStorageManager metaStorage,
+            Supplier<String> localNodeProvider,
+            NodeEventListener listener
+    ) {
         this.metaStorage = metaStorage;
-    }
 
-    @Override
-    public CompletableFuture<UnitStatus> getClusterStatus(String id, Version version) {
-        return metaStorage.get(clusterStatusKey(id, version)).thenApply(entry -> {
-            byte[] value = entry.value();
-            if (value == null) {
-                return null;
+        metaStorage.registerPrefixWatch(NodeStatusKey.builder().build().toKey(), new WatchListener() {

Review Comment:
   I think this listener could be extracted into a separate class.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java:
##########
@@ -18,131 +18,226 @@
 package org.apache.ignite.internal.deployunit.metastore;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.allUnits;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.clusterStatusKey;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.nodeStatusKey;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitKey.nodes;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitMetaSerializer.deserialize;
-import static org.apache.ignite.internal.deployunit.metastore.key.UnitMetaSerializer.serialize;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Predicate;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.ignite.internal.deployunit.UnitStatus;
-import org.apache.ignite.internal.deployunit.UnitStatuses;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.ClusterStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.KeyAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.accumulator.NodeStatusAccumulator;
+import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
+import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.deployunit.version.Version;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.ByteArray;
 
 /**
  * Implementation of {@link DeploymentUnitStore} based on {@link MetaStorageManager}.
  */
 public class DeploymentUnitStoreImpl implements DeploymentUnitStore {
+    private static final IgniteLogger LOG = Loggers.forClass(DeploymentUnitStoreImpl.class);
+
     private final MetaStorageManager metaStorage;
 
-    public DeploymentUnitStoreImpl(MetaStorageManager metaStorage) {
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(
+            4, new NamedThreadFactory("deployment", LOG));
+
+    /**
+     * Constructor.
+     *
+     * @param metaStorage Meta storage manager.
+     * @param localNodeProvider Cluster service.

Review Comment:
   ```suggestion
        * @param localNodeProvider Local node id provider.
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore.status;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.apache.ignite.internal.deployunit.version.Version;
+import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
+
+/**
+ * Deployment unit cluster status.
+ */
+public class UnitClusterStatus extends UnitStatus {
+    private final Set<String> initialNodesToDeploy;
+
+    /**
+     * Constructor.
+     *
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @param status Unit status.
+     * @param initialNodesToDeploy Nodes required for initial deploy.
+     */
+    public UnitClusterStatus(String id, Version version, DeploymentStatus status, Set<String> initialNodesToDeploy) {
+        super(id, version, status);
+        this.initialNodesToDeploy = Collections.unmodifiableSet(initialNodesToDeploy);
+    }
+
+    public Set<String> initialNodesToDeploy() {
+        return initialNodesToDeploy;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        UnitClusterStatus status = (UnitClusterStatus) o;
+
+        return initialNodesToDeploy != null ? initialNodesToDeploy.equals(status.initialNodesToDeploy)
+                : status.initialNodesToDeploy == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (initialNodesToDeploy != null ? initialNodesToDeploy.hashCode() : 0);
+        return result;
+    }
+
+    public static byte[] serialize(UnitClusterStatus status) {
+        return SerializeUtils.serialize(status.id(), status.version(), status.status(), status.initialNodesToDeploy);
+    }
+
+    /**
+     * Deserialize method.
+     *
+     * @param value Serialized deployment unit cluster status.
+     * @return Deserialized deployment unit cluster status.
+     */
+    public static UnitClusterStatus deserialize(byte[] value) {
+        if (value == null || value.length == 0) {
+            return new UnitClusterStatus(null, null, null, null);

Review Comment:
   ```suggestion
               return new UnitClusterStatus(null, null, null, Set.of());
   ```
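
For context, a sketch of why `null` is a problem here, assuming the constructor keeps wrapping its argument with `Collections.unmodifiableSet` as in the diff above. That call throws `NullPointerException` for a `null` argument, so the early-return branch of `deserialize` would fail instead of returning an empty status. `EmptySetSketch` and its helper names are hypothetical; `Set.of()` requires Java 9 or later.

```java
import java.util.Collections;
import java.util.Set;

public class EmptySetSketch {
    /** Mirrors the wrapping step done in the UnitClusterStatus constructor. */
    public static Set<String> wrap(Set<String> nodes) {
        return Collections.unmodifiableSet(nodes);
    }

    /** Returns true if wrapping a null set fails with NullPointerException. */
    public static boolean wrapThrowsOnNull() {
        try {
            wrap(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Passing null fails inside Collections.unmodifiableSet.
        System.out.println(wrapThrowsOnNull());
        // Passing Set.of() yields an empty, read-only set.
        System.out.println(wrap(Set.of()).isEmpty());
    }
}
```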



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java:
##########
@@ -90,7 +101,11 @@ private static String encode(String s) {
         return new String(Base64.getEncoder().encode(s.getBytes(UTF_8)), UTF_8);
     }
 
-    private static String decode(String s) {
+    static String decode(String s) {
         return new String(Base64.getDecoder().decode(s), UTF_8);
     }
+
+    private static Collection<String> decode(Collection<String> collection) {

Review Comment:
   This method is unused.



##########
modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.deployunit.FileDeployerService;
+import org.apache.ignite.internal.deployunit.UnitContent;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for {@link FileDeployerService}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class FileDeployerServiceTest {
+    private final FileDeployerService service = new FileDeployerService();
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Path file1;
+    private Path file2;
+    private Path file3;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        service.initUnitsFolder(workDir);
+
+        file1 = workDir.resolve("file1");
+        file2 = workDir.resolve("file2");
+        file3 = workDir.resolve("file3");
+        IgniteUtils.fillDummyFile(file1, 1024);
+        IgniteUtils.fillDummyFile(file2, 1024);
+        IgniteUtils.fillDummyFile(file3, 1024);
+    }
+
+    @Test
+    public void test() throws IOException {
+        CompletableFuture<Boolean> de = service.deploy("id", "1.0.0", content());
+        assertThat(de, willBe(true));

Review Comment:
   ```suggestion
           CompletableFuture<Boolean> deployed = service.deploy("id", "1.0.0", content());
           assertThat(deployed, willBe(true));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org