You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 10:35:19 UTC

[1/3] ignite git commit: IGNITE-7197 Avoid NPE in services() by waiting on latch

Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 8735efda6 -> e39ebde17


IGNITE-7197 Avoid NPE in services() by waiting on latch


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22671135
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22671135
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22671135

Branch: refs/heads/ignite-zk
Commit: 226711357db742a5073a98ef179997480e1b1a56
Parents: 6e8cfe3
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Dec 20 13:05:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 20 13:05:24 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           |  53 ++++--
 .../processors/task/GridTaskProcessor.java      |   2 +
 .../internal/GridJobServicesAddNodeTest.java    | 172 +++++++++++++++++++
 .../testsuites/IgniteComputeGridTestSuite.java  |   2 +
 4 files changed, 216 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 7097735..8581023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -160,11 +161,14 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     private ThreadLocal<String> svcName = new ThreadLocal<>();
 
     /** Service cache. */
-    private IgniteInternalCache<Object, Object> cache;
+    private volatile IgniteInternalCache<Object, Object> serviceCache;
 
     /** Topology listener. */
     private DiscoveryEventListener topLsnr = new TopologyListener();
 
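+    /** Latch {@link #serviceCache()} waits on; counted down after the cache is assigned and before {@code depExe} shutdown. */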
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+
     /**
      * @param ctx Kernal context.
      */
@@ -223,19 +227,21 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @throws IgniteCheckedException If failed.
      */
     private void onKernalStart0() throws IgniteCheckedException {
-        updateUtilityCache();
-
         if (!ctx.clientNode())
             ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
 
+        updateUtilityCache();
+
+        startLatch.countDown();
+
         try {
             if (ctx.deploy().enabled())
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
             if (!ctx.clientNode()) {
-                assert cache.context().affinityNode();
+                assert serviceCache.context().affinityNode();
 
-                cache.context().continuousQueries().executeInternalQuery(
+                serviceCache.context().continuousQueries().executeInternalQuery(
                     new ServiceEntriesListener(), null, true, true, false
                 );
             }
@@ -246,7 +252,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                     @Override public void run() {
                         try {
                             Iterable<CacheEntryEvent<?, ?>> entries =
-                                cache.context().continuousQueries().existingEntries(false, null);
+                                serviceCache.context().continuousQueries().existingEntries(false, null);
 
                             onSystemCacheUpdated(entries);
                         }
@@ -275,7 +281,17 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      *
      */
     public void updateUtilityCache() {
-        cache = ctx.cache().utilityCache();
+        serviceCache = ctx.cache().utilityCache();
+    }
+
+    /**
+     * @return Value of the {@code serviceCache} field; when it is {@code null}, {@code U.awaitQuiet(startLatch)} is called first.
+     */
+    private IgniteInternalCache<Object, Object> serviceCache() {
+        if (serviceCache == null)
+            U.awaitQuiet(startLatch);
+
+        return serviceCache; // NOTE(review): may still be null if the latch was released without the field being set.
     }
 
     /** {@inheritDoc} */
@@ -292,6 +308,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             this.busyLock = null;
         }
 
+        startLatch.countDown();
+
         U.shutdownNow(GridServiceProcessor.class, depExe, log);
 
         if (!ctx.clientNode())
@@ -617,7 +635,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 if (cfgsCp.size() == 1)
                     writeServiceToCache(res, cfgsCp.get(0));
                 else if (cfgsCp.size() > 1) {
-                    try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) {
+                    try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
                         for (ServiceConfiguration cfg : cfgsCp) {
                             try {
                                 writeServiceToCache(res, cfg);
@@ -709,7 +727,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
             GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
 
-            GridServiceDeployment dep = (GridServiceDeployment)cache.getAndPutIfAbsent(key,
+            GridServiceDeployment dep = (GridServiceDeployment)serviceCache().getAndPutIfAbsent(key,
                 new GridServiceDeployment(ctx.localNodeId(), cfg));
 
             if (dep != null) {
@@ -809,7 +827,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
             List<String> toRollback = new ArrayList<>();
 
-            try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) {
+            try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
                 for (String name : svcNames) {
                     if (res == null)
                         res = new GridCompoundFuture<>();
@@ -882,7 +900,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
 
             try {
-                if (cache.getAndRemove(key) == null) {
+                if (serviceCache().getAndRemove(key) == null) {
                     // Remove future from local map if service was not deployed.
                     undepFuts.remove(name, fut);
 
@@ -910,6 +928,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @throws IgniteCheckedException On error.
      */
     public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
+        IgniteInternalCache<Object, Object> cache = serviceCache();
+
         ClusterNode node = cache.affinity().mapKeyToNode(name);
 
         final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
@@ -952,7 +972,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             ServiceDescriptorImpl desc = new ServiceDescriptorImpl(dep);
 
             try {
-                GridServiceAssignments assigns = (GridServiceAssignments)cache.getForcePrimary(
+                GridServiceAssignments assigns = (GridServiceAssignments)serviceCache().getForcePrimary(
                     new GridServiceAssignmentsKey(dep.configuration().getName()));
 
                 if (assigns != null) {
@@ -1117,8 +1137,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @throws IgniteCheckedException If failed.
      */
     private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
-        ServiceConfiguration cfg = dep.configuration();
+        IgniteInternalCache<Object, Object> cache = serviceCache();
 
+        ServiceConfiguration cfg = dep.configuration();
         Object nodeFilter = cfg.getNodeFilter();
 
         if (nodeFilter != null)
@@ -1475,6 +1496,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     @SuppressWarnings("unchecked")
     private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
         try {
+            IgniteInternalCache<Object, Object> cache = serviceCache();
+
             GridCacheQueryManager qryMgr = cache.context().queries();
 
             CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
@@ -1615,6 +1638,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
 
             // Remove assignment on primary node in case of undeploy.
+            IgniteInternalCache<Object, Object> cache = serviceCache();
+
             if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
                 try {
                     cache.getAndRemove(key);
@@ -1783,6 +1808,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                         Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
 
                         // Clean up zombie assignments.
+                        IgniteInternalCache<Object, Object> cache = serviceCache();
+
                         while (it.hasNext()) {
                             Cache.Entry<Object, Object> e = it.next();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 25a38ac..871d945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -198,6 +198,8 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
             lock.writeUnlock();
         }
 
+        startLatch.countDown();
+
         int size = tasks.size();
 
         if (size > 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java
new file mode 100644
index 0000000..4b8b494
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.service.DummyService;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+/**
+ * Runs many parallel compute jobs that enumerate service descriptors while additional nodes are started.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class GridJobServicesAddNodeTest extends GridCommonAbstractTest {
+    /** Print progress once per this many submitted jobs / received results. */
+    private static final int LOG_MOD = 100;
+
+    /** Bound that the test compares against before starting each additional node. */
+    private static final int MAX_ADD_NODES = 64;
+
+    /** IP finder set on the discovery SPI of every configuration this test builds. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+        startGrid(2);
+
+        assertEquals(2, grid(1).cluster().nodes().size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        c.setCommunicationSpi(commSpi);
+
+        return c;
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testServiceDescriptorsJob() throws Exception {
+        final int tasks = 5000;
+        final int threads = 10;
+
+        final Ignite ignite1 = grid(1);
+        final CountDownLatch latch = new CountDownLatch(tasks);
+        final AtomicInteger jobsCnt = new AtomicInteger();
+        final AtomicInteger resCnt = new AtomicInteger();
+
+        ignite1.services().deployClusterSingleton("jobsSvc", new DummyService());
+
+        GridTestUtils.runMultiThreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                while (true) {
+                    int cnt = jobsCnt.incrementAndGet();
+
+                    if (cnt > 5000) // NOTE(review): same value as 'tasks' above; the literal is repeated here.
+                        break;
+
+                    IgniteCallable<Boolean> job;
+
+                    job = new ServiceDescriptorsJob();
+
+                    IgniteFuture<Boolean> fut = ignite1.compute().callAsync(job);
+
+                    if (cnt % LOG_MOD == 0)
+                        X.println("Submitted jobs: " + cnt);
+
+                    fut.listen(new CIX1<IgniteFuture<Boolean>>() {
+                        @Override public void applyx(IgniteFuture<Boolean> f) {
+                            try {
+                                assert f.get();
+
+                                long cnt = resCnt.incrementAndGet();
+
+                                if (cnt % LOG_MOD == 0)
+                                    X.println("Results count: " + cnt);
+                            }
+                            finally {
+                                latch.countDown();
+                            }
+                        }
+                    });
+
+                    IgniteUtils.sleep(5);
+                }
+            }
+        }, threads, "TEST-THREAD");
+
+        int additionalNodesStarted = 0;
+        while (!latch.await(threads, TimeUnit.MILLISECONDS)) { // Thread count is reused as the timeout in ms.
+            if (additionalNodesStarted++ <= MAX_ADD_NODES) { // NOTE(review): '<=' passes MAX_ADD_NODES + 1 times.
+                startGrid(2 + additionalNodesStarted);
+            }
+        }
+
+        assertEquals("Jobs cnt != Results cnt", jobsCnt.get() - threads, resCnt.get()); // Presumably offsets one extra increment per worker.
+    }
+
+    /**
+     * Job that returns whether the injected Ignite reports at least one service descriptor ({@code false} on error).
+     */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class ServiceDescriptorsJob implements IgniteCallable<Boolean> {
+        /** Ignite instance, annotated for resource injection. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            try {
+                return ignite.services().serviceDescriptors().iterator().hasNext();
+            } catch (Exception e) {
+                e.printStackTrace();
+
+                return false;
+            } finally {
+                Thread.sleep(10);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index ac3de73..2ffa11e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.GridJobCheckpointCleanupSelfTest;
 import org.apache.ignite.internal.GridJobCollisionCancelSelfTest;
 import org.apache.ignite.internal.GridJobContextSelfTest;
 import org.apache.ignite.internal.GridJobMasterLeaveAwareSelfTest;
+import org.apache.ignite.internal.GridJobServicesAddNodeTest;
 import org.apache.ignite.internal.GridJobStealingSelfTest;
 import org.apache.ignite.internal.GridJobStealingZeroActiveJobsSelfTest;
 import org.apache.ignite.internal.GridJobSubjectIdSelfTest;
@@ -160,6 +161,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class);
         suite.addTestSuite(PublicThreadpoolStarvationTest.class);
         suite.addTestSuite(StripedExecutorTest.class);
+        suite.addTestSuite(GridJobServicesAddNodeTest.class);
 
         suite.addTestSuite(IgniteComputeCustomExecutorConfigurationSelfTest.class);
         suite.addTestSuite(IgniteComputeCustomExecutorSelfTest.class);


[3/3] ignite git commit: zk

Posted by sb...@apache.org.
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e39ebde1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e39ebde1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e39ebde1

Branch: refs/heads/ignite-zk
Commit: e39ebde17a274c1413c1fa9958d7db0008db04ba
Parents: 58f187d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 20 13:28:00 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 20 13:28:00 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e39ebde1/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 829d3a8..0f4b0e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -2664,7 +2664,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception {
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return spis.containsKey(nodeName);
+                ZookeeperDiscoverySpi spi = spis.get(nodeName);
+
+                return spi != null && GridTestUtils.getFieldValue(spi, "impl") != null;
+
             }
         }, 5000);
 


[2/3] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-zk

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58f187d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58f187d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58f187d3

Branch: refs/heads/ignite-zk
Commit: 58f187d3a7fcaa7810d4253d9d121ec0751803cf
Parents: 8735efd 2267113
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 20 13:22:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 20 13:22:37 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           |  53 ++++--
 .../processors/task/GridTaskProcessor.java      |   2 +
 .../internal/GridJobServicesAddNodeTest.java    | 172 +++++++++++++++++++
 .../testsuites/IgniteComputeGridTestSuite.java  |   2 +
 4 files changed, 216 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/58f187d3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------