You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2022/03/15 14:28:50 UTC
[ignite] branch master updated: IGNITE-16687 .NET: Fix platform service with node filter cancellation on joining pure java node (#9891)
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0cd4c08 IGNITE-16687 .NET: Fix platform service with node filter cancellation on joining pure java node (#9891)
0cd4c08 is described below
commit 0cd4c084c68fc823b8e38019ebb693cdbd200f14
Author: Ivan Daschinskiy <iv...@apache.org>
AuthorDate: Tue Mar 15 17:23:29 2022 +0300
IGNITE-16687 .NET: Fix platform service with node filter cancellation on joining pure java node (#9891)
---
.../processors/platform/utils/PlatformUtils.java | 22 ++++++
.../processors/service/IgniteServiceProcessor.java | 14 ++--
.../service/ServiceDeploymentActions.java | 37 ++++++++--
.../processors/service/ServiceDeploymentTask.java | 2 +-
.../platform/PlatformServiceCallPureJavaTask.java | 81 ++++++++++++++++++++++
.../Services/CallPlatformServiceTest.cs | 56 ++++++++++++++-
6 files changed, 197 insertions(+), 15 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 4aa7649..0e2955d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -63,10 +63,12 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.service.LazyServiceConfiguration;
import org.apache.ignite.internal.util.MutableSingletonList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -76,6 +78,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.services.ServiceConfiguration;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
@@ -1342,6 +1345,25 @@ public class PlatformUtils {
}
/**
+ * Get service platform.
+ *
+ * @param svcCfg Service configuration, may be {@code null}.
+ * @return {@code PLATFORM_DOTNET} for a .NET service, empty string otherwise (java service or {@code null}).
+ */
+ public static String servicePlatform(ServiceConfiguration svcCfg) {
+ if (svcCfg instanceof LazyServiceConfiguration) {
+ String svcClsName = ((LazyServiceConfiguration)svcCfg).serviceClassName();
+
+ if (PlatformDotNetServiceImpl.class.getName().equals(svcClsName))
+ return PLATFORM_DOTNET;
+ }
+ else if (svcCfg != null && svcCfg.getService() instanceof PlatformDotNetServiceImpl)
+ return PLATFORM_DOTNET;
+
+ return "";
+ }
+
+ /**
* Private constructor.
*/
private PlatformUtils() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index 4eaad94..edafbf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -1637,9 +1637,9 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
ServiceDeploymentActions depActions = null;
if (!registeredServices.isEmpty()) {
- depActions = new ServiceDeploymentActions();
+ depActions = new ServiceDeploymentActions(ctx);
- depActions.servicesToDeploy(new HashMap<>(registeredServices));
+ depActions.servicesToDeploy(registeredServices);
}
depMgr.onLocalJoin(evt, discoCache, depActions);
@@ -1785,7 +1785,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
}
if (!toDeploy.isEmpty() || !toUndeploy.isEmpty()) {
- ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+ ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
if (!toDeploy.isEmpty())
depActions.servicesToDeploy(toDeploy);
@@ -1813,10 +1813,10 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
if (msg.activate() && registeredServices.isEmpty())
return;
- ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+ ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
if (msg.activate())
- depActions.servicesToDeploy(new HashMap<>(registeredServices));
+ depActions.servicesToDeploy(registeredServices);
else
depActions.deactivate(true);
@@ -1846,7 +1846,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
}
if (!toUndeploy.isEmpty()) {
- ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+ ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
depActions.servicesToUndeploy(toUndeploy);
@@ -1890,7 +1890,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni
servicesTopsUpdateMux.notifyAll();
}
- ServiceDeploymentActions depActions = new ServiceDeploymentActions();
+ ServiceDeploymentActions depActions = new ServiceDeploymentActions(ctx);
depActions.deploymentTopologies(fullTops);
depActions.deploymentErrors(fullErrors);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
index b33a646..64ef4c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.processors.service;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.ServiceConfiguration;
import org.jetbrains.annotations.NotNull;
/**
@@ -43,11 +47,36 @@ public class ServiceDeploymentActions {
/** Services deployment errors. */
private Map<IgniteUuid, Collection<byte[]>> depErrors;
+ /** Platform reported by the platform context of the kernal context, or empty string if there is no such context. */
+ private final String platform;
+
+ /**
+ * @param ctx Kernal context, used to determine the current platform.
+ */
+ public ServiceDeploymentActions(GridKernalContext ctx) {
+ platform = ctx.platform().hasContext() ? ctx.platform().context().platform() : "";
+ }
+
/**
* @param servicesToDeploy Services info to deploy.
*/
public void servicesToDeploy(@NotNull Map<IgniteUuid, ServiceInfo> servicesToDeploy) {
- this.servicesToDeploy = servicesToDeploy;
+ Map<IgniteUuid, ServiceInfo> res = new HashMap<>(); // Filtered copy; the passed map itself is not stored.
+
+ for (Map.Entry<IgniteUuid, ServiceInfo> kv: servicesToDeploy.entrySet()) {
+ ServiceInfo dsc = kv.getValue();
+
+ ServiceConfiguration cfg = dsc.configuration();
+
+ String svcPlatform = PlatformUtils.servicePlatform(cfg); // Empty string means a java service.
+
+ if (!svcPlatform.isEmpty() && !platform.equals(svcPlatform))
+ continue; // Platform service of a platform other than the current one: left out.
+
+ res.put(kv.getKey(), dsc);
+ }
+
+ this.servicesToDeploy = Collections.unmodifiableMap(res); // Stored as a read-only map.
}
/**
@@ -61,7 +90,7 @@ public class ServiceDeploymentActions {
* @param servicesToUndeploy Services info to undeploy.
*/
public void servicesToUndeploy(@NotNull Map<IgniteUuid, ServiceInfo> servicesToUndeploy) {
- this.servicesToUndeploy = servicesToUndeploy;
+ this.servicesToUndeploy = Collections.unmodifiableMap(new HashMap<>(servicesToUndeploy));
}
/**
@@ -96,7 +125,7 @@ public class ServiceDeploymentActions {
* @param depTops Deployment topologies.
*/
public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> depTops) {
- this.depTops = depTops;
+ this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
}
/**
@@ -110,6 +139,6 @@ public class ServiceDeploymentActions {
* @param depErrors Deployment errors.
*/
public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<byte[]>> depErrors) {
- this.depErrors = depErrors;
+ this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index 29cd6ad..e33ee33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -242,7 +242,7 @@ class ServiceDeploymentTask {
return;
}
- depActions = new ServiceDeploymentActions();
+ depActions = new ServiceDeploymentActions(ctx);
depActions.servicesToDeploy(toDeploy);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformServiceCallPureJavaTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformServiceCallPureJavaTask.java
new file mode 100644
index 0000000..a44a637
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformServiceCallPureJavaTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.platform;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.platform.services.PlatformService;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Task that invokes {@link PlatformService} methods with collections and arrays as arguments and return types
+ * through a service proxy obtained on a separately started pure java node.
+ */
+public class PlatformServiceCallPureJavaTask extends AbstractPlatformServiceCallTask {
+ /** {@inheritDoc} */
+ @Override ComputeJobAdapter createJob(String svcName) {
+ return new PlatformServiceCallPureJavaJob(svcName);
+ }
+
+ /** Job that starts a pure java node with the local discovery settings, runs the inherited test, closes it. */
+ static class PlatformServiceCallPureJavaJob extends
+ PlatformServiceCallCollectionsTask.PlatformServiceCallCollectionsJob {
+ /** Pure java node, assigned in {@link #runTest()} before the inherited test runs. */
+ Ignite pureJavaIgnite;
+
+ /**
+ * @param srvcName Service name.
+ */
+ PlatformServiceCallPureJavaJob(String srvcName) {
+ super(srvcName);
+ }
+
+ /** {@inheritDoc} */
+ @Override TestPlatformService serviceProxy() {
+ return pureJavaIgnite.services().serviceProxy(srvcName, TestPlatformService.class, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override void runTest() {
+ IgniteConfiguration locCfg = ignite.configuration();
+ TcpDiscoverySpi locDisco = (TcpDiscoverySpi)locCfg.getDiscoverySpi();
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ disco.setIpFinder((TcpDiscoveryVmIpFinder)locDisco.getIpFinder());
+ disco.setLocalAddress(locDisco.getLocalAddress());
+ disco.setLocalPortRange(locDisco.getLocalPortRange());
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+ cfg.setIgniteInstanceName("pure-java-node");
+ cfg.setDiscoverySpi(disco);
+ cfg.setLocalHost(locCfg.getLocalHost());
+ cfg.setBinaryConfiguration(locCfg.getBinaryConfiguration());
+
+ pureJavaIgnite = Ignition.start(cfg);
+ try {
+ super.runTest();
+ }
+ finally {
+ U.close(pureJavaIgnite, ignite.log().getLogger(getClass()));
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs
index 1c52dfe..1ba025b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/CallPlatformServiceTest.cs
@@ -20,8 +20,10 @@ namespace Apache.Ignite.Core.Tests.Services
{
using System;
using System.Collections;
+ using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Resource;
using Apache.Ignite.Core.Services;
using NUnit.Framework;
@@ -42,10 +44,20 @@ namespace Apache.Ignite.Core.Tests.Services
/** */
private const string CheckThinTaskName = "org.apache.ignite.platform.PlatformServiceCallThinTask";
+
+ /** */
+ private const string NODE_TYPE_ATTR = "TYPE";
+
+ /** */
+ private const string DOTNET_SRV_NODE_TYPE = "dotnet-srv";
/** */
private const string CheckCollectionsThinTaskName =
"org.apache.ignite.platform.PlatformServiceCallCollectionsThinTask";
+
+ /** */
+ private const string PlatformServiceCallPureJavaTask =
+ "org.apache.ignite.platform.PlatformServiceCallPureJavaTask";
/** */
private readonly bool _useBinaryArray;
@@ -94,11 +106,15 @@ namespace Apache.Ignite.Core.Tests.Services
/// in which real invocation of the service is made.
/// <para/>
/// <param name="local">If true call on local node.</param>
+ /// <param name="withNodeFilter">If true, deploy service with node filter.</param>
/// <param name="taskName">Task to test.</param>
/// </summary>
[Test]
- public void TestCallPlatformService([Values(true, false)] bool local,
- [Values(CheckTaskName, CheckCollectionsTaskName, CheckThinTaskName, CheckCollectionsThinTaskName)]
+ public void TestCallPlatformService(
+ [Values(true, false)] bool local,
+ [Values(true, false)] bool withNodeFilter,
+ [Values(CheckTaskName, CheckCollectionsTaskName, CheckThinTaskName, CheckCollectionsThinTaskName,
+ PlatformServiceCallPureJavaTask)]
string taskName)
{
var cfg = new ServiceConfiguration
@@ -108,6 +124,9 @@ namespace Apache.Ignite.Core.Tests.Services
Service = new TestPlatformService()
};
+ if (withNodeFilter)
+ cfg.NodeFilter = new NodeTypeFilter(DOTNET_SRV_NODE_TYPE);
+
Grid1.GetServices().Deploy(cfg);
Grid1.GetCompute().ExecuteJavaTask<object>(taskName, new object[] { ServiceName, local });
@@ -149,9 +168,40 @@ namespace Apache.Ignite.Core.Tests.Services
{
NameMapper = BinaryBasicNameMapper.SimpleNameInstance
},
- LifecycleHandlers = _useBinaryArray ? new[] { new SetUseBinaryArray() } : null
+ LifecycleHandlers = _useBinaryArray ? new[] { new SetUseBinaryArray() } : null,
+ UserAttributes = new Dictionary<string, object> {{NODE_TYPE_ATTR, DOTNET_SRV_NODE_TYPE}}
};
}
+
+ /// <summary>
+ /// Node filter that accepts only nodes whose TYPE attribute equals the expected value, ignoring case.
+ /// </summary>
+ public class NodeTypeFilter : IClusterNodeFilter
+ {
+ /** Expected value of the TYPE attribute. */
+ private readonly string _type;
+
+ /// <summary>
+ /// Initializes a new instance of <see cref="NodeTypeFilter"/> class.
+ /// </summary>
+ /// <param name="type">Value of TYPE attribute to compare with.</param>
+ public NodeTypeFilter(string type)
+ {
+ _type = type;
+ }
+
+ /** <inheritdoc /> */
+ public bool Invoke(IClusterNode node)
+ {
+ // Nodes without the attribute never match; the value is an identifier, so it is compared ordinally.
+ if (!node.TryGetAttribute<string>(NODE_TYPE_ATTR, out var attr))
+ {
+ return false;
+ }
+
+ return string.Equals(attr, _type, StringComparison.OrdinalIgnoreCase);
+ }
+ }
/** */
public interface ITestPlatformService : IService