You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/22 13:43:05 UTC

[3/3] ignite git commit: Merge branch 'ignite-1.7.9-p1' into apache-master

Merge branch 'ignite-1.7.9-p1' into apache-master

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
#	modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
#	modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16153bb8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16153bb8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16153bb8

Branch: refs/heads/apache-master
Commit: 16153bb8bae9153ae979c13a93e60daab240d5ee
Parents: 117e18e d124004
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Mar 22 16:36:27 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Wed Mar 22 16:36:27 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalGatewayImpl.java  |   8 +-
 .../apache/ignite/internal/IgniteKernal.java    | 120 +++++-
 .../internal/IgniteNeedReconnectException.java  |  40 ++
 .../discovery/GridDiscoveryManager.java         |  24 ++
 .../GridCachePartitionExchangeManager.java      |  25 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |  13 +-
 .../GridDhtPartitionsExchangeFuture.java        |  46 ++-
 .../service/GridServiceProcessor.java           | 101 ++---
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 203 ++++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   5 +
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   8 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   9 +
 .../IgniteClientReconnectCacheTest.java         |   7 +-
 .../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++
 .../GridServiceProcessorStopSelfTest.java       |  75 ++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  48 ++-
 .../IgniteClientReconnectTestSuite.java         |   2 +
 17 files changed, 1013 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2a6706e,25f7884..0ea6ea4
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -3620,25 -3576,45 +3676,65 @@@ public class IgniteKernal implements Ig
      }
  
      /**
 +     * @param node Node.
 +     * @param payload Message payload.
 +     * @param procFromNioThread If {@code true} message is processed from NIO thread.
 +     * @return Response future.
 +     */
 +    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
 +        return ctx.io().sendIoTest(node, payload, procFromNioThread);
 +    }
 +
 +    /**
 +     * @param nodes Nodes.
 +     * @param payload Message payload.
 +     * @param procFromNioThread If {@code true} message is processed from NIO thread.
 +     * @return Response future.
 +     */
 +    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
 +        return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
 +    }
 +
++    /**
+      *
+      */
+     private class ReconnectState {
+         /** */
+         private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter();
+ 
+         /** */
+         private GridCompoundFuture<?, Object> curReconnectFut;
+ 
+         /** */
+         private GridFutureAdapter<?> reconnectDone;
+ 
+         /**
+          * @throws IgniteCheckedException If failed.
+          */
+         void waitFirstReconnect() throws IgniteCheckedException {
+             firstReconnectFut.get();
+         }
+ 
+         /**
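+          * Completes the current reconnect future with {@code STOP_RECONNECT} if it is still pending and waits for {@code reconnectDone}.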
+          */
+         void waitPreviousReconnect() {
+             if (curReconnectFut != null && !curReconnectFut.isDone()) {
+                 assert reconnectDone != null;
+ 
+                 curReconnectFut.onDone(STOP_RECONNECT);
+ 
+                 try {
+                     reconnectDone.get();
+                 }
+                 catch (IgniteCheckedException ignote) {
+                     // No-op.
+                 }
+             }
+ 
+         }
+     }
+ 
      /** {@inheritDoc} */
      @Override public String toString() {
          return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d637de4,2ec1070..b2c4ced
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -108,8 -112,8 +108,9 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.discovery.DiscoverySpiListener;
  import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
  import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  import org.apache.ignite.thread.IgniteThread;
 +import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
  import org.jsr166.ConcurrentHashMap8;
  
@@@ -1903,114 -1892,29 +1904,137 @@@ public class GridDiscoveryManager exten
      }
  
      /**
+      * @return {@code True} if the local node is a client and the discovery SPI supports reconnect.
+      */
+     public boolean reconnectSupported() {
+         DiscoverySpi spi = getSpi();
+ 
+         return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+             !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+     }
+ 
+     /**
+      * Leave cluster and try to join again.
+      *
+      * @throws IgniteSpiException If failed.
+      */
+     public void reconnect() {
+         assert reconnectSupported();
+ 
+         DiscoverySpi discoverySpi = getSpi();
+ 
+         ((TcpDiscoverySpi)discoverySpi).reconnect();
+     }
+ 
+     /**
 +     * @param loc Local node.
 +     * @param topSnapshot Topology snapshot.
 +     * @return Newly created discovery cache.
 +     */
 +    @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
 +        HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
 +        HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
 +
 +        ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
 +        ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
 +        ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
 +        ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
 +
 +        for (ClusterNode node : topSnapshot) {
 +            if (alive(node))
 +                alives.add(node.id());
 +
 +            if (node.isDaemon())
 +                daemonNodes.add(node);
 +            else {
 +                allNodes.add(node);
 +
 +                if (!node.isLocal())
 +                    rmtNodes.add(node);
 +
 +                if (!CU.clientNode(node))
 +                    srvNodes.add(node);
 +            }
 +
 +            nodeMap.put(node.id(), node);
 +        }
 +
 +        assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
 +            " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
 +
 +        Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
 +        Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
 +
 +        Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 +        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 +        Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 +
 +        Set<Integer> nearEnabledCaches = new HashSet<>();
 +
 +        for (ClusterNode node : allNodes) {
 +            assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
 +            assert !node.isDaemon();
 +
 +            for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
 +                String cacheName = entry.getKey();
 +                CachePredicate filter = entry.getValue();
 +
 +                if (filter.cacheNode(node)) {
 +                    allNodesWithCaches.add(node);
 +
 +                    if(!CU.clientNode(node))
 +                        srvNodesWithCaches.add(node);
 +
 +                    if (!node.isLocal())
 +                        rmtNodesWithCaches.add(node);
 +
 +                    addToMap(allCacheNodes, cacheName, node);
 +
 +                    if (filter.dataNode(node))
 +                        addToMap(affCacheNodes, cacheName, node);
 +
 +                    if (filter.nearNode(node))
 +                        nearEnabledCaches.add(CU.cacheId(cacheName));
 +                }
 +            }
 +        }
 +
 +        return new DiscoCache(
 +            loc,
 +            Collections.unmodifiableList(rmtNodes),
 +            Collections.unmodifiableList(allNodes),
 +            Collections.unmodifiableList(srvNodes),
 +            Collections.unmodifiableList(daemonNodes),
 +            U.sealList(srvNodesWithCaches),
 +            U.sealList(allNodesWithCaches),
 +            U.sealList(rmtNodesWithCaches),
 +            Collections.unmodifiableMap(allCacheNodes),
 +            Collections.unmodifiableMap(affCacheNodes),
 +            Collections.unmodifiableMap(nodeMap),
 +            Collections.unmodifiableSet(nearEnabledCaches),
 +            alives);
 +    }
 +
 +    /**
 +     * Adds node to map.
 +     *
 +     * @param cacheMap Map to add to.
 +     * @param cacheName Cache name.
 +     * @param rich Node to add.
 +     */
 +    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
 +        List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
 +
 +        if (cacheNodes == null) {
 +            cacheNodes = new ArrayList<>();
 +
 +            cacheMap.put(CU.cacheId(cacheName), cacheNodes);
 +        }
 +
 +        cacheNodes.add(rich);
 +    }
 +
 +    /**
       * Updates topology version if current version is smaller than updated.
       *
       * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 50937a8,d4f95e5..5eacc36
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1677,12 -1677,16 +1704,21 @@@ public class GridDhtPartitionsExchangeF
          }
      }
  
+     /**
+      * @param e Exception.
+      * @return {@code True} if the local node should try to reconnect in case of error.
+      */
+     public boolean reconnectOnError(Throwable e) {
+         return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+             cctx.discovery().reconnectSupported();
+     }
+ 
      /** {@inheritDoc} */
 +    @Override public boolean isExchange() {
 +        return true;
 +    }
 +
 +    /** {@inheritDoc} */
      @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
          return exchId.compareTo(fut.exchId);
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 84fb8e3,bd81518..e0a5c7c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -317,8 -315,10 +317,10 @@@ public class GridServiceProcessor exten
  
          busyLock.block();
  
+         U.shutdownNow(GridServiceProcessor.class, depExe, log);
+ 
          if (!ctx.clientNode())
 -            ctx.event().removeLocalEventListener(topLsnr);
 +            ctx.event().removeDiscoveryEventListener(topLsnr);
  
          Collection<ServiceContextImpl> ctxs = new ArrayList<>();
  
@@@ -1401,7 -1399,7 +1401,7 @@@
                  return;
  
              try {
-                 depExe.execute(new BusyRunnable() {
 -                depExe.submit(new DepRunnable() {
++                depExe.execute(new DepRunnable() {
                      @Override public void run0() {
                          onSystemCacheUpdated(deps);
                      }
@@@ -1582,18 -1582,13 +1582,18 @@@
                      }
                      else
                          return;
 +
 +                    topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
                  }
                  else
 -                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
 +                    topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
  
-                 depExe.execute(new BusyRunnable() {
 -                depExe.submit(new DepRunnable() {
++                depExe.execute(new DepRunnable() {
                      @Override public void run0() {
 -                        ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 +                        // In case the cache instance isn't tracked by DiscoveryManager anymore.
 +                        discoCache.updateAlives(ctx.discovery());
 +
 +                        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
                          if (oldest != null && oldest.isLocal()) {
                              final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 04b076d,02ba56a..feb3e48
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@@ -1100,6 -1183,15 +1186,16 @@@ class ClientImpl extends TcpDiscoveryIm
  
                          continue;
                      }
+                     else {
 -                        msg = queue.poll();
++                        if (msg == null)
++                            msg = queue.poll();
+ 
+                         if (msg == null) {
+                             mux.wait();
+ 
+                             continue;
+                         }
+                     }
                  }
  
                  for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs)

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 682d2d7,6cdf465..01aa256
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@@ -698,11 -698,14 +698,14 @@@ public class IgniteClientReconnectCache
          IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
              @Override public Boolean call() throws Exception {
                  try {
 -                    Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
 +                    Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT))));
  
-                     fail();
+                     // Commented out due to IGNITE-4473, because
+                     // IgniteClientDisconnectedException won't
+                     // be thrown; the client will reconnect instead.
+ //                    fail();
  
-                     return false;
+                     return true;
                  }
                  catch (IgniteClientDisconnectedException e) {
                      log.info("Expected start error: " + e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/16153bb8/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------