You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/12/20 10:35:19 UTC
[1/3] ignite git commit: IGNITE-7197 Avoid NPE in services() by waiting on latch
Repository: ignite
Updated Branches:
refs/heads/ignite-zk 8735efda6 -> e39ebde17
IGNITE-7197 Avoid NPE in services() by waiting on latch
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22671135
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22671135
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22671135
Branch: refs/heads/ignite-zk
Commit: 226711357db742a5073a98ef179997480e1b1a56
Parents: 6e8cfe3
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Wed Dec 20 13:05:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 20 13:05:24 2017 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 53 ++++--
.../processors/task/GridTaskProcessor.java | 2 +
.../internal/GridJobServicesAddNodeTest.java | 172 +++++++++++++++++++
.../testsuites/IgniteComputeGridTestSuite.java | 2 +
4 files changed, 216 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 7097735..8581023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -160,11 +161,14 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
private ThreadLocal<String> svcName = new ThreadLocal<>();
/** Service cache. */
- private IgniteInternalCache<Object, Object> cache;
+ private volatile IgniteInternalCache<Object, Object> serviceCache;
/** Topology listener. */
private DiscoveryEventListener topLsnr = new TopologyListener();
+ /** */
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+
/**
* @param ctx Kernal context.
*/
@@ -223,19 +227,21 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
* @throws IgniteCheckedException If failed.
*/
private void onKernalStart0() throws IgniteCheckedException {
- updateUtilityCache();
-
if (!ctx.clientNode())
ctx.event().addDiscoveryEventListener(topLsnr, EVTS);
+ updateUtilityCache();
+
+ startLatch.countDown();
+
try {
if (ctx.deploy().enabled())
ctx.cache().context().deploy().ignoreOwnership(true);
if (!ctx.clientNode()) {
- assert cache.context().affinityNode();
+ assert serviceCache.context().affinityNode();
- cache.context().continuousQueries().executeInternalQuery(
+ serviceCache.context().continuousQueries().executeInternalQuery(
new ServiceEntriesListener(), null, true, true, false
);
}
@@ -246,7 +252,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
@Override public void run() {
try {
Iterable<CacheEntryEvent<?, ?>> entries =
- cache.context().continuousQueries().existingEntries(false, null);
+ serviceCache.context().continuousQueries().existingEntries(false, null);
onSystemCacheUpdated(entries);
}
@@ -275,7 +281,17 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
*
*/
public void updateUtilityCache() {
- cache = ctx.cache().utilityCache();
+ serviceCache = ctx.cache().utilityCache();
+ }
+
+ /**
+ * @return Service cache.
+ */
+ private IgniteInternalCache<Object, Object> serviceCache() {
+ if (serviceCache == null)
+ U.awaitQuiet(startLatch);
+
+ return serviceCache;
}
/** {@inheritDoc} */
@@ -292,6 +308,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
this.busyLock = null;
}
+ startLatch.countDown();
+
U.shutdownNow(GridServiceProcessor.class, depExe, log);
if (!ctx.clientNode())
@@ -617,7 +635,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
if (cfgsCp.size() == 1)
writeServiceToCache(res, cfgsCp.get(0));
else if (cfgsCp.size() > 1) {
- try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
for (ServiceConfiguration cfg : cfgsCp) {
try {
writeServiceToCache(res, cfg);
@@ -709,7 +727,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
- GridServiceDeployment dep = (GridServiceDeployment)cache.getAndPutIfAbsent(key,
+ GridServiceDeployment dep = (GridServiceDeployment)serviceCache().getAndPutIfAbsent(key,
new GridServiceDeployment(ctx.localNodeId(), cfg));
if (dep != null) {
@@ -809,7 +827,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
List<String> toRollback = new ArrayList<>();
- try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) {
+ try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) {
for (String name : svcNames) {
if (res == null)
res = new GridCompoundFuture<>();
@@ -882,7 +900,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
GridServiceDeploymentKey key = new GridServiceDeploymentKey(name);
try {
- if (cache.getAndRemove(key) == null) {
+ if (serviceCache().getAndRemove(key) == null) {
// Remove future from local map if service was not deployed.
undepFuts.remove(name, fut);
@@ -910,6 +928,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
* @throws IgniteCheckedException On error.
*/
public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException {
+ IgniteInternalCache<Object, Object> cache = serviceCache();
+
ClusterNode node = cache.affinity().mapKeyToNode(name);
final ServiceTopologyCallable call = new ServiceTopologyCallable(name);
@@ -952,7 +972,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
ServiceDescriptorImpl desc = new ServiceDescriptorImpl(dep);
try {
- GridServiceAssignments assigns = (GridServiceAssignments)cache.getForcePrimary(
+ GridServiceAssignments assigns = (GridServiceAssignments)serviceCache().getForcePrimary(
new GridServiceAssignmentsKey(dep.configuration().getName()));
if (assigns != null) {
@@ -1117,8 +1137,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
* @throws IgniteCheckedException If failed.
*/
private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException {
- ServiceConfiguration cfg = dep.configuration();
+ IgniteInternalCache<Object, Object> cache = serviceCache();
+ ServiceConfiguration cfg = dep.configuration();
Object nodeFilter = cfg.getNodeFilter();
if (nodeFilter != null)
@@ -1475,6 +1496,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
@SuppressWarnings("unchecked")
private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
try {
+ IgniteInternalCache<Object, Object> cache = serviceCache();
+
GridCacheQueryManager qryMgr = cache.context().queries();
CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
@@ -1615,6 +1638,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name);
// Remove assignment on primary node in case of undeploy.
+ IgniteInternalCache<Object, Object> cache = serviceCache();
+
if (cache.cache().affinity().isPrimary(ctx.discovery().localNode(), key)) {
try {
cache.getAndRemove(key);
@@ -1783,6 +1808,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE);
// Clean up zombie assignments.
+ IgniteInternalCache<Object, Object> cache = serviceCache();
+
while (it.hasNext()) {
Cache.Entry<Object, Object> e = it.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 25a38ac..871d945 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -198,6 +198,8 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha
lock.writeUnlock();
}
+ startLatch.countDown();
+
int size = tasks.size();
if (size > 0) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java
new file mode 100644
index 0000000..4b8b494
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobServicesAddNodeTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.service.DummyService;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+/**
+ * Tests multiple parallel jobs execution, accessing services(), while starting new nodes.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class GridJobServicesAddNodeTest extends GridCommonAbstractTest {
+ /** */
+ private static final int LOG_MOD = 100;
+
+ /** */
+ private static final int MAX_ADD_NODES = 64;
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrid(1);
+ startGrid(2);
+
+ assertEquals(2, grid(1).cluster().nodes().size());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ c.setDiscoverySpi(disco);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ c.setCommunicationSpi(commSpi);
+
+ return c;
+ }
+
+ /**
+ * @throws Exception If test failed.
+ */
+ public void testServiceDescriptorsJob() throws Exception {
+ final int tasks = 5000;
+ final int threads = 10;
+
+ final Ignite ignite1 = grid(1);
+ final CountDownLatch latch = new CountDownLatch(tasks);
+ final AtomicInteger jobsCnt = new AtomicInteger();
+ final AtomicInteger resCnt = new AtomicInteger();
+
+ ignite1.services().deployClusterSingleton("jobsSvc", new DummyService());
+
+ GridTestUtils.runMultiThreadedAsync(new CAX() {
+ @Override public void applyx() throws IgniteCheckedException {
+ while (true) {
+ int cnt = jobsCnt.incrementAndGet();
+
+ if (cnt > 5000)
+ break;
+
+ IgniteCallable<Boolean> job;
+
+ job = new ServiceDescriptorsJob();
+
+ IgniteFuture<Boolean> fut = ignite1.compute().callAsync(job);
+
+ if (cnt % LOG_MOD == 0)
+ X.println("Submitted jobs: " + cnt);
+
+ fut.listen(new CIX1<IgniteFuture<Boolean>>() {
+ @Override public void applyx(IgniteFuture<Boolean> f) {
+ try {
+ assert f.get();
+
+ long cnt = resCnt.incrementAndGet();
+
+ if (cnt % LOG_MOD == 0)
+ X.println("Results count: " + cnt);
+ }
+ finally {
+ latch.countDown();
+ }
+ }
+ });
+
+ IgniteUtils.sleep(5);
+ }
+ }
+ }, threads, "TEST-THREAD");
+
+ int additionalNodesStarted = 0;
+ while (!latch.await(threads, TimeUnit.MILLISECONDS)) {
+ if (additionalNodesStarted++ <= MAX_ADD_NODES) {
+ startGrid(2 + additionalNodesStarted);
+ }
+ }
+
+ assertEquals("Jobs cnt != Results cnt", jobsCnt.get() - threads, resCnt.get());
+ }
+
+ /**
+ * Test service enumerating job.
+ */
+ @SuppressWarnings({"PublicInnerClass"})
+ public static class ServiceDescriptorsJob implements IgniteCallable<Boolean> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public Boolean call() throws Exception {
+ try {
+ return ignite.services().serviceDescriptors().iterator().hasNext();
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ return false;
+ } finally {
+ Thread.sleep(10);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22671135/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index ac3de73..2ffa11e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.GridJobCheckpointCleanupSelfTest;
import org.apache.ignite.internal.GridJobCollisionCancelSelfTest;
import org.apache.ignite.internal.GridJobContextSelfTest;
import org.apache.ignite.internal.GridJobMasterLeaveAwareSelfTest;
+import org.apache.ignite.internal.GridJobServicesAddNodeTest;
import org.apache.ignite.internal.GridJobStealingSelfTest;
import org.apache.ignite.internal.GridJobStealingZeroActiveJobsSelfTest;
import org.apache.ignite.internal.GridJobSubjectIdSelfTest;
@@ -160,6 +161,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(IgniteRoundRobinErrorAfterClientReconnectTest.class);
suite.addTestSuite(PublicThreadpoolStarvationTest.class);
suite.addTestSuite(StripedExecutorTest.class);
+ suite.addTestSuite(GridJobServicesAddNodeTest.class);
suite.addTestSuite(IgniteComputeCustomExecutorConfigurationSelfTest.class);
suite.addTestSuite(IgniteComputeCustomExecutorSelfTest.class);
[3/3] ignite git commit: zk
Posted by sb...@apache.org.
zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e39ebde1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e39ebde1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e39ebde1
Branch: refs/heads/ignite-zk
Commit: e39ebde17a274c1413c1fa9958d7db0008db04ba
Parents: 58f187d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 20 13:28:00 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 20 13:28:00 2017 +0300
----------------------------------------------------------------------
.../discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e39ebde1/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 829d3a8..0f4b0e7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -2664,7 +2664,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception {
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return spis.containsKey(nodeName);
+ ZookeeperDiscoverySpi spi = spis.get(nodeName);
+
+ return spi != null && GridTestUtils.getFieldValue(spi, "impl") != null;
+
}
}, 5000);
[2/3] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-zk
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58f187d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58f187d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58f187d3
Branch: refs/heads/ignite-zk
Commit: 58f187d3a7fcaa7810d4253d9d121ec0751803cf
Parents: 8735efd 2267113
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 20 13:22:37 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 20 13:22:37 2017 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 53 ++++--
.../processors/task/GridTaskProcessor.java | 2 +
.../internal/GridJobServicesAddNodeTest.java | 172 +++++++++++++++++++
.../testsuites/IgniteComputeGridTestSuite.java | 2 +
4 files changed, 216 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/58f187d3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------