You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by da...@apache.org on 2021/11/27 20:25:22 UTC
[ignite] branch master updated: IGNITE-15758 Remove legacy service grid implementation (#9577)
This is an automated email from the ASF dual-hosted git repository.
daradurvs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3993634 IGNITE-15758 Remove legacy service grid implementation (#9577)
3993634 is described below
commit 3993634d49d0119ae78b2a3e1ba711322a1e1ed4
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Nov 27 23:24:36 2021 +0300
IGNITE-15758 Remove legacy service grid implementation (#9577)
---
.../org/apache/ignite/IgniteSystemProperties.java | 19 -
.../apache/ignite/internal/GridKernalContext.java | 4 +-
.../ignite/internal/GridKernalContextImpl.java | 11 +-
.../org/apache/ignite/internal/IgniteKernal.java | 27 +-
.../ignite/internal/IgniteNodeAttributes.java | 4 -
.../managers/discovery/GridDiscoveryManager.java | 19 -
.../processors/cache/GridCacheProcessor.java | 10 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 4 -
.../wal/reader/StandaloneGridKernalContext.java | 4 +-
.../cluster/GridClusterStateProcessor.java | 9 -
.../continuous/GridContinuousProcessor.java | 4 -
.../processors/service/GridServiceProcessor.java | 2143 --------------------
.../processors/service/GridServiceProxy.java | 2 +-
.../processors/service/IgniteServiceProcessor.java | 131 +-
.../service/ServiceProcessorAdapter.java | 172 --
.../ignite/spi/discovery/tcp/ServerImpl.java | 49 -
.../main/resources/META-INF/classnames.properties | 3 -
.../IgniteClientReconnectServicesTest.java | 54 -
.../GridDiscoveryManagerAttributesSelfTest.java | 12 -
.../GridCacheContinuousQueryAbstractSelfTest.java | 9 +-
...dServiceDeploymentExceptionPropagationTest.java | 8 -
.../GridServiceProcessorBatchDeploySelfTest.java | 4 -
.../service/GridServiceProcessorStopSelfTest.java | 7 +-
...GridServiceProxyTopologyInitializationTest.java | 10 -
.../IgniteServiceDynamicCachesSelfTest.java | 21 +-
.../service/IgniteServiceReassignmentTest.java | 138 --
...mentDiscoveryListenerNotificationOrderTest.java | 8 -
...mentNonSerializableStaticConfigurationTest.java | 8 -
.../service/ServiceDeploymentOnActivationTest.java | 8 -
.../ServiceDeploymentOnClientDisconnectTest.java | 8 -
.../ServiceDeploymentProcessAbstractTest.java | 8 -
...ServiceHotRedeploymentViaDeploymentSpiTest.java | 8 -
.../ServiceReassignmentFunctionSelfTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 10 -
.../mvcc/CacheMvccBasicContinuousQueryTest.java | 9 +-
35 files changed, 123 insertions(+), 2824 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 0aae6ae..d85ea8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -50,7 +50,6 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheManager.DFLT_JCACHE_DEFAULT_ISOLATED;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_USE_ASYNC_FILE_IO_FACTORY;
-import static org.apache.ignite.internal.IgniteKernal.DFLT_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.internal.IgniteKernal.DFLT_LOG_CLASSPATH_CONTENT_ON_STARTUP;
import static org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.internal.IgniteKernal.DFLT_PERIODIC_STARVATION_CHECK_FREQ;
@@ -1634,24 +1633,6 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE = "IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE";
/**
- * Manages the type of the implementation of the service processor (implementation of the {@link IgniteServices}).
- * All nodes in the cluster must have the same value of this property.
- * <p/>
- * If the property is {@code true} then event-driven implementation of the service processor will be used.
- * <p/>
- * If the property is {@code false} then internal cache based implementation of service processor will be used.
- * <p/>
- * Default is {@code true}.
- */
- @SystemProperty(value = "Manages the type of the implementation of the service processor " +
- "(implementation of the IgniteServices). All nodes in the cluster must have the same value of this property. " +
- "If the property is true then event-driven implementation of the service processor will be used. If the " +
- "property is false then internal cache based implementation of service processor will be used",
- defaults = "" + DFLT_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED)
- public static final String IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED
- = "IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED";
-
- /**
* When set to {@code true}, cache metrics are not included into the discovery metrics update message (in this
* case message contains only cluster metrics). By default cache metrics are included into the message and
* calculated each time the message is sent.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 41b7f7d..05e45e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -69,7 +69,7 @@ import org.apache.ignite.internal.processors.rest.IgniteRestProcessor;
import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
-import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
+import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -268,7 +268,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*
* @return Service processor.
*/
- public ServiceProcessorAdapter service();
+ public IgniteServiceProcessor service();
/**
* Gets port processor.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 3fb78b5..58ea039 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -93,7 +92,7 @@ import org.apache.ignite.internal.processors.rest.IgniteRestProcessor;
import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
-import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
+import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -235,7 +234,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringInclude
- private ServiceProcessorAdapter srvcProc;
+ private IgniteServiceProcessor srvcProc;
/** */
@GridToStringInclude
@@ -541,8 +540,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
portProc = (GridPortProcessor)comp;
else if (comp instanceof GridClosureProcessor)
closProc = (GridClosureProcessor)comp;
- else if (comp instanceof ServiceProcessorAdapter)
- srvcProc = (ServiceProcessorAdapter)comp;
+ else if (comp instanceof IgniteServiceProcessor)
+ srvcProc = (IgniteServiceProcessor)comp;
else if (comp instanceof IgniteScheduleProcessorAdapter)
scheduleProc = (IgniteScheduleProcessorAdapter)comp;
else if (comp instanceof GridSegmentationProcessor)
@@ -726,7 +725,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
- @Override public ServiceProcessorAdapter service() {
+ @Override public IgniteServiceProcessor service() {
return srvcProc;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 25324ac..9a15978 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -129,7 +129,6 @@ import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import org.apache.ignite.internal.managers.tracing.GridTracingManager;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.internal.processors.GridProcessor;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -185,7 +184,6 @@ import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.processors.security.IgniteSecurityProcessor;
import org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor;
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
@@ -238,7 +236,6 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
@@ -263,7 +260,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -407,9 +403,6 @@ public class IgniteKernal implements IgniteEx, Externalizable {
*/
public static final long DFLT_LONG_OPERATIONS_DUMP_TIMEOUT = 60_000L;
- /** @see IgniteSystemProperties#IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED */
- public static final boolean DFLT_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED = true;
-
/** @see IgniteSystemProperties#IGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP */
public static final boolean DFLT_LOG_CLASSPATH_CONTENT_ON_STARTUP = true;
@@ -1231,7 +1224,7 @@ public class IgniteKernal implements IgniteEx, Externalizable {
startProcessor(new IndexProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
startProcessor(new ClientListenerProcessor(ctx));
- startProcessor(createServiceProcessor());
+ startProcessor(new IgniteServiceProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
startProcessor(new GridTaskProcessor(ctx));
@@ -1584,21 +1577,6 @@ public class IgniteKernal implements IgniteEx, Externalizable {
}
/**
- * Creates service processor depend on {@link IgniteSystemProperties#IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED}.
- *
- * @return The service processor. See {@link IgniteServiceProcessor} for details.
- */
- private GridProcessorAdapter createServiceProcessor() {
- final boolean srvcProcMode = getBoolean(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED,
- DFLT_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
-
- if (srvcProcMode)
- return new IgniteServiceProcessor(ctx);
-
- return new GridServiceProcessor(ctx);
- }
-
- /**
* Validates common configuration parameters.
*
* @param cfg Ignite configuration to validate.
@@ -1901,9 +1879,6 @@ public class IgniteKernal implements IgniteEx, Externalizable {
ctx.addNodeAttribute(e.getKey(), e.getValue());
}
}
-
- ctx.addNodeAttribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED,
- ctx.service() instanceof IgniteServiceProcessor);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 02e5146..8120dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -199,10 +199,6 @@ public final class IgniteNodeAttributes {
/** Supported features. */
public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features";
- /** Ignite services processor mode. */
- public static final String ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED = ATTR_PREFIX +
- ".event.driven.service.processor.enabled";
-
/**
* Enforces singleton.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d5cd169..14c830f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -151,7 +151,6 @@ import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -168,7 +167,6 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
@@ -1301,7 +1299,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
- Boolean locSrvcProcMode = locNode.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
for (ClusterNode n : nodes) {
@@ -1387,22 +1384,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]");
}
- Boolean rmtSrvcProcModeAttr = n.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
-
- final boolean rmtSrvcProcMode = rmtSrvcProcModeAttr != null ? rmtSrvcProcModeAttr : false;
-
- if (!F.eq(locSrvcProcMode, rmtSrvcProcMode)) {
- throw new IgniteCheckedException("Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical service processor mode, " +
- "configure system property explicitly) " +
- "[locSrvcProcMode=" + locSrvcProcMode +
- ", rmtSrvcProcMode=" + rmtSrvcProcMode +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(n) +
- ", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]");
- }
-
ShutdownPolicy rmtShutdownPolicy = n.attribute(ATTR_SHUTDOWN_POLICY) == null ? null :
ShutdownPolicy.fromOrdinal(n.attribute(ATTR_SHUTDOWN_POLICY));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 40a9a4f..c8f85c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -157,7 +157,6 @@ import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchang
import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.IgniteCollectors;
@@ -739,9 +738,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (!active)
return;
- if (ctx.service() instanceof GridServiceProcessor)
- ((GridServiceProcessor)ctx.service()).onUtilityCacheStarted();
-
awaitRebalance(joinVer).get();
}
@@ -2980,13 +2976,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (exchActions == null)
return;
- if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null) {
+ if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null)
ctx.dataStructures().restoreStructuresState(ctx);
- if (ctx.service() instanceof GridServiceProcessor)
- ((GridServiceProcessor)ctx.service()).updateUtilityCache();
- }
-
if (err == null)
processCacheStopRequestOnExchangeDone(exchActions);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0f7d891..7e70d7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -110,7 +110,6 @@ import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.security.SecurityContext;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
@@ -1381,9 +1380,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
kctx.dataStructures().onDeActivate(kctx);
- if (cctx.kernalContext().service() instanceof GridServiceProcessor)
- ((GridServiceProcessor)kctx.service()).onDeActivate(cctx.kernalContext());
-
assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 19a7d41..1dc10e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -88,7 +88,7 @@ import org.apache.ignite.internal.processors.rest.IgniteRestProcessor;
import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
+import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
@@ -373,7 +373,7 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
- @Override public GridServiceProcessor service() {
+ @Override public IgniteServiceProcessor service() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 967f8d7..4ef20a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadW
import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustStatus;
import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineTopologyUpdater;
import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -1458,14 +1457,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
boolean client = ctx.clientNode();
try {
- if (ctx.service() instanceof GridServiceProcessor) {
- GridServiceProcessor srvcProc = (GridServiceProcessor)ctx.service();
-
- srvcProc.onUtilityCacheStarted();
-
- srvcProc.onActivate(ctx);
- }
-
ctx.dataStructures().onActivate(ctx);
ctx.task().onActivate(ctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1884a35..bd24293 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -313,9 +312,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.cacheObjects().onContinuousProcessorStarted(ctx);
- if (ctx.service() instanceof GridServiceProcessor)
- ((GridServiceProcessor)ctx.service()).onContinuousProcessorStarted(ctx);
-
if (log.isDebugEnabled())
log.debug("Continuous processor started.");
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
deleted file mode 100644
index 0debe6a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ /dev/null
@@ -1,2143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.service;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import javax.cache.Cache;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterGroup;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.configuration.DeploymentMode;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.GridClosureCallMode;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
-import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
-import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
-import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
-import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
-import org.apache.ignite.internal.processors.platform.services.PlatformService;
-import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.GridEmptyIterator;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.security.SecurityException;
-import org.apache.ignite.plugin.security.SecurityPermission;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.JobContextResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.services.Service;
-import org.apache.ignite.services.ServiceCallContext;
-import org.apache.ignite.services.ServiceConfiguration;
-import org.apache.ignite.services.ServiceContext;
-import org.apache.ignite.services.ServiceDeploymentException;
-import org.apache.ignite.services.ServiceDescriptor;
-import org.apache.ignite.thread.IgniteThreadFactory;
-import org.apache.ignite.thread.OomExceptionHandler;
-import org.apache.ignite.transactions.Transaction;
-import org.jetbrains.annotations.Nullable;
-
-import static javax.cache.event.EventType.REMOVED;
-import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
-import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
-/**
- * Grid service processor.
- * <p/>
- * Obsolete implementation of service processor, based on replicated system cache.
- * <p/>
- * NOTE: if you fix a bug in this class, please take a look in {@link IgniteServiceProcessor}, perhaps the class
- * contains a similar block of code which also should be updated.
- *
- * @see IgniteServiceProcessor
- * @deprecated Here is improved, but uncompatible implementation {@link IgniteServiceProcessor}, see IEP-17 for details.
- */
-@Deprecated
-@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions"})
-public class GridServiceProcessor extends ServiceProcessorAdapter implements IgniteChangeGlobalStateSupport {
- /** Time to wait before reassignment retries. */
- private static final long RETRY_TIMEOUT = 1000;
-
- /** */
- private static final int[] EVTS = {
- EventType.EVT_NODE_JOINED,
- EventType.EVT_NODE_LEFT,
- EventType.EVT_NODE_FAILED,
- DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT
- };
-
- /** Local service instances. */
- private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>();
-
- /** Deployment futures. */
- private final ConcurrentMap<String, GridServiceDeploymentFuture<String>> depFuts = new ConcurrentHashMap<>();
-
- /** Deployment futures. */
- private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap<>();
-
- /** Pending compute job contexts that waiting for utility cache initialization. */
- private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<>(0);
-
- /** Deployment executor service. */
- private volatile ExecutorService depExe;
-
- /** Busy lock. */
- private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** Uncaught exception handler for thread pools. */
- private final UncaughtExceptionHandler oomeHnd = new OomExceptionHandler(ctx);
-
- /** Thread factory. */
- private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service",
- oomeHnd);
-
- /** Thread local for service name. */
- private ThreadLocal<String> svcName = new ThreadLocal<>();
-
- /** Service cache. */
- private volatile IgniteInternalCache<Object, Object> serviceCache;
-
- /** Topology listener. */
- private DiscoveryEventListener topLsnr = new TopologyListener();
-
- /** */
- private final CountDownLatch startLatch = new CountDownLatch(1);
-
- /**
- * @param ctx Kernal context.
- */
- public GridServiceProcessor(GridKernalContext ctx) {
- super(ctx);
-
- depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(),
- "srvc-deploy", oomeHnd));
- }
-
- /**
- * @param ctx Context.
- * @throws IgniteCheckedException If failed.
- */
- public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
- if (ctx.clientNode()) {
- assert !ctx.isDaemon();
-
- ctx.continuous().registerStaticRoutine(
- CU.UTILITY_CACHE_NAME, new ServiceEntriesListener(), null, null
- );
- }
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- if (ctx.isDaemon())
- return;
-
- IgniteConfiguration cfg = ctx.config();
-
- DeploymentMode depMode = cfg.getDeploymentMode();
-
- if (cfg.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) &&
- !F.isEmpty(cfg.getServiceConfiguration()))
- throw new IgniteCheckedException("Cannot deploy services in PRIVATE or ISOLATED deployment mode: " + depMode);
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
- if (ctx.isDaemon() || !active)
- return;
-
- onKernalStart0();
- }
-
- /**
- * Do kernal start.
- *
- * @throws IgniteCheckedException If failed.
- */
- private void onKernalStart0() throws IgniteCheckedException {
- if (!ctx.clientNode())
- ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
-
- updateUtilityCache();
-
- startLatch.countDown();
-
- try {
- if (ctx.deploy().enabled())
- ctx.cache().context().deploy().ignoreOwnership(true);
-
- if (!ctx.clientNode()) {
- DiscoveryDataClusterState clusterState = ctx.state().clusterState();
-
- boolean isLocLsnr = !clusterState.hasBaselineTopology() ||
- CU.baselineNode(ctx.cluster().get().localNode(), clusterState);
-
- // Register query listener and run it for local entries, if data is available locally.
- // It is also invoked on rebalancing.
- // Otherwise remote listener is registered.
- serviceCache.context().continuousQueries().executeInternalQuery(
- new ServiceEntriesListener(), null, isLocLsnr, true, false, false
- );
- }
- else { // Listener for client nodes is registered in onContinuousProcessorStarted method.
- assert !ctx.isDaemon();
-
- ctx.closure().runLocalSafe(new GridPlainRunnable() {
- @Override public void run() {
- try {
- Iterable<CacheEntryEvent<?, ?>> entries =
- serviceCache.context().continuousQueries().existingEntries(false, null);
-
- onSystemCacheUpdated(entries);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to load service entries: " + e, e);
- }
- }
- });
- }
- }
- finally {
- if (ctx.deploy().enabled())
- ctx.cache().context().deploy().ignoreOwnership(false);
- }
-
- ServiceConfiguration[] cfgs = ctx.config().getServiceConfiguration();
-
- if (cfgs != null)
- deployAll(Arrays.asList(cfgs), ctx.cluster().get().forServers().predicate()).get();
-
- if (log.isDebugEnabled())
- log.debug("Started service processor.");
- }
-
- /**
- *
- */
- public void updateUtilityCache() {
- serviceCache = ctx.cache().utilityCache();
- }
-
- /**
- * @return Service cache.
- */
- private IgniteInternalCache<Object, Object> serviceCache() {
- if (serviceCache == null)
- U.awaitQuiet(startLatch);
-
- return serviceCache;
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (ctx.isDaemon())
- return;
-
- GridSpinBusyLock busyLock = this.busyLock;
-
- // Will not release it.
- if (busyLock != null) {
- busyLock.block();
-
- this.busyLock = null;
- }
-
- startLatch.countDown();
-
- U.shutdownNow(GridServiceProcessor.class, depExe, log);
-
- if (!ctx.clientNode())
- ctx.event().removeDiscoveryEventListener(topLsnr);
-
- Collection<ServiceContextImpl> ctxs = new ArrayList<>();
-
- synchronized (locSvcs) {
- for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
- ctxs.addAll(ctxs0);
-
- locSvcs.clear();
- }
-
- for (ServiceContextImpl ctx : ctxs) {
- ctx.setCancelled(true);
-
- Service svc = ctx.service();
-
- if (svc != null)
- try {
- svc.cancel(ctx);
- }
- catch (Throwable e) {
- log.error("Failed to cancel service (ignoring) [name=" + ctx.name() +
- ", execId=" + ctx.executionId() + ']', e);
-
- if (e instanceof Error)
- throw e;
- }
-
- ctx.executor().shutdownNow();
- }
-
- for (ServiceContextImpl ctx : ctxs) {
- try {
- if (log.isInfoEnabled() && !ctxs.isEmpty())
- log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
- U.id8(ctx.executionId()) + ']');
-
- ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
-
- U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
- ctx.name());
- }
- }
-
- Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
-
- cancelFutures(depFuts, err);
- cancelFutures(undepFuts, err);
-
- if (log.isDebugEnabled())
- log.debug("Stopped service processor.");
- }
-
- /** {@inheritDoc} */
- @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Activate service processor [nodeId=" + ctx.localNodeId() +
- " topVer=" + ctx.discovery().topologyVersionEx() + " ]");
-
- busyLock = new GridSpinBusyLock();
-
- depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(),
- "srvc-deploy", oomeHnd));
-
- start();
-
- onKernalStart0();
- }
-
- /** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) {
- if (log.isDebugEnabled())
- log.debug("DeActivate service processor [nodeId=" + ctx.localNodeId() +
- " topVer=" + ctx.discovery().topologyVersionEx() + " ]");
-
- cancelFutures(depFuts, new IgniteCheckedException("Failed to deploy service, cluster in active."));
-
- cancelFutures(undepFuts, new IgniteCheckedException("Failed to undeploy service, cluster in active."));
-
- onKernalStop(true);
- }
-
- /** {@inheritDoc} */
- @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
- cancelFutures(depFuts, new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
- "Failed to deploy service, client node disconnected."));
-
- cancelFutures(undepFuts, new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
- "Failed to undeploy service, client node disconnected."));
- }
-
- /**
- * @param futs Futs.
- * @param err Exception.
- */
- private void cancelFutures(ConcurrentMap<String, ? extends GridFutureAdapter<?>> futs, Exception err) {
- for (Map.Entry<String, ? extends GridFutureAdapter<?>> entry : futs.entrySet()) {
- GridFutureAdapter fut = entry.getValue();
-
- fut.onDone(err);
-
- futs.remove(entry.getKey(), fut);
- }
- }
-
- /**
- * Validates service configuration.
- *
- * @param c Service configuration.
- * @throws IgniteException If validation failed.
- */
- private void validate(ServiceConfiguration c) throws IgniteException {
- IgniteConfiguration cfg = ctx.config();
-
- DeploymentMode depMode = cfg.getDeploymentMode();
-
- if (cfg.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED))
- throw new IgniteException("Cannot deploy services in PRIVATE or ISOLATED deployment mode: " + depMode);
-
- ensure(c.getName() != null, "getName() != null", null);
- ensure(c.getTotalCount() >= 0, "getTotalCount() >= 0", c.getTotalCount());
- ensure(c.getMaxPerNodeCount() >= 0, "getMaxPerNodeCount() >= 0", c.getMaxPerNodeCount());
- ensure(c.getService() != null, "getService() != null", c.getService());
- ensure(c.getTotalCount() > 0 || c.getMaxPerNodeCount() > 0,
- "c.getTotalCount() > 0 || c.getMaxPerNodeCount() > 0", null);
- }
-
- /**
- * @param cond Condition.
- * @param desc Description.
- * @param v Value.
- */
- private void ensure(boolean cond, String desc, @Nullable Object v) {
- if (!cond)
- if (v != null)
- throw new IgniteException("Service configuration check failed (" + desc + "): " + v);
- else
- throw new IgniteException("Service configuration check failed (" + desc + ")");
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc) {
- return deployMultiple(prj, name, srvc, 0, 1);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc) {
- return deployMultiple(prj, name, srvc, 1, 1);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
- int maxPerNodeCnt) {
- ServiceConfiguration cfg = new ServiceConfiguration();
-
- cfg.setName(name);
- cfg.setService(srvc);
- cfg.setTotalCount(totalCnt);
- cfg.setMaxPerNodeCount(maxPerNodeCnt);
-
- return deployAll(prj, Collections.singleton(cfg));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
- Object affKey) {
- A.notNull(affKey, "affKey");
-
- ServiceConfiguration cfg = new ServiceConfiguration();
-
- cfg.setName(name);
- cfg.setService(srvc);
- cfg.setCacheName(cacheName);
- cfg.setAffinityKey(affKey);
- cfg.setTotalCount(1);
- cfg.setMaxPerNodeCount(1);
-
- // Ignore projection here.
- return deployAll(Collections.singleton(cfg), null);
- }
-
- /**
- * @param cfgs Service configurations.
- * @param dfltNodeFilter Default NodeFilter.
- * @return Configurations to deploy.
- */
- private PreparedConfigurations<String> prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs,
- IgnitePredicate<ClusterNode> dfltNodeFilter) {
- List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size());
-
- Marshaller marsh = ctx.config().getMarshaller();
-
- List<GridServiceDeploymentFuture<String>> failedFuts = null;
-
- for (ServiceConfiguration cfg : cfgs) {
- Exception err = null;
-
- // Deploy to projection node by default
- // or only on server nodes if no projection .
- if (cfg.getNodeFilter() == null && dfltNodeFilter != null)
- cfg.setNodeFilter(dfltNodeFilter);
-
- try {
- validate(cfg);
- }
- catch (Exception e) {
- U.error(log, "Failed to validate service configuration [name=" + cfg.getName() +
- ", srvc=" + cfg.getService() + ']', e);
-
- err = e;
- }
-
- if (err == null) {
- try {
- ctx.security().authorize(cfg.getName(), SecurityPermission.SERVICE_DEPLOY);
- }
- catch (Exception e) {
- U.error(log, "Failed to authorize service creation [name=" + cfg.getName() +
- ", srvc=" + cfg.getService() + ']', e);
-
- err = e;
- }
- }
-
- if (err == null) {
- try {
- byte[] srvcBytes = U.marshal(marsh, cfg.getService());
-
- cfgsCp.add(new LazyServiceConfiguration(cfg, srvcBytes));
- }
- catch (Exception e) {
- U.error(log, "Failed to marshal service with configured marshaller [name=" + cfg.getName() +
- ", srvc=" + cfg.getService() + ", marsh=" + marsh + "]", e);
-
- err = e;
- }
- }
-
- if (err != null) {
- if (failedFuts == null)
- failedFuts = new ArrayList<>();
-
- GridServiceDeploymentFuture<String> fut = new GridServiceDeploymentFuture<>(cfg, cfg.getName());
-
- fut.onDone(err);
-
- failedFuts.add(fut);
- }
- }
-
- return new PreparedConfigurations<>(cfgsCp, failedFuts);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
- if (prj == null)
- // Deploy to servers by default if no projection specified.
- return deployAll(cfgs, ctx.cluster().get().forServers().predicate());
- else if (prj.predicate() == F.<ClusterNode>alwaysTrue())
- return deployAll(cfgs, null);
- else
- // Deploy to predicate nodes by default.
- return deployAll(cfgs, prj.predicate());
- }
-
- /**
- * @param cfgs Service configurations.
- * @param dfltNodeFilter Default NodeFilter.
- * @return Future for deployment.
- */
- private IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs,
- @Nullable IgnitePredicate<ClusterNode> dfltNodeFilter) {
- assert cfgs != null;
-
- PreparedConfigurations<String> srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter);
-
- List<ServiceConfiguration> cfgsCp = srvCfg.cfgs;
-
- List<GridServiceDeploymentFuture<String>> failedFuts = srvCfg.failedFuts;
-
- Collections.sort(cfgsCp, new Comparator<ServiceConfiguration>() {
- @Override public int compare(ServiceConfiguration cfg1, ServiceConfiguration cfg2) {
- return cfg1.getName().compareTo(cfg2.getName());
- }
- });
-
- GridServiceDeploymentCompoundFuture<String> res;
-
- while (true) {
- res = new GridServiceDeploymentCompoundFuture<>();
-
- if (ctx.deploy().enabled())
- ctx.cache().context().deploy().ignoreOwnership(true);
-
- try {
- if (cfgsCp.size() == 1)
- writeServiceToCache(res, cfgsCp.get(0));
- else if (cfgsCp.size() > 1) {
- try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
- for (ServiceConfiguration cfg : cfgsCp) {
- try {
- writeServiceToCache(res, cfg);
- }
- catch (IgniteCheckedException e) {
- if (X.hasCause(e, ClusterTopologyCheckedException.class))
- throw e; // Retry.
- else
- U.error(log, e.getMessage());
- }
- }
-
- tx.commit();
- }
- }
-
- break;
- }
- catch (IgniteException | IgniteCheckedException e) {
- for (String name : res.servicesToRollback()) {
- GridServiceDeploymentFuture<String> fut;
-
- if ((fut = depFuts.remove(name)) != null)
- fut.onDone(e);
- }
-
- if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
- if (log.isDebugEnabled())
- log.debug("Topology changed while deploying services (will retry): " + e.getMessage());
- }
- else {
- res.onDone(new IgniteCheckedException(
- new ServiceDeploymentException("Failed to deploy provided services.", e, cfgs)));
-
- return res;
- }
- }
- finally {
- if (ctx.deploy().enabled())
- ctx.cache().context().deploy().ignoreOwnership(false);
- }
- }
-
- if (ctx.clientDisconnected()) {
- IgniteClientDisconnectedCheckedException err =
- new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
- "Failed to deploy services, client node disconnected: " + cfgs);
-
- for (String name : res.servicesToRollback()) {
- GridServiceDeploymentFuture<String> fut = depFuts.remove(name);
-
- if (fut != null)
- fut.onDone(err);
- }
-
- return new GridFinishedFuture<>(err);
- }
-
- if (failedFuts != null) {
- for (GridServiceDeploymentFuture<String> fut : failedFuts)
- res.add(fut, false);
- }
-
- res.markInitialized();
-
- return res;
- }
-
- /**
- * @param res Resulting compound future.
- * @param cfg Service configuration.
- * @throws IgniteCheckedException If operation failed.
- */
- private void writeServiceToCache(GridServiceDeploymentCompoundFuture<String> res, ServiceConfiguration cfg)
- throws IgniteCheckedException {
- String name = cfg.getName();
-
- GridServiceDeploymentFuture<String> fut = new GridServiceDeploymentFuture<>(cfg, name);
-
- GridServiceDeploymentFuture<String> old = depFuts.putIfAbsent(name, fut);
-
- try {
- if (old != null) {
- if (!old.configuration().equalsIgnoreNodeFilter(cfg))
- throw new IgniteCheckedException("Failed to deploy service (service already exists with different " +
- "configuration) [deployed=" + old.configuration() + ", new=" + cfg + ']');
- else {
- res.add(old, false);
-
- return;
- }
- }
-
- GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
-
- GridServiceDeployment dep = (GridServiceDeployment)serviceCache().getAndPutIfAbsent(key,
- new GridServiceDeployment(ctx.localNodeId(), cfg));
-
- if (dep != null) {
- if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) {
- throw new IgniteCheckedException("Failed to deploy service (service already exists with " +
- "different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']');
- }
- else {
- res.add(fut, false);
-
- Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
-
- while (it.hasNext()) {
- Cache.Entry<Object, Object> e = it.next();
-
- GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
-
- if (assigns.name().equals(name)) {
- fut.onDone();
-
- depFuts.remove(name, fut);
-
- break;
- }
- }
- }
- }
- else
- res.add(fut, true);
- }
- catch (IgniteCheckedException e) {
- fut.onDone(e);
-
- res.add(fut, false);
-
- depFuts.remove(name, fut);
-
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> cancel(String name) {
- while (true) {
- try {
- return removeServiceFromCache(name).fut;
- }
- catch (IgniteException | IgniteCheckedException e) {
- if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
- if (log.isDebugEnabled())
- log.debug("Topology changed while cancelling service (will retry): " + e.getMessage());
- }
- else {
- U.error(log, "Failed to undeploy service: " + name, e);
-
- return new GridFinishedFuture<>(e);
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> cancelAll() {
- Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
-
- List<String> svcNames = new ArrayList<>();
-
- while (it.hasNext()) {
- GridServiceDeployment dep = (GridServiceDeployment)it.next().getValue();
-
- svcNames.add(dep.configuration().getName());
- }
-
- return cancelAll(svcNames);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<?> cancelAll(Collection<String> servicesNames) {
- List<String> svcNamesCp = new ArrayList<>(servicesNames);
-
- Collections.sort(svcNamesCp);
-
- GridCompoundFuture res;
-
- while (true) {
- res = null;
-
- List<String> toRollback = new ArrayList<>();
-
- try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
- for (String name : servicesNames) {
- if (res == null)
- res = new GridCompoundFuture<>();
-
- try {
- CancelResult cr = removeServiceFromCache(name);
-
- if (cr.rollback)
- toRollback.add(name);
-
- res.add(cr.fut);
- }
- catch (IgniteException | IgniteCheckedException e) {
- if (X.hasCause(e, ClusterTopologyCheckedException.class))
- throw e; // Retry.
- else {
- U.error(log, "Failed to undeploy service: " + name, e);
-
- res.add(new GridFinishedFuture<>(e));
- }
- }
- }
-
- tx.commit();
-
- break;
- }
- catch (IgniteException | IgniteCheckedException e) {
- for (String name : toRollback)
- undepFuts.remove(name).onDone(e);
-
- if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
- if (log.isDebugEnabled())
- log.debug("Topology changed while cancelling service (will retry): " + e.getMessage());
- }
- else
- return new GridFinishedFuture<>(e);
- }
- }
-
- if (res != null) {
- res.markInitialized();
-
- return res;
- }
- else
- return new GridFinishedFuture<>();
- }
-
- /**
- * @param name Name of service to remove from internal cache.
- * @return Cancellation future and a flag whether it should be completed and removed on error.
- * @throws IgniteCheckedException If operation failed.
- */
- private CancelResult removeServiceFromCache(String name) throws IgniteCheckedException {
- try {
- ctx.security().authorize(name, SecurityPermission.SERVICE_CANCEL);
- }
- catch (SecurityException e) {
- return new CancelResult(new GridFinishedFuture<>(e), false);
- }
-
- GridFutureAdapter<?> fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<?> old = undepFuts.putIfAbsent(name, fut);
-
- if (old != null)
- return new CancelResult(old, false);
- else {
- GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
-
- try {
- if (serviceCache().getAndRemove(key) == null) {
- // Remove future from local map if service was not deployed.
- undepFuts.remove(name, fut);
-
- fut.onDone();
-
- return new CancelResult(fut, false);
- }
- else
- return new CancelResult(fut, true);
- }
- catch (IgniteCheckedException e) {
- undepFuts.remove(name, fut);
-
- fut.onDone(e);
-
- throw e;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
- IgniteInternalCache<Object, Object> cache = serviceCache();
-
- ClusterNode node = cache.affinity().mapKeyToNode(name);
-
- final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
-
- return ctx.closure().callAsyncNoFailover(
- GridClosureCallMode.BROADCAST,
- call,
- Collections.singletonList(node),
- false,
- timeout,
- true).get();
- }
-
- /**
- * @param cache Utility cache.
- * @param svcName Service name.
- * @return Service topology.
- * @throws IgniteCheckedException In case of error.
- */
- private static Map<UUID, Integer> serviceTopology(IgniteInternalCache<Object, Object> cache, String svcName)
- throws IgniteCheckedException {
- GridServiceAssignments val = (GridServiceAssignments)cache.get(new GridServiceAssignmentsKey(svcName));
-
- return val != null ? val.assigns() : null;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ServiceDescriptor> serviceDescriptors() {
- Collection<ServiceDescriptor> descs = new ArrayList<>();
-
- Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
-
- while (it.hasNext()) {
- Cache.Entry<Object, Object> e = it.next();
-
- GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
-
- ServiceDescriptorImpl desc = new ServiceDescriptorImpl(dep);
-
- try {
- GridServiceAssignments assigns = (GridServiceAssignments)serviceCache().getForcePrimary(
- new GridServiceAssignmentsKey(dep.configuration().getName()));
-
- if (assigns != null) {
- desc.topologySnapshot(assigns.assigns());
-
- descs.add(desc);
- }
- }
- catch (IgniteCheckedException ex) {
- log.error("Failed to get assignments from replicated cache for service: " +
- dep.configuration().getName(), ex);
- }
- }
-
- return descs;
- }
-
- /** {@inheritDoc} */
- @Override public <T> T service(String name) {
- ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE);
-
- Collection<ServiceContextImpl> ctxs;
-
- synchronized (locSvcs) {
- ctxs = locSvcs.get(name);
- }
-
- if (ctxs == null)
- return null;
-
- synchronized (ctxs) {
- if (ctxs.isEmpty())
- return null;
-
- for (ServiceContextImpl ctx : ctxs) {
- Service svc = ctx.service();
-
- if (svc != null)
- return (T)svc;
- }
-
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public ServiceContextImpl serviceContext(String name) {
- Collection<ServiceContextImpl> ctxs;
-
- synchronized (locSvcs) {
- ctxs = locSvcs.get(name);
- }
-
- if (ctxs == null)
- return null;
-
- synchronized (ctxs) {
- if (ctxs.isEmpty())
- return null;
-
- for (ServiceContextImpl ctx : ctxs) {
- if (ctx.service() != null)
- return ctx;
- }
-
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T> T serviceProxy(
- ClusterGroup prj,
- String name,
- Class<? super T> srvcCls,
- boolean sticky,
- @Nullable Supplier<ServiceCallContext> callCtxProvider,
- long timeout
- ) throws IgniteException {
- ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE);
-
- if (hasLocalNode(prj)) {
- ServiceContextImpl ctx = serviceContext(name);
-
- if (ctx != null) {
- Service svc = ctx.service();
-
- if (svc != null) {
- if (srvcCls.isAssignableFrom(svc.getClass()))
- return (T)svc;
- else if (!PlatformService.class.isAssignableFrom(svc.getClass())) {
- throw new IgniteException("Service does not implement specified interface [svcItf=" +
- srvcCls.getName() + ", svcCls=" + svc.getClass().getName() + ']');
- }
- }
- }
- }
-
- return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx, callCtxProvider).proxy();
- }
-
- /**
- * @param prj Grid nodes projection.
- * @return Whether given projection contains any local node.
- */
- private boolean hasLocalNode(ClusterGroup prj) {
- for (ClusterNode n : prj.nodes()) {
- if (n.isLocal())
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public <T> Collection<T> services(String name) {
- ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE);
-
- Collection<ServiceContextImpl> ctxs;
-
- synchronized (locSvcs) {
- ctxs = locSvcs.get(name);
- }
-
- if (ctxs == null)
- return null;
-
- synchronized (ctxs) {
- Collection<T> res = new ArrayList<>(ctxs.size());
-
- for (ServiceContextImpl ctx : ctxs) {
- Service svc = ctx.service();
-
- if (svc != null)
- res.add((T)svc);
- }
-
- return res;
- }
- }
-
- /**
- * Reassigns service to nodes.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- * @throws IgniteCheckedException If failed.
- */
- private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
- IgniteInternalCache<Object, Object> cache = serviceCache();
-
- ServiceConfiguration cfg = dep.configuration();
- Object nodeFilter = cfg.getNodeFilter();
-
- if (nodeFilter != null)
- ctx.resource().injectGeneric(nodeFilter);
-
- int totalCnt = cfg.getTotalCount();
- int maxPerNodeCnt = cfg.getMaxPerNodeCount();
- String cacheName = cfg.getCacheName();
- Object affKey = cfg.getAffinityKey();
-
- while (true) {
- GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion());
-
- Collection<ClusterNode> nodes;
-
- // Call node filter outside of transaction.
- if (affKey == null) {
- nodes = ctx.discovery().nodes(topVer);
-
- if (assigns.nodeFilter() != null) {
- Collection<ClusterNode> nodes0 = new ArrayList<>();
-
- for (ClusterNode node : nodes) {
- if (assigns.nodeFilter().apply(node))
- nodes0.add(node);
- }
-
- nodes = nodes0;
- }
- }
- else
- nodes = null;
-
- try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(cfg.getName());
-
- GridServiceAssignments oldAssigns = (GridServiceAssignments)cache.get(key);
-
- Map<UUID, Integer> cnts = new HashMap<>();
-
- if (affKey != null) {
- ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer);
-
- if (n != null) {
- int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt;
-
- cnts.put(n.id(), cnt);
- }
- }
- else {
- if (!nodes.isEmpty()) {
- int size = nodes.size();
-
- int perNodeCnt = totalCnt != 0 ? totalCnt / size : maxPerNodeCnt;
- int remainder = totalCnt != 0 ? totalCnt % size : 0;
-
- if (perNodeCnt >= maxPerNodeCnt && maxPerNodeCnt != 0) {
- perNodeCnt = maxPerNodeCnt;
- remainder = 0;
- }
-
- for (ClusterNode n : nodes)
- cnts.put(n.id(), perNodeCnt);
-
- assert perNodeCnt >= 0;
- assert remainder >= 0;
-
- if (remainder > 0) {
- int cnt = perNodeCnt + 1;
-
- if (oldAssigns != null) {
- Collection<UUID> used = new HashSet<>();
-
- // Avoid redundant moving of services.
- for (Map.Entry<UUID, Integer> e : oldAssigns.assigns().entrySet()) {
- // Do not assign services to left nodes.
- if (ctx.discovery().node(e.getKey()) == null)
- continue;
-
- // If old count and new count match, then reuse the assignment.
- if (e.getValue() == cnt) {
- cnts.put(e.getKey(), cnt);
-
- used.add(e.getKey());
-
- if (--remainder == 0)
- break;
- }
- }
-
- if (remainder > 0) {
- List<Map.Entry<UUID, Integer>> entries = new ArrayList<>(cnts.entrySet());
-
- // Randomize.
- Collections.shuffle(entries);
-
- for (Map.Entry<UUID, Integer> e : entries) {
- // Assign only the ones that have not been reused from previous assignments.
- if (!used.contains(e.getKey())) {
- if (e.getValue() < maxPerNodeCnt || maxPerNodeCnt == 0) {
- e.setValue(e.getValue() + 1);
-
- if (--remainder == 0)
- break;
- }
- }
- }
- }
- }
- else {
- List<Map.Entry<UUID, Integer>> entries = new ArrayList<>(cnts.entrySet());
-
- // Randomize.
- Collections.shuffle(entries);
-
- for (Map.Entry<UUID, Integer> e : entries) {
- e.setValue(e.getValue() + 1);
-
- if (--remainder == 0)
- break;
- }
- }
- }
- }
- }
-
- assigns.assigns(cnts);
-
- cache.put(key, assigns);
-
- tx.commit();
-
- break;
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Topology changed while reassigning (will retry): " + e.getMessage());
-
- U.sleep(10);
- }
- }
- }
-
- /**
- * Redeploys local services based on assignments.
- *
- * @param assigns Assignments.
- */
- private void redeploy(GridServiceAssignments assigns) {
- if (assigns.topologyVersion() < ctx.discovery().topologyVersion()) {
- if (log.isDebugEnabled())
- log.debug("Skip outdated assignment [assigns=" + assigns +
- ", topVer=" + ctx.discovery().topologyVersion() + ']');
-
- return;
- }
-
- String svcName = assigns.name();
-
- Integer assignCnt = assigns.assigns().get(ctx.localNodeId());
-
- if (assignCnt == null)
- assignCnt = 0;
-
- Collection<ServiceContextImpl> ctxs;
-
- synchronized (locSvcs) {
- ctxs = locSvcs.get(svcName);
-
- if (ctxs == null)
- locSvcs.put(svcName, ctxs = new ArrayList<>());
- }
-
- Collection<ServiceContextImpl> toInit = new ArrayList<>();
-
- synchronized (ctxs) {
- if (ctxs.size() > assignCnt) {
- int cancelCnt = ctxs.size() - assignCnt;
-
- cancel(ctxs, cancelCnt);
- }
- else if (ctxs.size() < assignCnt) {
- int createCnt = assignCnt - ctxs.size();
-
- for (int i = 0; i < createCnt; i++) {
- ServiceContextImpl svcCtx = new ServiceContextImpl(assigns.name(),
- UUID.randomUUID(),
- assigns.cacheName(),
- assigns.affinityKey(),
- Executors.newSingleThreadExecutor(threadFactory));
-
- ctxs.add(svcCtx);
-
- toInit.add(svcCtx);
- }
- }
- }
-
- for (final ServiceContextImpl svcCtx : toInit) {
- final Service svc;
-
- try {
- svc = copyAndInject(assigns.configuration(), svcCtx);
-
- // Initialize service.
- svc.init(svcCtx);
-
- svcCtx.service(svc);
- }
- catch (Throwable e) {
- U.error(log, "Failed to initialize service (service will not be deployed): " + assigns.name(), e);
-
- synchronized (ctxs) {
- ctxs.removeAll(toInit);
- }
-
- if (e instanceof Error)
- throw (Error)e;
-
- if (e instanceof RuntimeException)
- throw (RuntimeException)e;
-
- return;
- }
-
- if (log.isInfoEnabled())
- log.info("Starting service instance [name=" + svcCtx.name() + ", execId=" +
- svcCtx.executionId() + ']');
-
- // Start service in its own thread.
- final ExecutorService exe = svcCtx.executor();
-
- exe.execute(new Runnable() {
- @Override public void run() {
- try {
- svc.execute(svcCtx);
- }
- catch (InterruptedException | IgniteInterruptedCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Service thread was interrupted [name=" + svcCtx.name() + ", execId=" +
- svcCtx.executionId() + ']');
- }
- catch (IgniteException e) {
- if (e.hasCause(InterruptedException.class) ||
- e.hasCause(IgniteInterruptedCheckedException.class)) {
- if (log.isDebugEnabled())
- log.debug("Service thread was interrupted [name=" + svcCtx.name() +
- ", execId=" + svcCtx.executionId() + ']');
- }
- else {
- U.error(log, "Service execution stopped with error [name=" + svcCtx.name() +
- ", execId=" + svcCtx.executionId() + ']', e);
- }
- }
- catch (Throwable e) {
- log.error("Service execution stopped with error [name=" + svcCtx.name() +
- ", execId=" + svcCtx.executionId() + ']', e);
-
- if (e instanceof Error)
- throw (Error)e;
- }
- finally {
- // Suicide.
- exe.shutdownNow();
- }
- }
- });
- }
- }
-
- /**
- * @param cfg Service configuration.
- * @param svcCtx Service context to be injected into the service.
- * @return Copy of service.
- * @throws IgniteCheckedException If failed.
- */
- private Service copyAndInject(ServiceConfiguration cfg, ServiceContext svcCtx) throws IgniteCheckedException {
- Marshaller m = ctx.config().getMarshaller();
-
- if (cfg instanceof LazyServiceConfiguration) {
- byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
-
- Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()));
-
- ctx.resource().inject(srvc, svcCtx);
-
- return srvc;
- }
- else {
- Service svc = cfg.getService();
-
- try {
- byte[] bytes = U.marshal(m, svc);
-
- Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
-
- ctx.resource().inject(cp, svcCtx);
-
- return cp;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to copy service (will reuse same instance): " + svc.getClass(), e);
-
- return svc;
- }
- }
- }
-
- /**
- * @param ctxs Contexts to cancel.
- * @param cancelCnt Number of contexts to cancel.
- */
- private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) {
- for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext(); ) {
- ServiceContextImpl svcCtx = it.next();
-
- // Flip cancelled flag.
- svcCtx.setCancelled(true);
-
- // Notify service about cancellation.
- Service svc = svcCtx.service();
-
- if (svc != null) {
- try {
- svc.cancel(svcCtx);
- }
- catch (Throwable e) {
- log.error("Failed to cancel service (ignoring) [name=" + svcCtx.name() +
- ", execId=" + svcCtx.executionId() + ']', e);
-
- if (e instanceof Error)
- throw e;
- }
- finally {
- try {
- ctx.resource().cleanup(svc);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to clean up service (will ignore): " + svcCtx.name(), e);
- }
- }
- }
-
- // Close out executor thread for the service.
- // This will cause the thread to be interrupted.
- svcCtx.executor().shutdownNow();
-
- it.remove();
-
- if (log.isInfoEnabled())
- log.info("Cancelled service instance [name=" + svcCtx.name() + ", execId=" +
- svcCtx.executionId() + ']');
-
- if (--cancelCnt == 0)
- break;
- }
- }
-
- /**
- * @param p Entry predicate used to execute query from client node.
- * @return Service deployment entries.
- */
- @SuppressWarnings("unchecked")
- private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
- try {
- IgniteInternalCache<Object, Object> cache = serviceCache();
-
- GridCacheQueryManager qryMgr = cache.context().queries();
-
- CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false, null);
-
- DiscoveryDataClusterState clusterState = ctx.state().clusterState();
-
- if ((clusterState.hasBaselineTopology()
- && !CU.baselineNode(ctx.cluster().get().localNode(), clusterState))
- || !cache.context().affinityNode()) {
- ClusterNode oldestSrvNode =
- ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
-
- if (oldestSrvNode == null)
- return new GridEmptyIterator<>();
-
- qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
- }
- else
- qry.projection(ctx.cluster().get().forLocal());
-
- GridCloseableIterator<Map.Entry<Object, Object>> iter = qry.executeScanQuery();
-
- return cache.context().itHolder().iterator(iter,
- new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object, Object>>() {
- @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
- // Actually Scan Query returns Iterator<CacheQueryEntry> by default,
- // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces.
- return (Cache.Entry<Object, Object>)e;
- }
-
- @Override protected void remove(Cache.Entry<Object, Object> item) {
- throw new UnsupportedOperationException();
- }
- });
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /**
- * Called right after utility cache is started and ready for the usage.
- */
- public void onUtilityCacheStarted() {
- synchronized (pendingJobCtxs) {
- if (pendingJobCtxs.isEmpty())
- return;
-
- Iterator<ComputeJobContext> iter = pendingJobCtxs.iterator();
-
- while (iter.hasNext()) {
- iter.next().callcc();
- iter.remove();
- }
- }
- }
-
- /**
- * Service deployment listener.
- */
- private class ServiceEntriesListener implements CacheEntryUpdatedListener<Object, Object> {
- /** {@inheritDoc} */
- @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) {
- GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
-
- if (busyLock == null || !busyLock.enterBusy())
- return;
-
- try {
- depExe.execute(new DepRunnable() {
- @Override public void run0() {
- onSystemCacheUpdated(deps);
- }
- });
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- }
-
- /**
- * @param evts Update events.
- */
- private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) {
- for (CacheEntryEvent<?, ?> e : evts) {
- if (e.getKey() instanceof GridServiceDeploymentKey)
- processDeployment((CacheEntryEvent)e);
- else if (e.getKey() instanceof GridServiceAssignmentsKey)
- processAssignment((CacheEntryEvent)e);
- }
- }
-
- /**
- * @param e Entry.
- */
- private void processDeployment(CacheEntryEvent<GridServiceDeploymentKey, GridServiceDeployment> e) {
- GridServiceDeployment dep;
-
- try {
- dep = e.getValue();
- }
- catch (IgniteException ex) {
- if (X.hasCause(ex, ClassNotFoundException.class))
- return;
- else
- throw ex;
- }
-
- if (e.getEventType() != REMOVED) {
- svcName.set(dep.configuration().getName());
-
- // Ignore other utility cache events.
- AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
-
- ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
-
- // Process deployment on coordinator only.
- if (oldest.isLocal())
- onDeployment(dep, topVer);
- }
- // Handle undeployment.
- else {
- String name = e.getKey().name();
-
- undeploy(name);
-
- // Finish deployment futures if undeployment happened.
- GridFutureAdapter<?> fut = depFuts.remove(name);
-
- if (fut != null)
- fut.onDone();
-
- // Complete undeployment future.
- fut = undepFuts.remove(name);
-
- if (fut != null)
- fut.onDone();
-
- GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
-
- // Remove assignment on primary node in case of undeploy.
- IgniteInternalCache<Object, Object> cache = serviceCache();
-
- if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
- try {
- cache.getAndRemove(key);
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
- }
- }
- }
- }
-
- /**
- * Deployment callback.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- */
- private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
- // Retry forever.
- try {
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
-
- // If topology version changed, reassignment will happen from topology event.
- if (newTopVer.equals(topVer))
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
-
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
-
- if (!newTopVer.equals(topVer)) {
- assert newTopVer.compareTo(topVer) > 0;
-
- // Reassignment will happen from topology event.
- return;
- }
-
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
-
- private long start = System.currentTimeMillis();
-
- @Override public IgniteUuid timeoutId() {
- return id;
- }
-
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
-
- @Override public void onTimeout() {
- depExe.execute(new DepRunnable() {
- @Override public void run0() {
- onDeployment(dep, topVer);
- }
- });
- }
- });
- }
- }
-
- /**
- * Topology listener.
- */
- private class TopologyListener implements DiscoveryEventListener {
- /** */
- private volatile AffinityTopologyVersion currTopVer = null;
-
- /**
- * Check that listening-in topology version is the latest and wait until exchange is finished.
- *
- * @param initTopVer listening-in topology version.
- * @return {@code True} if current event is not last and should be skipped.
- */
- private boolean skipExchange(final AffinityTopologyVersion initTopVer) {
- AffinityTopologyVersion pendingTopVer = null;
- AffinityTopologyVersion newTopVer;
-
- if (!initTopVer.equals(newTopVer = currTopVer))
- pendingTopVer = newTopVer;
- else {
- IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange().affinityReadyFuture(initTopVer);
-
- if (affReadyFut != null) {
- try {
- affReadyFut.get();
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to wait for affinity ready future " +
- "(the assignment will be recalculated anyway):" + e.toString());
- }
- }
-
- // If exchange already moved forward - skip current version.
- if (!initTopVer.equals(newTopVer = currTopVer))
- pendingTopVer = newTopVer;
- }
-
- boolean skipExchange = pendingTopVer != null;
-
- if (skipExchange && log.isInfoEnabled()) {
- log.info("Service processor detected a topology change during " +
- "assignments calculation (will abort current iteration and " +
- "re-calculate on the newer version): " +
- "[topVer=" + initTopVer + ", newTopVer=" + pendingTopVer + ']');
- }
-
- return skipExchange;
- }
-
- /** {@inheritDoc} */
- @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache) {
- GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
-
- if (busyLock == null || !busyLock.enterBusy())
- return;
-
- try {
- final AffinityTopologyVersion topVer;
-
- if (evt instanceof DiscoveryCustomEvent) {
- DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
-
- if (msg instanceof CacheAffinityChangeMessage) {
- if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
- return;
- }
- else if (msg instanceof DynamicCacheChangeBatch) {
- if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
- return;
- }
- else
- return;
-
- if (msg instanceof MetadataUpdateProposedMessage || msg instanceof MetadataUpdateAcceptedMessage)
- return;
-
- topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
- }
- else
- topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
-
- currTopVer = topVer;
-
- depExe.execute(new DepRunnable() {
- @Override public void run0() {
- // In case the cache instance isn't tracked by DiscoveryManager anymore.
- discoCache.updateAlives(ctx.discovery());
-
- ClusterNode oldest = discoCache.oldestAliveServerNode();
-
- if (oldest != null && oldest.isLocal()) {
- final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
-
- if (ctx.deploy().enabled())
- ctx.cache().context().deploy().ignoreOwnership(true);
-
- try {
- Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
- ServiceDeploymentPredicate.INSTANCE);
-
- while (it.hasNext()) {
- // If topology changed again, let next event handle it.
- if (skipExchange(topVer))
- return;
-
- Cache.Entry<Object, Object> e = it.next();
-
- GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
-
- try {
- svcName.set(dep.configuration().getName());
-
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException ex) {
- if (!(ex instanceof ClusterTopologyCheckedException))
- LT.error(log, ex, "Failed to do service reassignment (will retry): " +
- dep.configuration().getName());
-
- retries.add(dep);
- }
- }
- }
- finally {
- if (ctx.deploy().enabled())
- ctx.cache().context().deploy().ignoreOwnership(false);
- }
-
- if (!retries.isEmpty())
- onReassignmentFailed(topVer, retries);
- }
-
- Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
-
- // Clean up zombie assignments.
- IgniteInternalCache<Object, Object> cache = serviceCache();
-
- while (it.hasNext()) {
- // If topology changed again, let next event handle it.
- if (skipExchange(topVer))
- return;
-
- Cache.Entry<Object, Object> e = it.next();
-
- if (cache.context().affinity().primaryByKey(ctx.grid().localNode(), e.getKey(), topVer)) {
- String name = ((GridServiceAssignmentsKey)e.getKey()).name();
-
- try {
- if (cache.get(new GridServiceDeploymentKey(name)) == null) {
- if (log.isDebugEnabled())
- log.debug("Removed zombie assignments: " + e.getValue());
-
- cache.getAndRemove(e.getKey());
- }
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
- }
- }
- }
- }
- });
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * Handler for reassignment failures.
- *
- * @param topVer Topology version.
- * @param retries Retries.
- */
- private void onReassignmentFailed(final AffinityTopologyVersion topVer,
- final Collection<GridServiceDeployment> retries) {
- GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
-
- if (busyLock == null || !busyLock.enterBusy())
- return;
-
- try {
- // If topology changed again, let next event handle it.
- if (ctx.discovery().topologyVersionEx().equals(topVer))
- return;
-
- for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) {
- GridServiceDeployment dep = it.next();
-
- try {
- svcName.set(dep.configuration().getName());
-
- reassign(dep, topVer);
-
- it.remove();
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- LT.error(log, e, "Failed to do service reassignment (will retry): " +
- dep.configuration().getName());
- }
- }
-
- if (!retries.isEmpty()) {
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
-
- private long start = System.currentTimeMillis();
-
- @Override public IgniteUuid timeoutId() {
- return id;
- }
-
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
-
- @Override public void onTimeout() {
- depExe.execute(new Runnable() {
- @Override public void run() {
- onReassignmentFailed(topVer, retries);
- }
- });
- }
- });
- }
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- }
-
- /**
- * @param e Entry.
- */
- private void processAssignment(CacheEntryEvent<GridServiceAssignmentsKey, GridServiceAssignments> e) {
- GridServiceAssignments assigns;
-
- try {
- assigns = e.getValue();
- }
- catch (IgniteException ex) {
- if (X.hasCause(ex, ClassNotFoundException.class))
- return;
- else
- throw ex;
- }
-
- if (e.getEventType() != REMOVED) {
- svcName.set(assigns.name());
-
- Throwable t = null;
-
- try {
- redeploy(assigns);
- }
- catch (Error | RuntimeException th) {
- t = th;
- }
-
- GridServiceDeploymentFuture<String> fut = depFuts.get(assigns.name());
-
- if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) {
- depFuts.remove(assigns.name(), fut);
-
- // Complete deployment futures once the assignments have been stored in cache.
- fut.onDone(null, t);
- }
- }
- // Handle undeployment.
- else
- undeploy(e.getKey().name());
- }
-
- /**
- * @param name Name.
- */
- private void undeploy(String name) {
- svcName.set(name);
-
- Collection<ServiceContextImpl> ctxs;
-
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
-
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
- }
- }
-
- /**
- *
- */
- private static class CancelResult {
- /** */
- IgniteInternalFuture<?> fut;
-
- /** */
- boolean rollback;
-
- /**
- * @param fut Future.
- * @param rollback {@code True} if service was cancelled during current call.
- */
- CancelResult(IgniteInternalFuture<?> fut, boolean rollback) {
- this.fut = fut;
- this.rollback = rollback;
- }
- }
-
- /**
- *
- */
- private abstract class DepRunnable implements Runnable {
- /** {@inheritDoc} */
- @Override public void run() {
- GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
-
- if (busyLock == null || !busyLock.enterBusy())
- return;
-
- // Won't block ServiceProcessor stopping process.
- busyLock.leaveBusy();
-
- svcName.set(null);
-
- try {
- run0();
- }
- catch (Throwable t) {
- log.error("Error when executing service: " + svcName.get(), t);
-
- if (t instanceof Error)
- throw t;
- }
- finally {
- svcName.set(null);
- }
- }
-
- /**
- * Abstract run method protected by busy lock.
- */
- public abstract void run0();
- }
-
- /**
- *
- */
- static class ServiceDeploymentPredicate implements IgniteBiPredicate<Object, Object> {
- /** */
- static final ServiceDeploymentPredicate INSTANCE = new ServiceDeploymentPredicate();
-
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public boolean apply(Object key, Object val) {
- return key instanceof GridServiceDeploymentKey;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ServiceDeploymentPredicate.class, this);
- }
- }
-
- /**
- *
- */
- static class ServiceAssignmentsPredicate implements IgniteBiPredicate<Object, Object> {
- /** */
- static final ServiceAssignmentsPredicate INSTANCE = new ServiceAssignmentsPredicate();
-
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public boolean apply(Object key, Object val) {
- return key instanceof GridServiceAssignmentsKey;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ServiceAssignmentsPredicate.class, this);
- }
- }
-
- /**
- */
- @GridInternal
- private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final String svcName;
-
- /** */
- private transient boolean waitedCacheInit;
-
- /** */
- @IgniteInstanceResource
- private IgniteEx ignite;
-
- /** */
- @JobContextResource
- private transient ComputeJobContext jCtx;
-
- /** */
- @LoggerResource
- private transient IgniteLogger log;
-
- /**
- * @param svcName Service name.
- */
- public ServiceTopologyCallable(String svcName) {
- this.svcName = svcName;
- }
-
- /** {@inheritDoc} */
- @Override public Map<UUID, Integer> call() throws Exception {
- IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache();
-
- if (cache == null) {
- List<ComputeJobContext> pendingCtxs = ((GridServiceProcessor)ignite.context().service()).pendingJobCtxs;
-
- synchronized (pendingCtxs) {
- // Double check cache reference after lock acqusition.
- cache = ignite.context().cache().utilityCache();
-
- if (cache == null) {
- if (!waitedCacheInit) {
- log.debug("Utility cache hasn't been initialized yet. Waiting.");
-
- // waiting for a minute for cache initialization.
- jCtx.holdcc(60 * 1000);
-
- pendingCtxs.add(jCtx);
-
- waitedCacheInit = true;
-
- return null;
- }
- else {
- log.error("Failed to gather service topology. Utility " +
- "cache initialization is stuck.");
-
- throw new IgniteCheckedException("Failed to gather service topology. Utility " +
- "cache initialization is stuck.");
- }
- }
- }
- }
-
- return serviceTopology(cache, svcName);
- }
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index fba2f41..9c38aef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -229,7 +229,7 @@ public class GridServiceProxy<T> implements Serializable {
// Check if ignorable exceptions are in the cause chain.
Throwable ignorableCause = X.cause(e, ClusterTopologyCheckedException.class);
- if (ignorableCause == null && ctx.service() instanceof GridServiceProcessor)
+ if (ignorableCause == null)
ignorableCause = X.cause(e, GridServiceNotFoundException.class);
if (ignorableCause != null) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index cb9cf6a..5e38e9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.systemview.walker.ServiceViewWalker;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
@@ -115,7 +116,7 @@ import static org.apache.ignite.plugin.security.SecurityPermission.SERVICE_DEPLO
*/
@SkipDaemon
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-public class IgniteServiceProcessor extends ServiceProcessorAdapter implements IgniteChangeGlobalStateSupport {
+public class IgniteServiceProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport {
/** */
public static final String SVCS_VIEW = "services";
@@ -532,18 +533,33 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
throw new IgniteException("Service configuration check failed (" + desc + ")");
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc) {
+ /**
+ * @param prj Grid projection.
+ * @param name Service name.
+ * @param srvc Service.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc) {
return deployMultiple(prj, name, srvc, 0, 1);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc) {
+ /**
+ * @param name Service name.
+ * @param srvc Service instance.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc) {
return deployMultiple(prj, name, srvc, 1, 1);
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
+ /**
+ * @param name Service name.
+ * @param srvc Service.
+ * @param totalCnt Total count.
+ * @param maxPerNodeCnt Max per-node count.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
int maxPerNodeCnt) {
ServiceConfiguration cfg = new ServiceConfiguration();
@@ -555,8 +571,14 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
return deployAll(prj, Collections.singleton(cfg));
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
+ /**
+ * @param name Service name.
+ * @param srvc Service.
+ * @param cacheName Cache name.
+ * @param affKey Affinity key.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
Object affKey) {
A.notNull(affKey, "affKey");
@@ -654,8 +676,12 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
+ /**
+ * @param prj Grid projection.
+ * @param cfgs Service configurations.
+ * @return Future for deployment.
+ */
+ public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) {
if (prj == null)
// Deploy to servers by default if no projection specified.
return deployAll(cfgs, ctx.cluster().get().forServers().predicate());
@@ -746,19 +772,27 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> cancel(String name) {
+ /**
+ * @param name Service name.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> cancel(String name) {
return cancelAll(Collections.singleton(name));
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> cancelAll() {
+ /**
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> cancelAll() {
return cancelAll(deployedServices.values().stream().map(ServiceInfo::name).collect(Collectors.toSet()));
}
- /** {@inheritDoc} */
+ /**
+ * @param servicesNames Name of services to deploy.
+ * @return Future.
+ */
@SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<?> cancelAll(@NotNull Collection<String> servicesNames) {
+ public IgniteInternalFuture<?> cancelAll(@NotNull Collection<String> servicesNames) {
opsLock.readLock().lock();
try {
@@ -843,8 +877,13 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
}
- /** {@inheritDoc} */
- @Override public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
+ /**
+ * @param name Service name.
+ * @param timeout If greater than 0 limits task execution time. Cannot be negative.
+ * @return Service topology.
+ * @throws IgniteCheckedException On error.
+ */
+ public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
assert timeout >= 0;
long startTime = U.currentTimeMillis();
@@ -880,13 +919,19 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
}
- /** {@inheritDoc} */
- @Override public Collection<ServiceDescriptor> serviceDescriptors() {
+ /**
+ * @return Collection of service descriptors.
+ */
+ public Collection<ServiceDescriptor> serviceDescriptors() {
return new ArrayList<>(registeredServices.values());
}
- /** {@inheritDoc} */
- @Override public <T> T service(String name) {
+ /**
+ * @param name Service name.
+ * @param <T> Service type.
+ * @return Service by specified service name.
+ */
+ public <T> T service(String name) {
if (!enterBusy())
return null;
@@ -917,8 +962,11 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
}
- /** {@inheritDoc} */
- @Override public ServiceContextImpl serviceContext(String name) {
+ /**
+ * @param name Service name.
+ * @return Service by specified service name.
+ */
+ public ServiceContextImpl serviceContext(String name) {
if (!enterBusy())
return null;
@@ -958,8 +1006,18 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
return locServices.get(srvcId);
}
- /** {@inheritDoc} */
- @Override public <T> T serviceProxy(
+ /**
+ * @param prj Grid projection.
+ * @param name Service name.
+ * @param srvcCls Service class.
+ * @param sticky Whether multi-node request should be done.
+ * @param callCtxProvider Caller context provider.
+ * @param timeout If greater than 0 limits service acquire time. Cannot be negative.
+ * @param <T> Service interface type.
+ * @return The proxy of a service by its name and class.
+ * @throws IgniteException If failed to create proxy.
+ */
+ public <T> T serviceProxy(
ClusterGroup prj,
String name,
Class<? super T> srvcCls,
@@ -1002,8 +1060,12 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
return false;
}
- /** {@inheritDoc} */
- @Override public <T> Collection<T> services(String name) {
+ /**
+ * @param name Service name.
+ * @param <T> Service type.
+ * @return Services by specified service name.
+ */
+ public <T> Collection<T> services(String name) {
if (!enterBusy())
return null;
@@ -1526,8 +1588,15 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
F.eq(ctx.discovery().localNode(), coordinator());
}
- /** {@inheritDoc} */
- @Override public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
+ /**
+ * Callback for local join events for which the regular events are not generated.
+ * <p/>
+ * Local join event is expected in cases of joining to topology or client reconnect.
+ *
+ * @param evt Discovery event.
+ * @param discoCache Discovery cache.
+ */
+ public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
assert ctx.localNodeId().equals(evt.eventNode().id());
assert evt.type() == EVT_NODE_JOINED;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
deleted file mode 100644
index 56628ac..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.service;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterGroup;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.services.Service;
-import org.apache.ignite.services.ServiceCallContext;
-import org.apache.ignite.services.ServiceConfiguration;
-import org.apache.ignite.services.ServiceDescriptor;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Adapter for different service processor implementations.
- */
-public abstract class ServiceProcessorAdapter extends GridProcessorAdapter {
- /**
- * @param ctx Kernal context.
- */
- protected ServiceProcessorAdapter(GridKernalContext ctx) {
- super(ctx);
- }
-
- /**
- * @param prj Grid projection.
- * @param name Service name.
- * @param srvc Service.
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc);
-
- /**
- * @param name Service name.
- * @param srvc Service instance.
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc);
-
- /**
- * @param name Service name.
- * @param srvc Service.
- * @param totalCnt Total count.
- * @param maxPerNodeCnt Max per-node count.
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt,
- int maxPerNodeCnt);
-
- /**
- * @param name Service name.
- * @param srvc Service.
- * @param cacheName Cache name.
- * @param affKey Affinity key.
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName,
- Object affKey);
-
- /**
- * @param prj Grid projection.
- * @param cfgs Service configurations.
- * @return Future for deployment.
- */
- public abstract IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs);
-
- /**
- * @param name Service name.
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> cancel(String name);
-
- /**
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> cancelAll();
-
- /**
- * @param servicesNames Name of services to deploy.
- * @return Future.
- */
- public abstract IgniteInternalFuture<?> cancelAll(Collection<String> servicesNames);
-
- /**
- * @return Collection of service descriptors.
- */
- public abstract Collection<ServiceDescriptor> serviceDescriptors();
-
- /**
- * @param name Service name.
- * @param <T> Service type.
- * @return Service by specified service name.
- */
- public abstract <T> T service(String name);
-
- /**
- * @param prj Grid projection.
- * @param name Service name.
- * @param srvcCls Service class.
- * @param sticky Whether multi-node request should be done.
- * @param callCtxProvider Caller context provider.
- * @param timeout If greater than 0 limits service acquire time. Cannot be negative.
- * @param <T> Service interface type.
- * @return The proxy of a service by its name and class.
- * @throws IgniteException If failed to create proxy.
- */
- public abstract <T> T serviceProxy(
- ClusterGroup prj,
- String name,
- Class<? super T> srvcCls,
- boolean sticky,
- @Nullable Supplier<ServiceCallContext> callCtxProvider,
- long timeout
- ) throws IgniteException;
-
- /**
- * @param name Service name.
- * @param <T> Service type.
- * @return Services by specified service name.
- */
- public abstract <T> Collection<T> services(String name);
-
- /**
- * @param name Service name.
- * @return Service by specified service name.
- */
- public abstract ServiceContextImpl serviceContext(String name);
-
- /**
- * @param name Service name.
- * @param timeout If greater than 0 limits task execution time. Cannot be negative.
- * @return Service topology.
- * @throws IgniteCheckedException On error.
- */
- public abstract Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException;
-
- /**
- * Callback for local join events for which the regular events are not generated.
- * <p/>
- * Local join event is expected in cases of joining to topology or client reconnect.
- *
- * @param evt Discovery event.
- * @param discoCache Discovery cache.
- */
- public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
- // No-op.
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f98d7e5..4cdff95 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -164,7 +164,6 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NODE_IDS_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -177,7 +176,6 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.IgniteFeatures.TCP_DISCOVERY_MESSAGE_NODE_COMPACT_REPRESENTATION;
import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
@@ -4684,53 +4682,6 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
- final Boolean locSrvcProcModeAttr = locNode.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
- // Can be null only in module tests of discovery spi (without node startup).
- final Boolean locSrvcProcMode = locSrvcProcModeAttr != null ? locSrvcProcModeAttr : false;
-
- final Boolean rmtSrvcProcModeAttr = node.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED);
- final boolean rmtSrvcProcMode = rmtSrvcProcModeAttr != null ? rmtSrvcProcModeAttr : false;
-
- if (!F.eq(locSrvcProcMode, rmtSrvcProcMode)) {
- utilityPool.execute(
- new Runnable() {
- @Override public void run() {
- String errMsg = "Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical service processor mode, " +
- "configure system property explicitly) " +
- "[locSrvcProcMode=" + locSrvcProcMode +
- ", rmtSrvcProcMode=" + rmtSrvcProcMode +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(node) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + msg.creatorNodeId() + ']';
-
- String sndMsg = "Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical service processor mode, " +
- "configure system property explicitly) " +
- "[locSrvcProcMode=" + rmtSrvcProcMode +
- ", rmtSrvcProcMode=" + locSrvcProcMode +
- ", locNodeAddrs=" + U.addressesAsString(node) + ", locPort=" + node.discoveryPort() +
- ", rmtNodeAddr=" + U.addressesAsString(locNode) + ", locNodeId=" + node.id() +
- ", rmtNodeId=" + locNode.id() + ']';
-
- nodeCheckError(
- node,
- errMsg,
- sndMsg);
- }
- });
-
- // Ignore join request.
- msg.spanContainer().span()
- .addLog(() -> "Ignored")
- .setStatus(SpanStatus.ABORTED)
- .end();
-
- return;
- }
-
// Handle join.
node.internalOrder(ring.nextNodeOrder());
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 8ed15fc..0b7f8b5 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1681,9 +1681,6 @@ org.apache.ignite.internal.processors.service.GridServiceDeployment
org.apache.ignite.internal.processors.service.GridServiceDeploymentKey
org.apache.ignite.internal.processors.service.GridServiceMethodNotFoundException
org.apache.ignite.internal.processors.service.GridServiceNotFoundException
-org.apache.ignite.internal.processors.service.GridServiceProcessor$ServiceAssignmentsPredicate
-org.apache.ignite.internal.processors.service.GridServiceProcessor$ServiceDeploymentPredicate
-org.apache.ignite.internal.processors.service.GridServiceProcessor$ServiceTopologyCallable
org.apache.ignite.internal.processors.service.GridServiceProxy
org.apache.ignite.internal.processors.service.GridServiceProxy$ServiceProxyCallable
org.apache.ignite.internal.processors.service.GridServiceProxy$ServiceProxyException
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
index 121be91..d5c0099 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -24,13 +24,11 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.service.DummyService;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Assume;
import org.junit.Test;
/**
@@ -129,61 +127,9 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
/**
* @throws Exception If failed.
*/
- @Test
- public void testReconnectInDeploying() throws Exception {
- Assume.assumeTrue(!isEventDrivenServiceProcessorEnabled());
-
- Ignite client = grid(serverCount());
-
- assertTrue(client.cluster().localNode().isClient());
-
- final IgniteServices services = client.services();
-
- Ignite srv = ignite(0);
-
- BlockTcpCommunicationSpi commSpi = commSpi(srv);
-
- commSpi.blockMessage(GridNearTxPrepareResponse.class);
-
- final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- services.deployClusterSingleton("testReconnectInDeploying", new TestServiceImpl());
- }
- catch (IgniteClientDisconnectedException e) {
- checkAndWait(e);
-
- return true;
- }
-
- return false;
- }
- });
-
- // Check that client waiting operation.
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return fut.get(200);
- }
- }, IgniteFutureTimeoutCheckedException.class, null);
-
- assertNotDone(fut);
-
- commSpi.unblockMessage();
-
- reconnectClientNode(client, srv, null);
-
- assertTrue((Boolean)fut.get(2, TimeUnit.SECONDS));
- }
-
- /**
- * @throws Exception If failed.
- */
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Test
public void testReconnectInDeployingNew() throws Exception {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
-
IgniteEx client = grid(serverCount());
assertTrue(client.cluster().localNode().isClient());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
index 82257b8..9686efd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAttributesSelfTest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
@@ -247,17 +246,6 @@ public abstract class GridDiscoveryManagerAttributesSelfTest extends GridCommonA
* @throws Exception If failed.
*/
@Test
- public void testServiceProcessorModeProperty() throws Exception {
- doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true, false, true);
- doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, false, true, true);
- doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true, true, false);
- doTestCompatibilityEnabled(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, false, false, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
public void testSecurityCompatibilityEnabled() throws Exception {
secEnabled = true;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index e106c9d..1ef5195 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -60,10 +60,8 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKeyImpl;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.PA;
@@ -213,12 +211,9 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
}, 3000);
for (int i = 0; i < gridCount(); i++) {
- GridKernalContext ctx = grid(i).context();
- GridContinuousProcessor proc = ctx.continuous();
+ GridContinuousProcessor proc = grid(i).context().continuous();
- final int locInfosCnt = ctx.service() instanceof GridServiceProcessor ? 1 : 0;
-
- assertEquals(String.valueOf(i), locInfosCnt, ((Map)U.field(proc, "locInfos")).size());
+ assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "locInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentExceptionPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentExceptionPropagationTest.java
index c5484ea..9982667 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentExceptionPropagationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentExceptionPropagationTest.java
@@ -23,19 +23,11 @@ import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
-import org.junit.Before;
import org.junit.Test;
/** */
public class GridServiceDeploymentExceptionPropagationTest extends GridCommonAbstractTest {
/** */
- @Before
- public void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
- /** */
@Test
public void testExceptionPropagation() throws Exception {
try (IgniteEx srv = startGrid("server")) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorBatchDeploySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorBatchDeploySelfTest.java
index d821b8f..28961e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorBatchDeploySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorBatchDeploySelfTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.services.ServiceDescriptor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -414,9 +413,6 @@ public class GridServiceProcessorBatchDeploySelfTest extends GridCommonAbstractT
public void testCancelAllTopologyChange() throws Exception {
IgniteEx client = grid(CLIENT_NODE_NAME);
- Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-10021",
- client.context().service() instanceof GridServiceProcessor);
-
int numServices = 500;
List<ServiceConfiguration> cfgs = getConfigs(client.cluster().forServers().predicate(), numServices);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index 8a20df7..4d44ee3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -39,11 +39,10 @@ import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
import org.junit.Test;
/**
- * Tests that {@link GridServiceProcessor} completes deploy/undeploy futures during node stop.
+ * Tests that {@link IgniteServiceProcessor} completes deploy/undeploy futures during node stop.
*/
public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@@ -175,8 +174,6 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
*/
@Test
public void disconnectingDuringNodeStoppingIsNotHangTest() throws Exception {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
-
runServiceProcessorStoppingTest(
new IgniteInClosure<IgniteServiceProcessor>() {
@Override public void apply(IgniteServiceProcessor srvcProc) {
@@ -196,8 +193,6 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
*/
@Test
public void stoppingDuringDisconnectingIsNotHangTest() throws Exception {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
-
runServiceProcessorStoppingTest(
new IgniteInClosure<IgniteServiceProcessor>() {
@Override public void apply(IgniteServiceProcessor srvcProc) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyTopologyInitializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyTopologyInitializationTest.java
index de95e80..7c2bb99 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyTopologyInitializationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyTopologyInitializationTest.java
@@ -34,8 +34,6 @@ import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
-import org.junit.BeforeClass;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -98,14 +96,6 @@ public class GridServiceProxyTopologyInitializationTest extends GridCommonAbstra
return cfg;
}
- /**
- * Ignores the test in case the legacy service processor is used.
- */
- @BeforeClass
- public static void checkServiceProcessorType() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
index da55e0e..a74a3e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
@@ -114,26 +114,15 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest {
awaitPartitionMapExchange();
- if (ig.context().service() instanceof GridServiceProcessor) {
+ GridTestUtils.assertThrowsWithCause(() -> {
svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key);
- assertNull(svcs.service(svcName));
+ return null;
+ }, ServiceDeploymentException.class);
- ig.createCache(ccfg);
- }
- else if (ig.context().service() instanceof IgniteServiceProcessor) {
- GridTestUtils.assertThrowsWithCause(() -> {
- svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key);
-
- return null;
- }, ServiceDeploymentException.class);
-
- ig.createCache(ccfg);
+ ig.createCache(ccfg);
- svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key);
- }
- else
- fail("Unexpected service implementation.");
+ svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key);
try {
boolean res = GridTestUtils.waitForCondition(new PA() {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
index 80984b2..c3ae8d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
@@ -17,28 +17,18 @@
package org.apache.ignite.internal.processors.service;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.PA;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
-import org.apache.ignite.testframework.GridStringLogger;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
import org.junit.Test;
/**
@@ -48,12 +38,6 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest {
/** */
private ServiceConfiguration srvcCfg;
- /** */
- private boolean useStrLog;
-
- /** */
- private List<IgniteLogger> strLoggers = new ArrayList<>();
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -61,16 +45,6 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest {
if (srvcCfg != null)
cfg.setServiceConfiguration(srvcCfg);
- if (useStrLog) {
- GridStringLogger strLog = new GridStringLogger(false, cfg.getGridLogger());
-
- strLog.logLength(100 * 1024);
-
- cfg.setGridLogger(strLog);
-
- strLoggers.add(strLog);
- }
-
return cfg;
}
@@ -207,118 +181,6 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception If failed.
- */
- @Test
- public void testZombieAssignmentsCleanup() throws Exception {
- Assume.assumeTrue(!isEventDrivenServiceProcessorEnabled());
-
- useStrLog = true;
-
- final int nodesCnt = 2;
- final int maxSvc = 30;
-
- try {
- startGridsMultiThreaded(nodesCnt);
-
- IgniteEx ignite = grid(0);
-
- IgniteInternalCache<GridServiceAssignmentsKey, Object> sysCache = ignite.utilityCache();
-
- List<GridServiceAssignmentsKey> zombieAssignmentsKeys = new ArrayList<>(maxSvc);
-
- // Adding some assignments without deployments.
- for (int i = 0; i < maxSvc; i++) {
- String name = "svc-" + i;
-
- ServiceConfiguration svcCfg = new ServiceConfiguration();
-
- svcCfg.setName(name);
-
- GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
-
- UUID nodeId = grid(i % nodesCnt).localNode().id();
-
- sysCache.put(key, new GridServiceAssignments(svcCfg, nodeId, ignite.cluster().topologyVersion()));
-
- zombieAssignmentsKeys.add(key);
- }
-
- // Simulate exchange with merge.
- GridTestUtils.runAsync(() -> startGrid(nodesCnt));
- GridTestUtils.runAsync(() -> startGrid(nodesCnt + 1));
- startGrid(nodesCnt + 2);
-
- awaitPartitionMapExchange();
-
- // Checking that all our assignments was removed.
- assertTrue("Found not empty assignments", GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- try {
- for (GridServiceAssignmentsKey key : zombieAssignmentsKeys) {
- if (sysCache.get(key) != null)
- return false;
- }
- } catch (IgniteCheckedException e) {
- fail(X.getFullStackTrace(e));
- }
-
- return true;
- }
- }, 5_000));
-
- for (IgniteLogger logger : strLoggers)
- assertFalse(logger.toString().contains("Getting affinity for topology version earlier than affinity is " +
- "calculated"));
- } finally {
- useStrLog = false;
-
- strLoggers.clear();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testNodeStopWhileThereAreCacheActivitiesInServiceProcessor() throws Exception {
- Assume.assumeTrue(!isEventDrivenServiceProcessorEnabled());
-
- final int nodesCnt = 2;
- final int maxSvc = 1024;
-
- startGridsMultiThreaded(nodesCnt);
-
- IgniteEx ignite = grid(0);
-
- IgniteInternalCache<GridServiceAssignmentsKey, Object> sysCache = ignite.utilityCache();
-
- // Adding some assignments without deployments.
- for (int i = 0; i < maxSvc; i++) {
- String name = "svc-" + i;
-
- ServiceConfiguration svcCfg = new ServiceConfiguration();
-
- svcCfg.setName(name);
-
- GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
-
- UUID nodeId = grid(i % nodesCnt).localNode().id();
-
- sysCache.put(key, new GridServiceAssignments(svcCfg, nodeId, ignite.cluster().topologyVersion()));
- }
-
- // Simulate exchange with merge.
- GridTestUtils.runAsync(() -> startGrid(nodesCnt));
- GridTestUtils.runAsync(() -> startGrid(nodesCnt + 1));
- startGrid(nodesCnt + 2);
-
- Thread.sleep((int)(1000 * ThreadLocalRandom.current().nextDouble()));
-
- stopAllGrids();
- }
-
- /**
* @param node Node.
* @throws Exception If failed.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentDiscoveryListenerNotificationOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentDiscoveryListenerNotificationOrderTest.java
index cde7f39..abd0ca4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentDiscoveryListenerNotificationOrderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentDiscoveryListenerNotificationOrderTest.java
@@ -29,8 +29,6 @@
import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
- import org.junit.Assume;
- import org.junit.Before;
import org.junit.Test;
/**
@@ -42,12 +40,6 @@
* because it may be nullified in PME process at the end of exchange in {@link GridDhtPartitionsExchangeFuture#onDone()}.
*/
public class ServiceDeploymentDiscoveryListenerNotificationOrderTest extends GridCommonAbstractTest {
- /** */
- @Before
- public void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
/**
* <b>Strongly depends on internal implementation of {@link GridEventStorageManager}.</b>
* <p/>
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentNonSerializableStaticConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentNonSerializableStaticConfigurationTest.java
index 462d205..9437d77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentNonSerializableStaticConfigurationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentNonSerializableStaticConfigurationTest.java
@@ -25,8 +25,6 @@ import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
-import org.junit.Before;
import org.junit.Test;
/** */
@@ -37,12 +35,6 @@ public class ServiceDeploymentNonSerializableStaticConfigurationTest extends Gri
/** */
private final ListeningTestLogger log = new ListeningTestLogger(false, super.log);
- /** */
- @Before
- public void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
index 0bb3804..233ff4d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
@@ -26,8 +26,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
-import org.junit.Before;
import org.junit.Test;
/**
@@ -57,12 +55,6 @@ public class ServiceDeploymentOnActivationTest extends GridCommonAbstractTest {
return cfg;
}
- /** */
- @Before
- public void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
srvcCfg = null;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java
index 573ecca..3630695 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java
@@ -34,8 +34,6 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
-import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -51,12 +49,6 @@ public class ServiceDeploymentOnClientDisconnectTest extends GridCommonAbstractT
/** */
private static final long CLIENT_RECONNECT_WAIT_TIMEOUT = 10_000L;
- /** */
- @Before
- public void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
index 26b3c58..5b4dc2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java
@@ -30,8 +30,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assume;
-import org.junit.Before;
/**
* Abstract class for tests of service deployment process.
@@ -40,12 +38,6 @@ public abstract class ServiceDeploymentProcessAbstractTest extends GridCommonAbs
/** Timeout to avoid tests hang. */
protected static final long TEST_FUTURE_WAIT_TIMEOUT = 60_000;
- /** */
- @Before
- public void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceHotRedeploymentViaDeploymentSpiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceHotRedeploymentViaDeploymentSpiTest.java
index c108224..43ba53f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceHotRedeploymentViaDeploymentSpiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceHotRedeploymentViaDeploymentSpiTest.java
@@ -40,9 +40,7 @@ import org.apache.ignite.spi.deployment.DeploymentSpi;
import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
-import org.junit.Assume;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -68,12 +66,6 @@ public class ServiceHotRedeploymentViaDeploymentSpiTest extends GridCommonAbstra
}
/** */
- @BeforeClass
- public static void check() {
- Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
- }
-
- /** */
@Before
public void prepare() throws IOException {
srcTmpDir = Files.createTempDirectory(getClass().getSimpleName());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceReassignmentFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceReassignmentFunctionSelfTest.java
index fe63266..05c914b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceReassignmentFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceReassignmentFunctionSelfTest.java
@@ -96,7 +96,7 @@ public class ServiceReassignmentFunctionSelfTest {
}
/**
- * Mocks GridServiceProcessor to test method {@link IgniteServiceProcessor#reassign(IgniteUuid,
+ * Mocks IgniteServiceProcessor to test method {@link IgniteServiceProcessor#reassign(IgniteUuid,
* ServiceConfiguration, AffinityTopologyVersion, TreeMap)} AffinityTopologyVersion, Map)} )}.
*/
private IgniteServiceProcessor mockServiceProcessor() {
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 603d65a..cf7a573 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -148,8 +148,6 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
-import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
@@ -2315,14 +2313,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
- * @return {@code false} if value of a system property "IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED" is "false",
- * otherwise {@code true}.
- */
- protected static boolean isEventDrivenServiceProcessorEnabled() {
- return getBoolean(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true);
- }
-
- /**
* Wait for {@link EventType#EVT_NODE_METRICS_UPDATED} event will be receieved.
*
* @param countUpdate Number of events.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
index 3ae0c56..4ac592f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -45,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
-import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
@@ -94,12 +92,9 @@ public class CacheMvccBasicContinuousQueryTest extends CacheMvccAbstractTest {
}, 3000);
for (Ignite node : G.allGrids()) {
- GridKernalContext ctx = ((IgniteEx)node).context();
- GridContinuousProcessor proc = ctx.continuous();
+ GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
- final int locInfosCnt = ctx.service() instanceof GridServiceProcessor ? 1 : 0;
-
- assertEquals(locInfosCnt, ((Map)U.field(proc, "locInfos")).size());
+ assertEquals(0, ((Map)U.field(proc, "locInfos")).size());
assertEquals(0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(0, ((Map)U.field(proc, "startFuts")).size());
assertEquals(0, ((Map)U.field(proc, "stopFuts")).size());