You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/03 16:42:45 UTC

[1/5] incubator-ignite git commit: # ignite-901 WIP

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 4b90d91ab -> 8cafff66f


# ignite-901 WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/33a0816c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/33a0816c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/33a0816c

Branch: refs/heads/ignite-901
Commit: 33a0816c9e7a98148285e69ec72af1dc2e078eb1
Parents: c4abdf4
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 3 15:02:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 3 15:02:02 2015 +0300

----------------------------------------------------------------------
 ...IgniteClientReconnectDiscoveryStateTest.java |   9 +-
 .../IgniteClientReconnectServicesTest.java      | 139 +++++++++++++++++++
 .../IgniteClientReconnectTestSuite.java         |   1 +
 3 files changed, 147 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33a0816c/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
index f1019fb..77927a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -36,13 +36,18 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne
         return 3;
     }
 
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testReconnect() throws Exception {
-        clientMode = true;
+        final Ignite client = ignite(serverCount());
 
-        final Ignite client = startGrid(serverCount());
+        assertTrue(client.cluster().localNode().isClient());
 
         long topVer = 4;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33a0816c/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
new file mode 100644
index 0000000..e79f2aa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.processors.service.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.services.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * TODO IGNITE-901: fail after disconnect, disconnect when operation in progress, service deployed on client.
+ */
+public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        IgniteServices services = client.services();
+
+        services.deployClusterSingleton("s1", new TestServiceImpl());
+
+        TestService srvc = services.serviceProxy("s1", TestService.class, false);
+
+        assertNotNull(srvc);
+
+        assertEquals((Object) 2L, srvc.test());
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+                    info("Disconnected: " + evt);
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    reconnectLatch.countDown();
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        srvSpi.failNode(client.cluster().localNode().id(), null);
+
+        assertTrue(reconnectLatch.await(5000, MILLISECONDS));
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DummyService.exeLatch("s2", latch);
+
+        services.deployClusterSingleton("s2", new DummyService());
+
+        assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals((Object) 4L, srvc.test());
+    }
+
+    /**
+     *
+     */
+    public static interface TestService {
+        /**
+         * @return Topology version.
+         */
+        public Long test();
+    }
+
+    /**
+     *
+     */
+    public static class TestServiceImpl implements Service, TestService {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Long test() {
+            assertFalse(ignite.cluster().localNode().isClient());
+
+            return ignite.cluster().topologyVersion();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33a0816c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index affbb54..2cfaa56 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -40,6 +40,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectComputeTest.class);
         suite.addTestSuite(IgniteClientReconnectAtomicsTest.class);
         suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
+        suite.addTestSuite(IgniteClientReconnectServicesTest.class);
 
         return suite;
     }


[2/5] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-901' into ignite-901

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-901' into ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c7573169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c7573169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c7573169

Branch: refs/heads/ignite-901
Commit: c757316944e2b9c46c51d3736b2175558dbe600e
Parents: 33a0816 bf97d04
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 3 15:27:38 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 3 15:27:38 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAbstractTest.java      |  13 +-
 .../IgniteClientReconnectApiBlockTest.java      | 268 +++++++++++++++++++
 2 files changed, 272 insertions(+), 9 deletions(-)
----------------------------------------------------------------------



[5/5] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-901' into ignite-901

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-901' into ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8cafff66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8cafff66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8cafff66

Branch: refs/heads/ignite-901
Commit: 8cafff66f8138980dc42f92ff57aad2dae4f0721
Parents: 59b967a 4b90d91
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 3 17:42:40 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 3 17:42:40 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAtomicsTest.java       |   6 +-
 .../IgniteClientReconnectCollectionsTest.java   | 350 +++++++++++++++++--
 2 files changed, 319 insertions(+), 37 deletions(-)
----------------------------------------------------------------------



[4/5] incubator-ignite git commit: # ignite-901 WIP

Posted by sb...@apache.org.
# ignite-901 WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59b967aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59b967aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59b967aa

Branch: refs/heads/ignite-901
Commit: 59b967aa973fc365cb7514e112194c7689982adc
Parents: 363e161
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 3 17:42:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 3 17:42:18 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |   3 +-
 .../ignite/internal/GridPluginComponent.java    |   2 +-
 .../apache/ignite/internal/IgniteKernal.java    |   6 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../deployment/GridDeploymentManager.java       |   2 +-
 .../discovery/GridDiscoveryManager.java         |   5 +-
 .../processors/GridProcessorAdapter.java        |   2 +-
 .../cache/DynamicCacheChangeBatch.java          |  17 ++
 .../processors/cache/GridCacheProcessor.java    | 186 +++++++++++++------
 .../datastructures/DataStructuresProcessor.java |   7 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  11 +-
 .../IgniteClientReconnectAbstractTest.java      |   8 +-
 .../IgniteClientReconnectCacheTest.java         | 144 +++++++++++++-
 13 files changed, 317 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 705576e..fb0a157 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -126,7 +126,8 @@ public interface GridComponent {
     public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException;
 
     /**
+     * @param clusterRestarted Cluster restarted flag.
      * @throws IgniteCheckedException If failed.
      */
-    public void onReconnected() throws IgniteCheckedException;
+    public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index 9639df0..55a84c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -70,7 +70,7 @@ public class GridPluginComponent implements GridComponent {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected() {
+    @Override public void onReconnected(boolean clusterRestarted) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f97a1c4..5876288 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2848,14 +2848,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /**
-     *
+     * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
      */
-    public void reconnected() {
+    public void reconnected(boolean clusterRestarted) {
         Throwable err = null;
 
         try {
             for (GridComponent comp : ctx.components())
-                comp.onReconnected();
+                comp.onReconnected(clusterRestarted);
 
             ctx.gateway().onReconnected();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index b0a46eb..1cbe68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected() throws IgniteCheckedException {
+    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
index 9eda2eb..9e418a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java
@@ -119,7 +119,7 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected() throws IgniteCheckedException {
+    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         storesOnKernalStart();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a8af43b..f95788a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -296,6 +296,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
         locJoinEvt = new GridFutureAdapter<>();
+
+        registeredCaches.clear();
     }
 
     /** {@inheritDoc} */
@@ -1906,7 +1908,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 case EVT_CLIENT_NODE_RECONNECTED: {
                     assert localNode().isClient() : evt;
 
-                    ((IgniteKernal)ctx.grid()).reconnected();
+                    // TODO IGNITE-901.
+                    ((IgniteKernal)ctx.grid()).reconnected(false);
 
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index 1a6791b..8baf95c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -68,7 +68,7 @@ public abstract class GridProcessorAdapter implements GridProcessor {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected() throws IgniteCheckedException {
+    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index dfc39c1..1e8184d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -43,6 +43,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     /** Custom message ID. */
     private IgniteUuid id = IgniteUuid.randomUuid();
 
+    /** */
+    private boolean clientReconnect;
+
     /**
      * @param reqs Requests.
      */
@@ -93,6 +96,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         return false;
     }
 
+    /**
+     * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
+     */
+    public void clientReconnect(boolean clientReconnect) {
+        this.clientReconnect = clientReconnect;
+    }
+
+    /**
+     * @return {@code True} if this is discovery data sent on client reconnect.
+     */
+    public boolean clientReconnect() {
+        return clientReconnect;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheChangeBatch.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8d3f8da..4fc02d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Count down latch for caches. */
     private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
 
+    /** */
+    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
     /**
      * @param ctx Kernal context.
      */
@@ -914,6 +917,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+        registeredCaches.clear();
+
+        registeredTemplates.clear();
+
         for (GridCacheAdapter cache : caches.values())
             cache.context().gate().onDisconnected(reconnectFut);
 
@@ -922,15 +931,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheAdapter cache : caches.values())
             cache.disconnected();
 
-        registeredCaches.clear();
-
         sharedCtx.onDisconnected();
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected() throws IgniteCheckedException {
-        for (GridCacheAdapter cache : caches.values())
-            cache.context().gate().reconnected(false);
+    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+        cachesOnDisconnect = null;
+
+        for (GridCacheAdapter cache : caches.values()) {
+            boolean stopped = !registeredCaches.containsKey(maskNull(cache.name()));
+
+            cache.context().gate().reconnected(stopped);
+
+            if (stopped) {
+                sharedCtx.removeCacheContext(cache.ctx);
+
+                caches.remove(maskNull(cache.name()));
+                jCacheProxies.remove(maskNull(cache.name()));
+
+                onKernalStop(cache, true);
+                stopCache(cache, true);
+            }
+        }
 
         marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
             @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
@@ -1690,11 +1712,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
-        // Collect dynamically started caches to a single object.
         Collection<DynamicCacheChangeRequest> reqs =
+            // Collect dynamically started caches to a single object.
             new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
 
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+        boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
+
+        Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches;
+
+        for (DynamicCacheDescriptor desc : descs.values()) {
             if (!desc.cancelled()) {
                 DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
 
@@ -1722,7 +1748,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs);
 
-        req.clientNodes(ctx.discovery().clientNodesMap());
+        Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap();
+
+        if (reconnect) {
+            clientNodesMap = U.newHashMap(caches.size());
+
+            for (GridCacheAdapter<?, ?> cache : caches.values()) {
+                Boolean nearEnabled = cache.isNear();
+
+                Map<UUID, Boolean> map = U.newHashMap(1);
+
+                map.put(nodeId, nearEnabled);
+
+                clientNodesMap.put(cache.name(), map);
+            }
+        }
+
+        req.clientNodes(clientNodesMap);
+
+        req.clientReconnect(reconnect);
 
         return req;
     }
@@ -1732,38 +1776,86 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (data instanceof DynamicCacheChangeBatch) {
             DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
 
-            for (DynamicCacheChangeRequest req : batch.requests()) {
-                if (req.template()) {
-                    CacheConfiguration ccfg = req.startCacheConfiguration();
+            if (batch.clientReconnect()) {
+                for (DynamicCacheChangeRequest req : batch.requests()) {
+                    assert !req.template() : req;
 
-                    assert ccfg != null : req;
+                    String name = req.cacheName();
 
-                    DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+                    boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name);
 
-                    if (existing == null) {
-                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
-                            ctx,
-                            ccfg,
-                            req.cacheType(),
-                            true,
-                            req.deploymentId());
+                    if (!sysCache) {
+                        DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
 
-                        registeredTemplates.put(maskNull(req.cacheName()), desc);
-                    }
+                        if (desc != null && !desc.cancelled() && desc.deploymentId().equals(req.deploymentId())) {
+                            Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
 
-                    continue;
+                            assert nodes != null : req;
+                            assert nodes.containsKey(joiningNodeId) : nodes;
+
+                            ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId));
+                        }
+                    }
+                    else
+                        ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false);
                 }
+            }
+            else {
+                for (DynamicCacheChangeRequest req : batch.requests()) {
+                    if (req.template()) {
+                        CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                        assert ccfg != null : req;
+
+                        DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
+
+                        if (existing == null) {
+                            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                                ctx,
+                                ccfg,
+                                req.cacheType(),
+                                true,
+                                req.deploymentId());
+
+                            registeredTemplates.put(maskNull(req.cacheName()), desc);
+                        }
 
-                DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+                        continue;
+                    }
+
+                    DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName()));
+
+                    if (req.start() && !req.clientStartOnly()) {
+                        CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                        if (existing != null) {
+                            if (existing.locallyConfigured()) {
+                                existing.deploymentId(req.deploymentId());
+
+                                existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+
+                                ctx.discovery().setCacheFilter(
+                                    req.cacheName(),
+                                    ccfg.getNodeFilter(),
+                                    ccfg.getNearConfiguration() != null,
+                                    ccfg.getCacheMode() == LOCAL);
+                            }
+                        }
+                        else {
+                            assert req.cacheType() != null : req;
 
-                if (req.start() && !req.clientStartOnly()) {
-                    CacheConfiguration ccfg = req.startCacheConfiguration();
+                            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                                ctx,
+                                ccfg,
+                                req.cacheType(),
+                                false,
+                                req.deploymentId());
 
-                    if (existing != null) {
-                        if (existing.locallyConfigured()) {
-                            existing.deploymentId(req.deploymentId());
+                            // Received statically configured cache.
+                            if (req.initiatingNodeId() == null)
+                                desc.staticallyConfigured(true);
 
-                            existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
+                            registeredCaches.put(maskNull(req.cacheName()), desc);
 
                             ctx.discovery().setCacheFilter(
                                 req.cacheName(),
@@ -1772,37 +1864,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                                 ccfg.getCacheMode() == LOCAL);
                         }
                     }
-                    else {
-                        assert req.cacheType() != null : req;
-
-                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
-                            ctx,
-                            ccfg,
-                            req.cacheType(),
-                            false,
-                            req.deploymentId());
-
-                        // Received statically configured cache.
-                        if (req.initiatingNodeId() == null)
-                            desc.staticallyConfigured(true);
-
-                        registeredCaches.put(maskNull(req.cacheName()), desc);
-
-                        ctx.discovery().setCacheFilter(
-                            req.cacheName(),
-                            ccfg.getNodeFilter(),
-                            ccfg.getNearConfiguration() != null,
-                            ccfg.getCacheMode() == LOCAL);
-                    }
                 }
-            }
 
-            if (!F.isEmpty(batch.clientNodes())) {
-                for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
-                    String cacheName = entry.getKey();
+                if (!F.isEmpty(batch.clientNodes())) {
+                    for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
+                        String cacheName = entry.getKey();
 
-                    for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
-                        ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+                        for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+                            ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 95c9563..4637bd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -186,14 +186,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onReconnected() throws IgniteCheckedException {
+    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
         Set<GridCacheInternal> keys = dsMap.keySet();
 
         Map<GridCacheInternal, GridCacheInternal> vals = dsView.getAll(keys);
 
         for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
-            if (!vals.containsKey(e.getKey()))
+            if (!vals.containsKey(e.getKey())) {
+                dsMap.remove(e.getKey());
+
                 e.getValue().onRemoved();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 578aae8..f3f19bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1084,6 +1084,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private Reconnector reconnector;
 
+        /** */
+        private boolean nodeAdded;
+
         /**
          *
          */
@@ -1286,12 +1289,16 @@ class ClientImpl extends TcpDiscoveryImpl {
             spi.stats.onMessageProcessingFinished(msg);
         }
 
-        private boolean nodeAdded;
-
+        /**
+         * @return {@code True} if client in process of join.
+         */
         private boolean joining() {
             return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
         }
 
+        /**
+         * @return {@code True} if disconnected.
+         */
         private boolean disconnected() {
             return state == ClientImpl.State.DISCONNECTED;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index a9ce136..0f8aadd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -165,10 +165,10 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
      *
      * @param client Client.
      * @param srv Server.
-     * @param disconnectedClosure Closure which will be run when client node disconnected.
+     * @param disconnectedC Closure which will be run when client node disconnected.
      * @throws Exception If failed.
      */
-    protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedClosure)
+    protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC)
         throws Exception {
         final TestTcpDiscoverySpi clientSpi = spi(client);
         final TestTcpDiscoverySpi srvSpi = spi(srv);
@@ -201,8 +201,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
 
         assertTrue(disconnectLatch.await(5000, MILLISECONDS));
 
-        if (disconnectedClosure != null)
-            disconnectedClosure.run();
+        if (disconnectedC != null)
+            disconnectedC.run();
 
         log.info("Allow reconnect.");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59b967aa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 6a77a18..258eef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -100,7 +101,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
     public void testReconnect() throws Exception {
         clientMode = true;
 
-        Ignite client = startGrid(SRV_CNT);
+        IgniteEx client = startGrid(SRV_CNT);
 
         final TestTcpDiscoverySpi clientSpi = spi(client);
 
@@ -110,6 +111,16 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>());
 
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("nearCache");
+
+        final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
+
+        nearCache.put(1, 1);
+
+        assertEquals(1, nearCache.localPeek(1));
+
         cache.put(1, 1);
 
         final CountDownLatch disconnectLatch = new CountDownLatch(1);
@@ -141,7 +152,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
                             catch (CacheException e) {
                                 log.info("Expected exception: " + e);
 
-                                IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause();
+                                IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException) e.getCause();
 
                                 e0.reconnectFuture().get();
                             }
@@ -187,6 +198,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         assertTrue(reconnectLatch.await(5000, TimeUnit.MILLISECONDS));
 
+        checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+        checkCacheDiscoveryData(srv, client, "nearCache", true, true, true);
+
         assertEquals(1, cache.get(1));
 
         putFut.get();
@@ -197,6 +212,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
         assertEquals(3, cache.get(3));
 
+        assertNull(nearCache.localPeek(1));
+
         this.clientMode = false;
 
         IgniteEx srv2 = startGrid(SRV_CNT + 1);
@@ -206,6 +223,10 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         cache.put(key, 4);
 
         assertEquals(4, cache.get(key));
+
+        checkCacheDiscoveryData(srv2, client, null, true, true, false);
+
+        checkCacheDiscoveryData(srv2, client, "nearCache", true, true, true);
     }
 
     /**
@@ -231,7 +252,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         final CountDownLatch reconnectLatch = new CountDownLatch(1);
 
         client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
+            @Override
+            public boolean apply(Event evt) {
                 if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                     info("Reconnected: " + evt);
 
@@ -439,6 +461,74 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheDestroyed() throws Exception {
+        clientMode = true;
+
+        final IgniteEx client = startGrid(SRV_CNT);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srv.destroyCache(null);
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return clientCache.get(1);
+            }
+        }, IllegalStateException.class, null);
+
+        checkCacheDiscoveryData(srv, client, null, false, false, false);
+
+        IgniteCache<Object, Object> clientCache0 = client.getOrCreateCache(new CacheConfiguration<>());
+
+        checkCacheDiscoveryData(srv, client, null, true, true, false);
+
+        clientCache0.put(1, 1);
+
+        assertEquals(1, clientCache0.get(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectCacheDestroyedAndCreated() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(SRV_CNT);
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(client);
+
+        final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override
+            public void run() {
+                srv.destroyCache(null);
+
+                srv.getOrCreateCache(new CacheConfiguration<>());
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                return clientCache.get(1);
+            }
+        }, IllegalStateException.class, null);
+    }
+
+    /**
      * @param client Client.
      * @param ccfg Cache configuration.
      * @param msgToBlock Message to block.
@@ -501,6 +591,54 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
     }
 
     /**
+     * @param srv Server node.
+     * @param client Client node.
+     * @param cacheName Cache name.
+     * @param cacheExists Cache exists flag.
+     * @param clientCache {@code True} if client node has client cache.
+     * @param clientNear {@code True} if client node has near-enabled client cache.
+     */
+    private void checkCacheDiscoveryData(Ignite srv,
+        Ignite client,
+        String cacheName,
+        boolean cacheExists,
+        boolean clientCache,
+        boolean clientNear)
+    {
+        GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery();
+        GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery();
+
+        ClusterNode srvNode = ((IgniteKernal)srv).localNode();
+        ClusterNode clientNode = ((IgniteKernal)client).localNode();
+
+        assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName));
+        assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName));
+
+        assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName));
+
+        if (clientNear)
+            assertTrue(srvDisco.cacheNearNode(clientNode, cacheName));
+        else
+            assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName));
+
+        assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName));
+
+        if (clientNear)
+            assertTrue(clientDisco.cacheNearNode(clientNode, cacheName));
+        else
+            assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName));
+
+        if (cacheExists) {
+            assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+            assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode));
+        }
+        else {
+            assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty());
+            assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty());
+        }
+    }
+
+    /**
      *
      */
     private static class TestCommunicationSpi extends TcpCommunicationSpi {


[3/5] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-901' into ignite-901

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-901' into ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/363e1615
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/363e1615
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/363e1615

Branch: refs/heads/ignite-901
Commit: 363e1615cc87ff420fbe2b47559f57e2b8d93ecc
Parents: c757316 b14b73e
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 3 15:49:00 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 3 15:49:00 2015 +0300

----------------------------------------------------------------------
 .../IgniteClientReconnectAbstractTest.java      |  57 ++
 .../IgniteClientReconnectAtomicsTest.java       | 662 ++-----------------
 2 files changed, 130 insertions(+), 589 deletions(-)
----------------------------------------------------------------------