You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/16 13:36:47 UTC

[08/13] ignite git commit: IGNITE-4740 - Fix. Service could be deployed/undeployed twice on concurrent cancel and discovery event.

IGNITE-4740 - Fix. Service could be deployed/undeployed twice on concurrent cancel and discovery event.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fcb3e74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fcb3e74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fcb3e74

Branch: refs/heads/master
Commit: 9fcb3e74f91c8497b7b1358cdff40950cdf5c568
Parents: c0e2df2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Feb 28 16:05:06 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Feb 28 16:05:06 2017 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheChangeBatch.java          |  14 ++
 .../service/GridServiceProcessor.java           |  49 +++---
 .../GridServiceContinuousQueryRedeploy.java     | 167 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 4 files changed, 208 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 4dcff9b..a250063 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -113,6 +113,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         return clientReconnect;
     }
 
+    /**
+     * @return {@code True} if request should trigger partition exchange.
+     */
+    public boolean exchangeNeeded() {
+        if (reqs != null) {
+            for (DynamicCacheChangeRequest req : reqs) {
+                if (req.exchangeNeeded())
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheChangeBatch.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 3690f35..4eeafed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -65,10 +65,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.continuous.AbstractContinuousMessage;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1468,19 +1470,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         else {
             String name = e.getKey().name();
 
-            svcName.set(name);
-
-            Collection<ServiceContextImpl> ctxs;
-
-            synchronized (locSvcs) {
-                ctxs = locSvcs.remove(name);
-            }
-
-            if (ctxs != null) {
-                synchronized (ctxs) {
-                    cancel(ctxs, ctxs.size());
-                }
-            }
+            undeploy(name);
 
             // Finish deployment futures if undeployment happened.
             GridFutureAdapter<?> fut = depFuts.remove(name);
@@ -1586,6 +1576,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
                             return;
                     }
+                    else if (msg instanceof DynamicCacheChangeBatch) {
+                        if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+                            return;
+                    }
+                    else
+                        return;
                 }
                 else
                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
@@ -1771,21 +1767,26 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             }
         }
         // Handle undeployment.
-        else {
-            String name = e.getKey().name();
+        else
+            undeploy(e.getKey().name());
+    }
 
-            svcName.set(name);
 
-            Collection<ServiceContextImpl> ctxs;
+    /**
+     * @param name Name.
+     */
+    private void undeploy(String name) {
+        svcName.set(name);
 
-            synchronized (locSvcs) {
-                ctxs = locSvcs.remove(name);
-            }
+        Collection<ServiceContextImpl> ctxs;
 
-            if (ctxs != null) {
-                synchronized (ctxs) {
-                    cancel(ctxs, ctxs.size());
-                }
+        synchronized (locSvcs) {
+            ctxs = locSvcs.remove(name);
+        }
+
+        if (ctxs != null) {
+            synchronized (ctxs) {
+                cancel(ctxs, ctxs.size());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
new file mode 100644
index 0000000..1a9ef3a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests whether concurrent service cancel and registering ContinuousQuery doesn't causes
+ * service redeployment.
+ */
+public class GridServiceContinuousQueryRedeploy extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "TEST_CACHE";
+
+    /** */
+    private static final String TEST_KEY = "TEST_KEY";
+
+    /** */
+    private static final String SERVICE_NAME = "service1";
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServiceRedeploymentAfterCancel() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        final IgniteCache<Object, Object> managementCache = ignite.getOrCreateCache(CACHE_NAME);
+
+        final ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+        final List<Object> evts = Collections.synchronizedList(new ArrayList<>());
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(
+                Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException {
+                for (CacheEntryEvent<?, ?> event : iterable)
+                    evts.add(event);
+            }
+        });
+
+        int iterations = 100;
+
+        while (iterations-- > 0) {
+            QueryCursor quorumCursor = managementCache.query(qry);
+
+            IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    System.out.println("Deploy " + SERVICE_NAME);
+                    deployService(ignite);
+
+                    return null;
+                }
+            });
+
+            IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    System.out.println("Undeploy " + SERVICE_NAME);
+                    ignite.services().cancel(SERVICE_NAME);
+
+                    return null;
+                }
+            });
+
+            fut1.get();
+            fut2.get();
+
+            U.sleep(100);
+
+            assert evts.size() <= 1 : evts.size();
+
+            ignite.services().cancel("service1");
+
+            evts.clear();
+
+            quorumCursor.close();
+        }
+
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    private void deployService(final Ignite ignite) {
+        ServiceConfiguration svcCfg = new ServiceConfiguration();
+
+        svcCfg.setService(new ManagementService());
+        svcCfg.setName(SERVICE_NAME);
+        svcCfg.setTotalCount(1);
+        svcCfg.setMaxPerNodeCount(1);
+        svcCfg.setNodeFilter(new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode node) {
+                return !node.isClient();
+            }
+        });
+
+        ignite.services().deploy(svcCfg);
+    }
+
+    /**
+     *
+     */
+    public static class ManagementService implements Service {
+        /** */
+        private final String name = UUID.randomUUID().toString();
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            System.out.println(name + " shutdown.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void init(ServiceContext ctx) throws Exception {
+            System.out.println(name + " initializing.");
+
+            ignite.cache(CACHE_NAME).put(TEST_KEY, name + " init");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            // No-op
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 350b715..5977702 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest
 import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
 import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceClientNodeTest;
+import org.apache.ignite.internal.processors.service.GridServiceContinuousQueryRedeploy;
 import org.apache.ignite.internal.processors.service.GridServicePackagePrivateSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeConfigSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeSelfTest;
@@ -143,6 +144,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
         suite.addTestSuite(IgniteServiceReassignmentTest.class);
         suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
+        suite.addTestSuite(GridServiceContinuousQueryRedeploy.class);
 
         suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
         suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);