You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/23 07:12:40 UTC

[44/50] [abbrv] ignite git commit: ignite-8205 Clear list of local services in GridServiceProcessor#onKernalStop

ignite-8205 Clear list of local services in GridServiceProcessor#onKernalStop

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fbe24f8e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fbe24f8e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fbe24f8e

Branch: refs/heads/ignite-6083
Commit: fbe24f8e3b0d9016a69670ca2bc50766865adf38
Parents: 358d039
Author: Denis Mekhanikov <dm...@gmail.com>
Authored: Fri Apr 20 17:11:36 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Fri Apr 20 17:11:36 2018 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           |   8 +-
 .../ServiceDeploymentOnActivationTest.java      | 244 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 3 files changed, 252 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fbe24f8e/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 63f5027..9cf27d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -107,7 +108,6 @@ import org.apache.ignite.thread.IgniteThreadFactory;
 import org.apache.ignite.thread.OomExceptionHandler;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
 import static org.apache.ignite.IgniteSystemProperties.getString;
@@ -245,11 +245,12 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
             if (!ctx.clientNode() && serviceCache.context().affinityNode()) {
+                // Register query listener and run it for local entries. It is also invoked on rebalancing.
                 serviceCache.context().continuousQueries().executeInternalQuery(
                     new ServiceEntriesListener(), null, true, true, false
                 );
             }
-            else {
+            else { // Listener for client nodes is registered in onContinuousProcessorStarted method.
                 assert !ctx.isDaemon();
 
                 ctx.closure().runLocalSafe(new Runnable() {
@@ -324,6 +325,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         synchronized (locSvcs) {
             for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
                 ctxs.addAll(ctxs0);
+
+            locSvcs.clear();
         }
 
         for (ServiceContextImpl ctx : ctxs) {
@@ -1627,6 +1630,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
             ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
 
+            // Process deployment on coordinator only.
             if (oldest.isLocal())
                 onDeployment(dep, topVer);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbe24f8e/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
new file mode 100644
index 0000000..52d706b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnActivationTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/** */
+public class ServiceDeploymentOnActivationTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String SERVICE_NAME = "test-service";
+
+    /** */
+    private static final IgnitePredicate<ClusterNode> CLIENT_FILTER = new IgnitePredicate<ClusterNode>() {
+        @Override public boolean apply(ClusterNode node) {
+            return node.isClient();
+        }
+    };
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean persistence;
+
+    /** */
+    private ServiceConfiguration srvcCfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+        discoverySpi.setIpFinder(IP_FINDER);
+        cfg.setDiscoverySpi(discoverySpi);
+
+        cfg.setClientMode(client);
+
+        if (srvcCfg != null)
+            cfg.setServiceConfiguration(srvcCfg);
+
+        if (persistence) {
+            cfg.setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration()
+                            .setPersistenceEnabled(true)
+                            .setMaxSize(10 * 1024 * 1024)
+                    ).setWalMode(WALMode.LOG_ONLY)
+            );
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        client = false;
+        persistence = false;
+        srvcCfg = null;
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServersWithPersistence() throws Exception {
+        persistence = true;
+
+        checkRedeployment(2, 0, F.alwaysTrue(), 2, false, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testClientsWithPersistence() throws Exception {
+        persistence = true;
+
+        checkRedeployment(2, 2, CLIENT_FILTER, 2, false, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testServersWithoutPersistence() throws Exception {
+        persistence = false;
+
+        checkRedeployment(2, 0, F.alwaysTrue(), 2, false, false);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testClientsWithoutPersistence() throws Exception {
+        persistence = false;
+
+        checkRedeployment(2, 2, CLIENT_FILTER, 2, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServersStaticConfigWithPersistence() throws Exception {
+        persistence = true;
+
+        checkRedeployment(2, 0, F.alwaysTrue(), 2, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientsStaticConfigWithPersistence() throws Exception {
+        persistence = true;
+
+        checkRedeployment(2, 2, CLIENT_FILTER, 2, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServersStaticConfigWithoutPersistence() throws Exception {
+        persistence = false;
+
+        checkRedeployment(2, 0, F.alwaysTrue(), 2, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testClientsStaticConfigWithoutPersistence() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-8279");
+
+        persistence = false;
+
+        checkRedeployment(2, 2, CLIENT_FILTER, 2, true, true);
+    }
+
+    /**
+     * @param srvsNum Number of server nodes to start.
+     * @param clientsNum Number of client nodes to start.
+     * @param nodeFilter Node filter.
+     * @param deps Expected number of deployed services.
+     * @param isStatic Static or dynamic service deployment is used.
+     * @param expRedep {@code true} if services should be redeployed on activation. {@code false} otherwise.
+     * @throws Exception If failed.
+     */
+    private void checkRedeployment(int srvsNum, int clientsNum, IgnitePredicate<ClusterNode> nodeFilter, int deps,
+        boolean isStatic, boolean expRedep) throws Exception {
+
+        if (isStatic)
+            srvcCfg = getServiceConfiguration(nodeFilter);
+
+        CountDownLatch exeLatch = new CountDownLatch(deps);
+        CountDownLatch cancelLatch = new CountDownLatch(deps);
+
+        DummyService.exeLatch(SERVICE_NAME, exeLatch);
+        DummyService.cancelLatch(SERVICE_NAME, cancelLatch);
+
+        for (int i = 0; i < srvsNum; i++)
+            startGrid(i);
+
+        client = true;
+
+        for (int i = 0; i < clientsNum; i++)
+            startGrid(srvsNum + i);
+
+        Ignite ignite = grid(0);
+
+        ignite.cluster().active(true);
+
+        if (!isStatic) {
+            ServiceConfiguration srvcCfg = getServiceConfiguration(nodeFilter);
+
+            ignite.services().deploy(srvcCfg);
+        }
+
+        assertTrue(exeLatch.await(10, TimeUnit.SECONDS));
+
+        ignite.cluster().active(false);
+
+        assertTrue(cancelLatch.await(10, TimeUnit.SECONDS));
+
+        exeLatch = new CountDownLatch(expRedep ? deps : 1);
+
+        DummyService.exeLatch(SERVICE_NAME, exeLatch);
+
+        ignite.cluster().active(true);
+
+        if (expRedep)
+            assertTrue(exeLatch.await(10, TimeUnit.SECONDS));
+        else
+            assertFalse(exeLatch.await(1, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @param nodeFilter Node filter.
+     * @return Service configuration.
+     */
+    private ServiceConfiguration getServiceConfiguration(IgnitePredicate<ClusterNode> nodeFilter) {
+        ServiceConfiguration srvcCfg = new ServiceConfiguration();
+        srvcCfg.setName(SERVICE_NAME);
+        srvcCfg.setMaxPerNodeCount(1);
+        srvcCfg.setNodeFilter(nodeFilter);
+        srvcCfg.setService(new DummyService());
+        return srvcCfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbe24f8e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index f26205b..12e4802 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClas
 import org.apache.ignite.internal.processors.service.IgniteServiceDynamicCachesSelfTest;
 import org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest;
 import org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest;
+import org.apache.ignite.internal.processors.service.ServiceDeploymentOnActivationTest;
 import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
 import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
 import org.apache.ignite.services.ServiceThreadPoolSelfTest;
@@ -146,6 +147,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(ServiceThreadPoolSelfTest.class);
         suite.addTestSuite(GridServiceProcessorBatchDeploySelfTest.class);
         suite.addTestSuite(GridServiceDeploymentCompoundFutureSelfTest.class);
+        suite.addTestSuite(ServiceDeploymentOnActivationTest.class);
 
         suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
         suite.addTestSuite(IgniteServiceDeploymentClassLoadingJdkMarshallerTest.class);