You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2022/03/15 14:28:50 UTC

[ignite] branch master updated: IGNITE-16687 .NET: Fix platform service with node filter cancellation on joining pure java node (#9891)

This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cd4c08  IGNITE-16687 .NET: Fix platform service with node filter cancellation on joining pure java node (#9891)
0cd4c08 is described below

commit 0cd4c084c68fc823b8e38019ebb693cdbd200f14
Author: Ivan Daschinskiy <iv...@apache.org>
AuthorDate: Tue Mar 15 17:23:29 2022 +0300

    IGNITE-16687 .NET: Fix platform service with node filter cancellation on joining pure java node (#9891)
---
 .../processors/platform/utils/PlatformUtils.java   | 22 ++++++
 .../processors/service/IgniteServiceProcessor.java | 14 ++--
 .../service/ServiceDeploymentActions.java          | 37 ++++++++--
 .../processors/service/ServiceDeploymentTask.java  |  2 +-
 .../platform/PlatformServiceCallPureJavaTask.java  | 81 ++++++++++++++++++++++
 .../Services/CallPlatformServiceTest.cs            | 56 ++++++++++++++-
 6 files changed, 197 insertions(+), 15 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 4aa7649..0e2955d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -63,10 +63,12 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
 import org.apache.ignite.internal.processors.platform.PlatformNativeException;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
 import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.service.LazyServiceConfiguration;
 import org.apache.ignite.internal.util.MutableSingletonList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -76,6 +78,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.services.ServiceConfiguration;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
@@ -1342,6 +1345,25 @@ public class PlatformUtils {
     }
 
     /**
+     * Gets the platform a service is implemented on, as far as it can be told from its configuration.
+     *
+     * @param svcCfg Service configuration; a {@link LazyServiceConfiguration} is matched by service class name.
+     * @return {@code PLATFORM_DOTNET} for a .NET service, empty string for a java service or {@code null} config.
+     */
+    public static String servicePlatform(ServiceConfiguration svcCfg) {
+        if (svcCfg instanceof LazyServiceConfiguration) {
+            String svcClsName = ((LazyServiceConfiguration)svcCfg).serviceClassName();
+
+            if (PlatformDotNetServiceImpl.class.getName().equals(svcClsName))
+                return PLATFORM_DOTNET;
+        }
+        else if (svcCfg != null && svcCfg.getService() instanceof PlatformDotNetServiceImpl)
+            return PLATFORM_DOTNET;
+
+        return "";
+    }
+
+    /**
      * Private constructor.
      */
     private PlatformUtils() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index 4eaad94..edafbf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -1637,9 +1637,9 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
         ServiceDeploymentActions depActions = null;
 
         if (!registeredServices.isEmpty()) {
-            depActions = new ServiceDeploymentActions();
+            depActions = new ServiceDeploymentActions(ctx);
 
-            depActions.servicesToDeploy(new HashMap<>(registeredServices));
+            depActions.servicesToDeploy(registeredServices);
         }
 
         depMgr.onLocalJoin(evt, discoCache, depActions);
@@ -1785,7 +1785,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
         }
 
         if (!toDeploy.isEmpty() || !toUndeploy.isEmpty()) {
-            ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+            ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
 
             if (!toDeploy.isEmpty())
                 depActions.servicesToDeploy(toDeploy);
@@ -1813,10 +1813,10 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
         if (msg.activate() && registeredServices.isEmpty())
             return;
 
-        ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+        ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
 
         if (msg.activate())
-            depActions.servicesToDeploy(new HashMap<>(registeredServices));
+            depActions.servicesToDeploy(registeredServices);
         else
             depActions.deactivate(true);
 
@@ -1846,7 +1846,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
         }
 
         if (!toUndeploy.isEmpty()) {
-            ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+            ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
 
             depActions.servicesToUndeploy(toUndeploy);
 
@@ -1890,7 +1890,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
             servicesTopsUpdateMux.notifyAll();
         }
 
-        ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+        ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
 
         depActions.deploymentTopologies(fullTops);
         depActions.deploymentErrors(fullErrors);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
index b33a646..64ef4c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.processors.service;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.ServiceConfiguration;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -43,11 +47,36 @@ public class ServiceDeploymentActions {
     /** Services deployment errors. */
     private Map<IgniteUuid, Collection<byte[]>> depErrors;
 
+    /** Current platform name, or empty string if the kernal context has no platform context. */
+    private final String platform;
+
+    /**
+     * @param ctx Kernal context; its platform context (if any) supplies the current platform name.
+     */
+    public ServiceDeploymentActions(GridKernalContext ctx) {
+        platform = ctx.platform().hasContext() ? ctx.platform().context().platform() : "";
+    }
+
     /**
      * @param servicesToDeploy Services info to deploy.
      */
     public void servicesToDeploy(@NotNull Map<IgniteUuid, ServiceInfo> servicesToDeploy) {
-        this.servicesToDeploy = servicesToDeploy;
+        Map<IgniteUuid, ServiceInfo> res = new HashMap<>();
+
+        for (Map.Entry<IgniteUuid, ServiceInfo> kv: servicesToDeploy.entrySet()) {
+            ServiceInfo dsc = kv.getValue();
+
+            ServiceConfiguration cfg = dsc.configuration();
+
+            String svcPlatform = PlatformUtils.servicePlatform(cfg);
+
+            if (!svcPlatform.isEmpty() && !platform.equals(svcPlatform))
+                continue;
+
+            res.put(kv.getKey(), dsc);
+        }
+
+        this.servicesToDeploy = Collections.unmodifiableMap(res);
     }
 
     /**
@@ -61,7 +90,7 @@ public class ServiceDeploymentActions {
      * @param servicesToUndeploy Services info to undeploy.
      */
     public void servicesToUndeploy(@NotNull Map<IgniteUuid, ServiceInfo> servicesToUndeploy) {
-        this.servicesToUndeploy = servicesToUndeploy;
+        this.servicesToUndeploy = Collections.unmodifiableMap(new HashMap<>(servicesToUndeploy));
     }
 
     /**
@@ -96,7 +125,7 @@ public class ServiceDeploymentActions {
      * @param depTops Deployment topologies.
      */
     public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> depTops) {
-        this.depTops = depTops;
+        this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
     }
 
     /**
@@ -110,6 +139,6 @@ public class ServiceDeploymentActions {
      * @param depErrors Deployment errors.
      */
     public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<byte[]>> depErrors) {
-        this.depErrors = depErrors;
+        this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index 29cd6ad..e33ee33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -242,7 +242,7 @@ class ServiceDeploymentTask {
                     return;
                 }
 
-                depActions = new ServiceDeploymentActions();
+                depActions = new ServiceDeploymentActions(ctx);
 
                 depActions.servicesToDeploy(toDeploy);
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformServiceCallPureJavaTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformServiceCallPureJavaTask.java
new file mode 100644
index 0000000..a44a637
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformServiceCallPureJavaTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.platform;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.platform.services.PlatformService;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Task that invokes {@link PlatformService} methods with collections and arrays as arguments and return types
+ * through a service proxy obtained from a pure java node.
+ */
+public class PlatformServiceCallPureJavaTask extends AbstractPlatformServiceCallTask {
+    /** {@inheritDoc} */
+    @Override ComputeJobAdapter createJob(String svcName) {
+        return new PlatformServiceCallPureJavaTask.PlatformServiceCallPureJavaJob(svcName);
+    }
+
+    /** Job that starts an extra pure java node and obtains the service proxy from it. */
+    static class PlatformServiceCallPureJavaJob extends
+        PlatformServiceCallCollectionsTask.PlatformServiceCallCollectionsJob {
+        /** Pure java node; started in {@link #runTest()} and closed when the test finishes. */
+        Ignite pureJavaIgnite;
+
+        /**
+         * @param srvcName Service name.
+         */
+        PlatformServiceCallPureJavaJob(String srvcName) {
+            super(srvcName);
+        }
+
+        /** {@inheritDoc} The proxy is obtained from the pure java node. */
+        @Override TestPlatformService serviceProxy() {
+            return pureJavaIgnite.services().serviceProxy(srvcName, TestPlatformService.class, false);
+        }
+
+        /** {@inheritDoc} Runs with an extra pure java node that is started before and closed after the test. */
+        @Override void runTest() {
+            IgniteConfiguration currCfg = ignite.configuration();
+            TcpDiscoverySpi currSpi = (TcpDiscoverySpi)currCfg.getDiscoverySpi();
+            TcpDiscoveryVmIpFinder currIpFinder = (TcpDiscoveryVmIpFinder)currSpi.getIpFinder();
+
+            pureJavaIgnite = Ignition.start(new IgniteConfiguration()
+                .setIgniteInstanceName("pure-java-node")
+                .setDiscoverySpi(new TcpDiscoverySpi()
+                    .setIpFinder(currIpFinder)
+                    .setLocalAddress(currSpi.getLocalAddress())
+                    .setLocalPortRange(currSpi.getLocalPortRange())
+                )
+                .setLocalHost(currCfg.getLocalHost())
+                .setBinaryConfiguration(currCfg.getBinaryConfiguration())
+            );
+
+            try {
+                super.runTest();
+            }
+            finally {
+                U.close(pureJavaIgnite, ignite.log().getLogger(getClass()));
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs
index 1c52dfe..1ba025b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs
@@ -20,8 +20,10 @@ namespace Apache.Ignite.Core.Tests.Services
 {
     using System;
     using System.Collections;
+    using System.Collections.Generic;
     using System.Linq;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Resource;
     using Apache.Ignite.Core.Services;
     using NUnit.Framework;
@@ -42,10 +44,20 @@ namespace Apache.Ignite.Core.Tests.Services
 
         /** */
         private const string CheckThinTaskName = "org.apache.ignite.platform.PlatformServiceCallThinTask";
+        
+        /** */
+        private const string NODE_TYPE_ATTR = "TYPE";
+        
+        /** */
+        private const string DOTNET_SRV_NODE_TYPE = "dotnet-srv";
 
         /** */
         private const string CheckCollectionsThinTaskName =
             "org.apache.ignite.platform.PlatformServiceCallCollectionsThinTask";
+        
+        /** */
+        private const string PlatformServiceCallPureJavaTask = 
+            "org.apache.ignite.platform.PlatformServiceCallPureJavaTask";
 
         /** */
         private readonly bool _useBinaryArray;
@@ -94,11 +106,15 @@ namespace Apache.Ignite.Core.Tests.Services
         /// in which real invocation of the service is made.
         /// <para/>
         /// <param name="local">If true call on local node.</param>
+        /// <param name="withNodeFilter">If true, deploy service with node filter.</param>
         /// <param name="taskName">Task to test.</param>
         /// </summary>
         [Test]
-        public void TestCallPlatformService([Values(true, false)] bool local,
-            [Values(CheckTaskName, CheckCollectionsTaskName, CheckThinTaskName, CheckCollectionsThinTaskName)]
+        public void TestCallPlatformService(
+            [Values(true, false)] bool local, 
+            [Values(true, false)] bool withNodeFilter,
+            [Values(CheckTaskName, CheckCollectionsTaskName, CheckThinTaskName, CheckCollectionsThinTaskName,
+                PlatformServiceCallPureJavaTask)]
             string taskName)
         {
             var cfg = new ServiceConfiguration
@@ -108,6 +124,9 @@ namespace Apache.Ignite.Core.Tests.Services
                 Service = new TestPlatformService()
             };
 
+            if (withNodeFilter)
+                cfg.NodeFilter = new NodeTypeFilter(DOTNET_SRV_NODE_TYPE);
+
             Grid1.GetServices().Deploy(cfg);
 
             Grid1.GetCompute().ExecuteJavaTask<object>(taskName, new object[] { ServiceName, local });
@@ -149,9 +168,40 @@ namespace Apache.Ignite.Core.Tests.Services
                 {
                     NameMapper = BinaryBasicNameMapper.SimpleNameInstance
                 },
-                LifecycleHandlers = _useBinaryArray ? new[] { new SetUseBinaryArray() } : null
+                LifecycleHandlers = _useBinaryArray ? new[] { new SetUseBinaryArray() } : null,
+                UserAttributes = new Dictionary<string, object> {{NODE_TYPE_ATTR, DOTNET_SRV_NODE_TYPE}}
             };
         }
+        
+        /// <summary>
+        /// Node filter that accepts only nodes whose TYPE attribute equals the given value, ignoring case.
+        /// </summary>
+        public class NodeTypeFilter : IClusterNodeFilter
+        {
+            /** Expected value of the TYPE attribute. */
+            private readonly string _type;
+
+            /// <summary>
+            /// Initializes a new instance of <see cref="NodeTypeFilter"/> class.
+            /// </summary>
+            /// <param name="type">Value of TYPE attribute to compare with.</param>
+            public NodeTypeFilter(string type)
+            {
+                _type = type;
+            }
+
+            /** <inheritdoc /> */
+            public bool Invoke(IClusterNode node)
+            {
+                // Ordinal comparison: the result must not depend on the culture of the evaluating node.
+                if (!node.TryGetAttribute<string>(NODE_TYPE_ATTR, out var attr))
+                {
+                    return false;
+                }
+
+                return string.Equals(attr, _type, StringComparison.OrdinalIgnoreCase);
+            }
+        }
 
         /** */
         public interface ITestPlatformService : IService