You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/16 06:39:29 UTC
ignite git commit: ignite-2146 Call service node filter outside of
system cache transaction.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 df08d3de9 -> ab8ba9746
ignite-2146 Call service node filter outside of system cache transaction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab8ba974
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab8ba974
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab8ba974
Branch: refs/heads/ignite-1.5
Commit: ab8ba9746f7f8f3be88eba73cc9abfb84cd86ecc
Parents: df08d3d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 16 08:39:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 16 08:39:20 2015 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 30 +++-
.../ServicePredicateAccessCacheTest.java | 155 +++++++++++++++++++
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
3 files changed, 180 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab8ba974/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index ed54f00..6b05edd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -704,13 +704,33 @@ public class GridServiceProcessor extends GridProcessorAdapter {
Object affKey = cfg.getAffinityKey();
while (true) {
+ GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer);
+
+ Collection<ClusterNode> nodes;
+
+ // Call node filter outside of transaction.
+ if (affKey == null) {
+ nodes = ctx.discovery().nodes(topVer);
+
+ if (assigns.nodeFilter() != null) {
+ Collection<ClusterNode> nodes0 = new ArrayList<>();
+
+ for (ClusterNode node : nodes) {
+ if (assigns.nodeFilter().apply(node))
+ nodes0.add(node);
+ }
+
+ nodes = nodes0;
+ }
+ }
+ else
+ nodes = null;
+
try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(cfg.getName());
GridServiceAssignments oldAssigns = (GridServiceAssignments)cache.get(key);
- GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer);
-
Map<UUID, Integer> cnts = new HashMap<>();
if (affKey != null) {
@@ -723,10 +743,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
else {
- Collection<ClusterNode> nodes = assigns.nodeFilter() == null ?
- ctx.discovery().nodes(topVer) :
- F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter());
-
if (!nodes.isEmpty()) {
int size = nodes.size();
@@ -805,7 +821,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
assigns.assigns(cnts);
- cache.getAndPut(key, assigns);
+ cache.put(key, assigns);
tx.commit();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab8ba974/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java
new file mode 100644
index 0000000..c91d9f1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class ServicePredicateAccessCacheTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static CountDownLatch latch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPredicateAccessCache() throws Exception {
+ final Ignite ignite0 = startGrid(0);
+
+ ignite0.getOrCreateCache(new CacheConfiguration<String, Object>()
+ .setName("testCache")
+ .setAtomicityMode(ATOMIC)
+ .setCacheMode(REPLICATED)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setAtomicWriteOrderMode(PRIMARY));
+
+ latch = new CountDownLatch(1);
+
+ final ClusterGroup grp = ignite0.cluster().forPredicate(new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ System.out.println("Predicated started [thread=" + Thread.currentThread().getName() + ']');
+
+ latch.countDown();
+
+ try {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+
+ System.out.println("Call contains key [thread=" + Thread.currentThread().getName() + ']');
+
+ boolean ret = ignite0.cache("testCache").containsKey(node.id().toString());
+
+ System.out.println("After contains key [ret=" + ret +
+ ", thread=" + Thread.currentThread().getName() + ']');
+
+ return ret;
+ }
+ });
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ info("Start deploy service.");
+
+ ignite0.services(grp).deployNodeSingleton("testService", new TestService());
+
+ info("Service deployed.");
+
+ return null;
+ }
+ }, "deploy-thread");
+
+ latch.await();
+
+ startGrid(1);
+
+ fut.get();
+ }
+
+ /**
+ *
+ */
+ public static class TestService implements Service {
+ /** {@inheritDoc} */
+ public void execute(ServiceContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ public void init(ServiceContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ public void cancel(ServiceContext ctx) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab8ba974/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index a41859e..deb49b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessorProxySe
import org.apache.ignite.internal.processors.service.GridServiceProcessorSingleNodeSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceProcessorStopSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceReassignmentSelfTest;
+import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
import org.apache.ignite.internal.util.GridStartupWithSpecifiedWorkDirectorySelfTest;
import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
import org.apache.ignite.spi.communication.GridCacheMessageSelfTest;
@@ -117,6 +118,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(GridServiceReassignmentSelfTest.class);
suite.addTestSuite(GridServiceClientNodeTest.class);
suite.addTestSuite(GridServiceProcessorStopSelfTest.class);
+ suite.addTestSuite(ServicePredicateAccessCacheTest.class);
return suite;
}