You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/18 07:04:37 UTC
[18/21] ignite git commit: IGNITE-2249 - Do not deserialize services
on client node
IGNITE-2249 - Do not deserialize services on client node
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d5f77e2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d5f77e2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d5f77e2
Branch: refs/heads/ignite-1232
Commit: 7d5f77e2f9ad80ec298b96452e5f55f737a01701
Parents: 2ad4b5c
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Feb 17 18:21:54 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Feb 17 18:21:54 2016 -0800
----------------------------------------------------------------------
.../ignite/internal/MarshallerContextImpl.java | 3 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +-
.../CacheDataStructuresManager.java | 4 +-
.../continuous/CacheContinuousQueryHandler.java | 21 ++-
.../continuous/CacheContinuousQueryManager.java | 18 ++-
.../datastructures/DataStructuresProcessor.java | 1 +
.../service/GridServiceProcessor.java | 101 ++++++++++++-
.../processors/service/GridServiceProxy.java | 22 +--
.../GridServiceSerializationSelfTest.java | 149 +++++++++++++++++++
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
.../hadoop/jobtracker/HadoopJobTracker.java | 5 +-
11 files changed, 287 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index e3f2bc9..05fe8ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -83,7 +83,8 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
null,
ctx.cache().marshallerCache().context().affinityNode(),
- true
+ true,
+ false
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index e0da8d1..624a453 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -277,7 +277,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
new MetaDataEntryListener(),
new MetaDataEntryFilter(),
false,
- true);
+ true,
+ false);
while (true) {
ClusterNode oldestSrvNode =
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 47c3dd9..b42e5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -285,7 +284,8 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
},
new QueueHeaderPredicate(),
cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()),
- true);
+ true,
+ false);
}
GridCacheQueueProxy queue = queuesMap.get(hdr.id());
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 08fe62a..0324e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -151,6 +151,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/** */
private AffinityTopologyVersion initTopVer;
+ /** */
+ private transient boolean ignoreClassNotFound;
+
/**
* Required by {@link Externalizable}.
*/
@@ -188,7 +191,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
int taskHash,
boolean skipPrimaryCheck,
boolean locCache,
- boolean keepBinary) {
+ boolean keepBinary,
+ boolean ignoreClassNotFound) {
assert topic != null;
assert locLsnr != null;
@@ -205,6 +209,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
this.skipPrimaryCheck = skipPrimaryCheck;
this.locCache = locCache;
this.keepBinary = keepBinary;
+ this.ignoreClassNotFound = ignoreClassNotFound;
cacheId = CU.cacheId(cacheName);
}
@@ -566,6 +571,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
final GridCacheContext cctx = cacheContext(ctx);
+ Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+
for (CacheContinuousQueryEntry e : entries) {
GridCacheDeploymentManager depMgr = cctx.deploy();
@@ -582,19 +589,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
try {
e.unmarshal(cctx, ldr);
+
+ entries0.addAll(handleEvent(ctx, e));
}
catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+ if (ignoreClassNotFound)
+ assert internal;
+ else
+ U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
}
}
final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
- Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
-
- for (CacheContinuousQueryEntry e : entries)
- entries0.addAll(handleEvent(ctx, e));
-
if (!entries0.isEmpty()) {
Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 840a61b..409c1da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -433,7 +433,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
false,
true,
loc,
- keepBinary);
+ keepBinary,
+ false);
}
/**
@@ -447,7 +448,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
CacheEntryEventSerializableFilter rmtFilter,
boolean loc,
- boolean notifyExisting)
+ boolean notifyExisting,
+ boolean ignoreClassNotFound)
throws IgniteCheckedException
{
return executeQuery0(
@@ -462,7 +464,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
false,
true,
loc,
- false);
+ false,
+ ignoreClassNotFound);
}
/**
@@ -560,7 +563,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean sync,
boolean ignoreExpired,
boolean loc,
- final boolean keepBinary) throws IgniteCheckedException
+ final boolean keepBinary,
+ boolean ignoreClassNotFound) throws IgniteCheckedException
{
cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -582,7 +586,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
taskNameHash,
skipPrimaryCheck,
cctx.isLocal(),
- keepBinary);
+ keepBinary,
+ ignoreClassNotFound);
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -790,7 +795,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cfg.isSynchronous(),
false,
false,
- keepBinary);
+ keepBinary,
+ false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 98848ee..445fc3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -248,6 +248,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
new DataStructuresEntryFilter(),
dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+ false,
false);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 2841083..1a48e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -44,8 +44,10 @@ import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -58,21 +60,26 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor;
@@ -166,11 +173,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (ctx.deploy().enabled())
ctx.cache().context().deploy().ignoreOwnership(true);
+ boolean affNode = cache.context().affinityNode();
+
cfgQryId = cache.context().continuousQueries().executeInternalQuery(
- new DeploymentListener(), null, cache.context().affinityNode(), true);
+ new DeploymentListener(), null, affNode, true, !affNode);
assignQryId = cache.context().continuousQueries().executeInternalQuery(
- new AssignmentListener(), null, cache.context().affinityNode(), true);
+ new AssignmentListener(), null, affNode, true, !affNode);
}
finally {
if (ctx.deploy().enabled())
@@ -544,6 +553,38 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
/**
+ * @param name Service name.
+ * @return Service topology.
+ */
+ public Map<UUID, Integer> serviceTopology(String name) throws IgniteCheckedException {
+ ClusterNode node = cache.affinity().mapKeyToNode(name);
+
+ if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) {
+ return ctx.closure().callAsyncNoFailover(
+ GridClosureCallMode.BALANCE,
+ new ServiceTopologyCallable(name),
+ Collections.singletonList(node),
+ false
+ ).get();
+ }
+ else
+ return serviceTopology(cache, name);
+ }
+
+ /**
+ * @param cache Utility cache.
+ * @param svcName Service name.
+ * @return Service topology.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private static Map<UUID, Integer> serviceTopology(IgniteInternalCache<Object, Object> cache, String svcName)
+ throws IgniteCheckedException {
+ GridServiceAssignments val = (GridServiceAssignments)cache.get(new GridServiceAssignmentsKey(svcName));
+
+ return val != null ? val.assigns() : null;
+ }
+
+ /**
* @return Collection of service descriptors.
*/
public Collection<ServiceDescriptor> serviceDescriptors() {
@@ -1069,7 +1110,17 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (!(e.getKey() instanceof GridServiceDeploymentKey))
continue;
- GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
+ GridServiceDeployment dep;
+
+ try {
+ dep = (GridServiceDeployment)e.getValue();
+ }
+ catch (IgniteException ex) {
+ if (X.hasCause(ex, ClassNotFoundException.class))
+ continue;
+ else
+ throw ex;
+ }
if (dep != null) {
svcName.set(dep.configuration().getName());
@@ -1346,7 +1397,17 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (!(e.getKey() instanceof GridServiceAssignmentsKey))
continue;
- GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
+ GridServiceAssignments assigns;
+
+ try {
+ assigns = (GridServiceAssignments)e.getValue();
+ }
+ catch (IgniteException ex) {
+ if (X.hasCause(ex, ClassNotFoundException.class))
+ continue;
+ else
+ throw ex;
+ }
if (assigns != null) {
svcName.set(assigns.name());
@@ -1467,4 +1528,34 @@ public class GridServiceProcessor extends GridProcessorAdapter {
return S.toString(ServiceAssignmentsPredicate.class, this);
}
}
-}
\ No newline at end of file
+
+ /**
+ */
+ @GridInternal
+ private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7");
+
+ /** */
+ private final String svcName;
+
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /**
+ * @param svcName Service name.
+ */
+ public ServiceTopologyCallable(String svcName) {
+ this.svcName = svcName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<UUID, Integer> call() throws Exception {
+ return serviceTopology(ignite.context().cache().utilityCache(), svcName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index e54ec7b..6bec8ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.services.ServiceDescriptor;
import org.jsr166.ThreadLocalRandom8;
import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
@@ -210,7 +209,7 @@ class GridServiceProxy<T> implements Serializable {
* @param name Service name.
* @return Node with deployed service or {@code null} if there is no such node.
*/
- private ClusterNode nodeForService(String name, boolean sticky) {
+ private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException {
do { // Repeat if reference to remote node was changed.
if (sticky) {
ClusterNode curNode = rmtNode.get();
@@ -237,11 +236,11 @@ class GridServiceProxy<T> implements Serializable {
* @return Local node if it has a given service deployed or randomly chosen remote node,
* otherwise ({@code null} if given service is not deployed on any node.
*/
- private ClusterNode randomNodeForService(String name) {
+ private ClusterNode randomNodeForService(String name) throws IgniteCheckedException {
if (hasLocNode && ctx.service().service(name) != null)
return ctx.discovery().localNode();
- Map<UUID, Integer> snapshot = serviceTopology(name);
+ Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name);
if (snapshot == null || snapshot.isEmpty())
return null;
@@ -307,19 +306,6 @@ class GridServiceProxy<T> implements Serializable {
return null;
}
-
- /**
- * @param name Service name.
- * @return Map of number of service instances per node ID.
- */
- private Map<UUID, Integer> serviceTopology(String name) {
- for (ServiceDescriptor desc : ctx.service().serviceDescriptors()) {
- if (desc.name().equals(name))
- return desc.topologySnapshot();
- }
-
- return null;
- }
}
/**
@@ -403,4 +389,4 @@ class GridServiceProxy<T> implements Serializable {
return S.toString(ServiceProxyCallable.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
new file mode 100644
index 0000000..f709dfe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceSerializationSelfTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Service serialization test.
+ */
+public class GridServiceSerializationSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServiceSerialization() throws Exception {
+ try {
+ Ignite server = startGridsMultiThreaded(3);
+
+ Ignition.setClientMode(true);
+
+ Ignite client = startGrid("client");
+
+ server.services(server.cluster().forServers())
+ .deployClusterSingleton("my-service", new MyServiceImpl());
+
+ MyService svc = client.services().serviceProxy("my-service", MyService.class, false);
+
+ svc.hello();
+
+ assert MyServiceImpl.latch.await(2000, TimeUnit.MILLISECONDS);
+
+ assertEquals(0, MyServiceImpl.cnt.get());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ */
+ private static interface MyService extends Service {
+ /** */
+ void hello();
+ }
+
+ /**
+ */
+ private static class MyServiceImpl implements MyService, Externalizable {
+ /** */
+ static final AtomicInteger cnt = new AtomicInteger();
+
+ /** */
+ static final CountDownLatch latch = new CountDownLatch(1);
+
+ /**
+ */
+ public MyServiceImpl() throws ClassNotFoundException {
+ if (clientThread())
+ throw new ClassNotFoundException("Expected ClassNotFoundException");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ if (clientThread())
+ cnt.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(ServiceContext ctx) throws Exception {
+ if (clientThread())
+ cnt.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) throws Exception {
+ if (clientThread())
+ cnt.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void hello() {
+ if (clientThread())
+ cnt.incrementAndGet();
+
+ latch.countDown();
+ }
+
+ /**
+ * @return If current thread belongs to client.
+ */
+ private boolean clientThread() {
+ assert Thread.currentThread() instanceof IgniteThread;
+
+ return ((IgniteThread)Thread.currentThread()).getGridName().contains("client");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index d9e9b0f..214d375 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessorProxySe
import org.apache.ignite.internal.processors.service.GridServiceProcessorSingleNodeSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceProcessorStopSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceReassignmentSelfTest;
+import org.apache.ignite.internal.processors.service.GridServiceSerializationSelfTest;
import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
import org.apache.ignite.internal.util.GridStartupWithSpecifiedWorkDirectorySelfTest;
import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
@@ -123,6 +124,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(GridServiceProcessorStopSelfTest.class);
suite.addTestSuite(ServicePredicateAccessCacheTest.class);
suite.addTestSuite(GridServicePackagePrivateSelfTest.class);
+ suite.addTestSuite(GridServiceSerializationSelfTest.class);
return suite;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d5f77e2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 81ff8ea..f4cf892 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -255,7 +255,8 @@ public class HadoopJobTracker extends HadoopComponent {
},
null,
true,
- true
+ true,
+ false
);
ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
@@ -1690,4 +1691,4 @@ public class HadoopJobTracker extends HadoopComponent {
*/
protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
}
-}
\ No newline at end of file
+}