You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/19 16:31:47 UTC

[1/7] incubator-ignite git commit: IGNITE-1034 - Drop slow clients.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-6 d699faa34 -> 7a8b572bf


IGNITE-1034 - Drop slow clients.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/44bbecea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/44bbecea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/44bbecea

Branch: refs/heads/ignite-sprint-6
Commit: 44bbeceae657058eee322b13e339966c625802db
Parents: ad0a026
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Jun 18 19:06:47 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Jun 18 19:06:47 2015 -0700

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |   4 +
 .../discovery/GridDiscoveryManager.java         |  15 +++
 .../ignite/internal/util/nio/GridNioServer.java |  51 ++++++++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   5 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |   5 +
 .../communication/tcp/TcpCommunicationSpi.java  |  73 +++++++++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   5 +
 .../IgniteSlowClientDetectionSelfTest.java      | 124 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       |   5 +
 9 files changed, 287 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index bea4256..885d52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -484,6 +484,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.discovery().tryFailNode(nodeId);
                     }
 
+                    @Override public void failNode(UUID nodeId) {
+                        ctx.discovery().failNode(nodeId);
+                    }
+
                     @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
                         ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 464110c..717cdf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1502,6 +1502,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param nodeId Node ID to fail.
+     */
+    public void failNode(UUID nodeId) {
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            getSpi().failNode(nodeId);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 88fad71..b9d246a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -116,6 +116,10 @@ public class GridNioServer<T> {
     @SuppressWarnings("UnusedDeclaration")
     private boolean skipWrite;
 
+    /** For test purposes only. */
+    @SuppressWarnings("UnusedDeclaration")
+    private boolean skipRead;
+
     /** Local address. */
     private final InetSocketAddress locAddr;
 
@@ -145,6 +149,9 @@ public class GridNioServer<T> {
     @GridToStringExclude
     private IgnitePredicate<Message> skipRecoveryPred;
 
+    /** Optional listener to monitor outbound message queue size. */
+    private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+
     /** Static initializer ensures single-threaded execution of workaround. */
     static {
         // This is a workaround for JDK bug (NPE in Selector.open()).
@@ -174,6 +181,7 @@ public class GridNioServer<T> {
      * @param metricsLsnr Metrics listener.
      * @param formatter Message formatter.
      * @param skipRecoveryPred Skip recovery predicate.
+     * @param msgQueueLsnr Message queue size listener.
      * @param filters Filters for this server.
      * @throws IgniteCheckedException If failed.
      */
@@ -195,6 +203,7 @@ public class GridNioServer<T> {
         GridNioMetricsListener metricsLsnr,
         MessageFormatter formatter,
         IgnitePredicate<Message> skipRecoveryPred,
+        IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         A.notNull(addr, "addr");
@@ -215,6 +224,7 @@ public class GridNioServer<T> {
         this.sockRcvBuf = sockRcvBuf;
         this.sockSndBuf = sockSndBuf;
         this.sndQueueLimit = sndQueueLimit;
+        this.msgQueueLsnr = msgQueueLsnr;
 
         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
 
@@ -385,6 +395,11 @@ public class GridNioServer<T> {
         else if (msgCnt == 1)
             // Change from 0 to 1 means that worker thread should be waken up.
             clientWorkers.get(ses.selectorIndex()).offer(fut);
+
+        IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
+
+        if (lsnr0 != null)
+            lsnr0.apply(ses, msgCnt);
     }
 
     /**
@@ -634,6 +649,17 @@ public class GridNioServer<T> {
         * @throws IOException If key read failed.
         */
         @Override protected void processRead(SelectionKey key) throws IOException {
+            if (skipRead) {
+                try {
+                    U.sleep(50);
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    U.warn(log, "Sleep has been interrupted.");
+                }
+
+                return;
+            }
+
             ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
 
             final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
@@ -775,6 +801,17 @@ public class GridNioServer<T> {
          * @throws IOException If key read failed.
          */
         @Override protected void processRead(SelectionKey key) throws IOException {
+            if (skipRead) {
+                try {
+                    U.sleep(50);
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    U.warn(log, "Sleep has been interrupted.");
+                }
+
+                return;
+            }
+
             ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
 
             final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
@@ -2108,6 +2145,9 @@ public class GridNioServer<T> {
         /** Skip recovery predicate. */
         private IgnitePredicate<Message> skipRecoveryPred;
 
+        /** Message queue size listener. */
+        private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
+
         /**
          * Finishes building the instance.
          *
@@ -2133,6 +2173,7 @@ public class GridNioServer<T> {
                 metricsLsnr,
                 formatter,
                 skipRecoveryPred,
+                msgQueueLsnr,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
             );
 
@@ -2345,5 +2386,15 @@ public class GridNioServer<T> {
 
             return this;
         }
+
+        /**
+         * @param msgQueueLsnr Message queue size listener.
+         * @return Instance of this builder for chaining.
+         */
+        public Builder<T> messageQueueSizeListener(IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr) {
+            this.msgQueueLsnr = msgQueueLsnr;
+
+            return this;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 476f8a8..18191a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -759,6 +759,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
+        @Override public void failNode(UUID nodeId) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
             assert ignite instanceof IgniteKernal : ignite;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index f83326c..a655a73 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -312,6 +312,11 @@ public interface IgniteSpiContext {
     public boolean tryFailNode(UUID nodeId);
 
     /**
+     * @param nodeId Node ID.
+     */
+    public void failNode(UUID nodeId);
+
+    /**
      * @param c Timeout object.
      */
     public void addTimeoutObject(IgniteSpiTimeoutObject c);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 87d5b65..538e9a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.ipc.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
@@ -610,6 +611,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @LoggerResource
     private IgniteLogger log;
 
+    /** Ignite. */
+    @IgniteInstanceResource
+    @GridToStringExclude
+    private Ignite ignite;
+
     /** Local IP address. */
     private String locAddr;
 
@@ -653,6 +659,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Message queue limit. */
     private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
 
+    /** Slow client queue limit. */
+    private int slowClientQueueLimit;
+
     /** Min buffered message count. */
     private int minBufferedMsgCnt = Integer.getInteger(IGNITE_MIN_BUFFERED_COMMUNICATION_MSG_CNT, 512);
 
@@ -1145,6 +1154,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * Gets slow client queue limit.
+     * <p/>
+     * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+     * those clients whose queue exceeded this limit.
+     *
+     * @return Slow client queue limit.
+     */
+    public int getSlowClientQueueLimit() {
+        return slowClientQueueLimit;
+    }
+
+    /**
+     * Sets slow client queue limit.
+     * <p/>
+     * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+     * those clients whose queue exceeded this limit.
+     *
+     * @param slowClientQueueLimit Slow cilent queue limit.
+     */
+    public void setSlowClientQueueLimit(int slowClientQueueLimit) {
+        this.slowClientQueueLimit = slowClientQueueLimit;
+    }
+
+    /**
      * Sets the minimum number of messages for this SPI, that are buffered
      * prior to sending.
      * <p>
@@ -1315,6 +1348,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
                 "since may produce significant delays with some scenarios.");
 
+        if (slowClientQueueLimit > 0 && msgQueueLimit > 0) {
+            if (slowClientQueueLimit >= msgQueueLimit) {
+                U.quietAndWarn(log, "Slow client queue limit is set to a value greater than message queue limit. " +
+                    "Slow client queue limit will have no effect.");
+            }
+        }
+
         registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
 
         if (shmemSrv != null) {
@@ -1425,6 +1465,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
+                IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
+                    !ignite.configuration().isClientMode() && slowClientQueueLimit > 0 ?
+                    new CI2<GridNioSession, Integer>() {
+                        @Override public void apply(GridNioSession ses, Integer qSize) {
+                            checkClientQueueSize(ses, qSize);
+                        }
+                    } :
+                    null;
+
                 GridNioServer<Message> srvr =
                     GridNioServer.<Message>builder()
                         .address(locHost)
@@ -1446,6 +1495,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             new GridConnectionBytesVerifyFilter(log))
                         .messageFormatter(msgFormatter)
                         .skipRecoveryPredicate(skipRecoveryPred)
+                        .messageQueueSizeListener(queueSizeMonitor)
                         .build();
 
                 boundTcpPort = port;
@@ -1860,6 +1910,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * Checks client message queue size and initiates client drop if message queue size exceeds the configured limit.
+     *
+     * @param ses Node communication session.
+     * @param msgQueueSize Message queue size.
+     */
+    private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
+        if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
+            UUID id = ses.meta(NODE_ID_META);
+
+            if (id != null) {
+                ClusterNode node = getSpiContext().node(id);
+
+                if (node != null && node.isClient()) {
+                    LT.warn(log, null, "Client node outbound queue size exceed configured slow client queue limit, " +
+                        "will fail the node (consider changing \'slowClientQueueLimit\'): " + node);
+
+                    getSpiContext().failNode(id);
+                }
+            }
+        }
+    }
+
+    /**
      * Establish TCP connection to remote node and returns client.
      *
      * @param node Remote node.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b743a1a..8eb82ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3402,6 +3402,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     failedNodes.remove(node);
 
                     leavingNodes.remove(node);
+
+                    ClientMessageWorker worker = clientMsgWorkers.remove(node.id());
+
+                    if (worker != null)
+                        worker.interrupt();
                 }
 
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
new file mode 100644
index 0000000..09b4215
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.event.*;
+
+/**
+ *
+ */
+public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
+
+    public static final String PARTITIONED = "partitioned";
+
+    /**
+     * @return Node count.
+     */
+    private int nodeCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
+            cfg.setClientMode(true);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSlowClientQueueLimit(50);
+        commSpi.setSharedMemoryPort(-1);
+        commSpi.setIdleConnectionTimeout(300_000);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(nodeCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSlowClient() throws Exception {
+        final IgniteEx slowClient = grid(nodeCount() - 1);
+
+        assertTrue(slowClient.cluster().localNode().isClient());
+
+        IgniteCache<Object, Object> cache = slowClient.getOrCreateCache(PARTITIONED);
+
+        IgniteEx client0 = grid(nodeCount() - 2);
+
+        assertTrue(client0.cluster().localNode().isClient());
+
+        IgniteCache<Object, Object> cache0 = client0.getOrCreateCache(PARTITIONED);
+
+        cache.query(new ContinuousQuery<>().setLocalListener(new Listener()));
+
+        for (int i = 0; i < 100; i++)
+            cache0.put(0, i);
+
+        GridIoManager ioMgr = slowClient.context().io();
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
+
+        GridNioServer nioSrvr = U.field(commSpi, "nioSrvr");
+
+        GridTestUtils.setFieldValue(nioSrvr, "skipRead", true);
+
+        // Initiate messages for client.
+        for (int i = 0; i < 100; i++)
+            cache0.put(0, new byte[10 * 1024]);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return Ignition.state(slowClient.name()) == IgniteState.STOPPED_ON_SEGMENTATION;
+            }
+        }, getTestTimeout());
+    }
+
+    private static class Listener implements CacheEntryUpdatedListener<Object, Object> {
+        @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+            System.out.println(">>>> Received update: " + iterable);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/44bbecea/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 21f9424..c20ff2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -502,6 +502,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
         // No-op.
     }


[4/7] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-1034' into ignite-1034

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-1034' into ignite-1034


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ef4abeba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ef4abeba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ef4abeba

Branch: refs/heads/ignite-sprint-6
Commit: ef4abeba881f079cab08af9ade99fe52ecf5d26d
Parents: 462495f 49dc4a5
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 12:02:31 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 12:02:31 2015 +0300

----------------------------------------------------------------------

----------------------------------------------------------------------



[3/7] incubator-ignite git commit: # ignite-1034 fixed assert in discovery manager, warning on all nodes, improved test

Posted by sb...@apache.org.
# ignite-1034 fixed assert in discovery manager, warning on all nodes, improved test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/462495f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/462495f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/462495f2

Branch: refs/heads/ignite-sprint-6
Commit: 462495f2977668ae9353adecef554c4f15dd70f3
Parents: 44bbece
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 10:12:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 11:47:14 2015 +0300

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |  8 +--
 .../discovery/GridDiscoveryManager.java         | 19 ++++--
 .../continuous/CacheContinuousQueryHandler.java |  8 +++
 .../ignite/internal/util/nio/GridNioServer.java | 13 ++--
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  4 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  6 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 20 +++---
 .../tcp/TcpCommunicationSpiMBean.java           | 11 ++++
 .../ignite/spi/discovery/DiscoverySpi.java      |  3 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 12 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 12 +++-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  3 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  6 +-
 .../messages/TcpDiscoveryNodeFailedMessage.java | 18 +++++
 ...ridFailFastNodeFailureDetectionSelfTest.java | 17 ++++-
 .../IgniteSlowClientDetectionSelfTest.java      | 69 +++++++++++++++++++-
 .../testframework/GridSpiTestContext.java       |  4 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |  1 +
 18 files changed, 189 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 885d52c..40a5ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -480,12 +480,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.io().messageFactory();
                     }
 
-                    @Override public boolean tryFailNode(UUID nodeId) {
-                        return ctx.discovery().tryFailNode(nodeId);
+                    @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
+                        return ctx.discovery().tryFailNode(nodeId, warning);
                     }
 
-                    @Override public void failNode(UUID nodeId) {
-                        ctx.discovery().failNode(nodeId);
+                    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+                        ctx.discovery().failNode(nodeId, warning);
                     }
 
                     @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 717cdf3..1e4b972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -386,9 +386,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         verChanged = false;
                 }
                 else {
-                    minorTopVer = 0;
+                    if (type != EVT_NODE_SEGMENTED) {
+                        minorTopVer = 0;
 
-                    verChanged = true;
+                        verChanged = true;
+                    }
+                    else
+                        verChanged = false;
                 }
 
                 AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
@@ -1481,15 +1485,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /**
      * @param nodeId Node ID.
+     * @param warning Warning message to be shown on all nodes.
      * @return Whether node is failed.
      */
-    public boolean tryFailNode(UUID nodeId) {
+    public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
         if (!busyLock.enterBusy())
             return false;
 
         try {
             if (!getSpi().pingNode(nodeId)) {
-                getSpi().failNode(nodeId);
+                getSpi().failNode(nodeId, warning);
 
                 return true;
             }
@@ -1503,13 +1508,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /**
      * @param nodeId Node ID to fail.
+     * @param warning Warning message to be shown on all nodes.
      */
-    public void failNode(UUID nodeId) {
+    public void failNode(UUID nodeId, @Nullable String warning) {
         if (!busyLock.enterBusy())
             return;
 
         try {
-            getSpi().failNode(nodeId);
+            getSpi().failNode(nodeId, warning);
         }
         finally {
             busyLock.leaveBusy();
@@ -1520,6 +1526,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.
+     * @param discoCache Discovery cache.
      * @return {@code True} if topology was updated.
      */
     private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated, DiscoCache discoCache) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ad78b92..ff2905f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.managers.deployment.*;
@@ -226,6 +227,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                             ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
                         }
+                        catch (ClusterTopologyCheckedException ex) {
+                            IgniteLogger log = ctx.log(getClass());
+
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to send event notification to node, node left cluster " +
+                                    "[node=" + nodeId + ", err=" + ex + ']');
+                        }
                         catch (IgniteCheckedException ex) {
                             U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index b9d246a..24e1e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -505,13 +505,18 @@ public class GridNioServer<T> {
     public GridNioFuture<GridNioSession> createSession(final SocketChannel ch,
         @Nullable Map<Integer, ?> meta) {
         try {
-            ch.configureBlocking(false);
+            if (!closed) {
+                ch.configureBlocking(false);
 
-            NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
+                NioOperationFuture<GridNioSession> req = new NioOperationFuture<>(ch, false, meta);
 
-            offerBalanced(req);
+                offerBalanced(req);
 
-            return req;
+                return req;
+            }
+            else
+                return new GridNioFinishedFuture<>(
+                    new IgniteCheckedException("Failed to create session, server is stopped."));
         }
         catch (IOException e) {
             return new GridNioFinishedFuture<>(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 18191a1..5e557bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -754,12 +754,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean tryFailNode(UUID nodeId) {
+        @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
             return false;
         }
 
         /** {@inheritDoc} */
-        @Override public void failNode(UUID nodeId) {
+        @Override public void failNode(UUID nodeId, @Nullable String warning) {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index a655a73..611702b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -307,14 +307,16 @@ public interface IgniteSpiContext {
 
     /**
      * @param nodeId Node ID.
+     * @param warning Warning to be shown on all cluster nodes.
      * @return If node was failed.
      */
-    public boolean tryFailNode(UUID nodeId);
+    public boolean tryFailNode(UUID nodeId, @Nullable String warning);
 
     /**
      * @param nodeId Node ID.
+     * @param warning Warning to be shown on all cluster nodes.
      */
-    public void failNode(UUID nodeId);
+    public void failNode(UUID nodeId, @Nullable String warning);
 
     /**
      * @param c Timeout object.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 538e9a8..84c1a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1153,15 +1153,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return msgQueueLimit;
     }
 
-    /**
-     * Gets slow client queue limit.
-     * <p/>
-     * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
-     * those clients whose queue exceeded this limit.
-     *
-     * @return Slow client queue limit.
-     */
-    public int getSlowClientQueueLimit() {
+    /** {@inheritDoc} */
+    @Override public int getSlowClientQueueLimit() {
         return slowClientQueueLimit;
     }
 
@@ -1923,10 +1916,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ClusterNode node = getSpiContext().node(id);
 
                 if (node != null && node.isClient()) {
-                    LT.warn(log, null, "Client node outbound queue size exceed configured slow client queue limit, " +
-                        "will fail the node (consider changing \'slowClientQueueLimit\'): " + node);
+                    String msg = "Client node outbound queue size exceed configured slow client queue limit, " +
+                        "will fail the node (consider changing \'slowClientQueueLimit\') [clientNode=" + node +
+                        ", slowClientQueueLimit=" + slowClientQueueLimit + ']';
+
+                    LT.warn(log, null, msg);
 
-                    getSpiContext().failNode(id);
+                    getSpiContext().failNode(id, msg);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index fe4f581..1971d99 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -268,4 +268,15 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
      */
     @MXBeanDescription("Maximum number of unacknowledged messages.")
     public int getUnacknowledgedMessagesBufferSize();
+
+    /**
+     * Gets slow client queue limit.
+     * <p/>
+     * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+     * those clients whose queue exceeded this limit.
+     *
+     * @return Slow client queue limit.
+     */
+    @MXBeanDescription("Slow client queue limit.")
+    public int getSlowClientQueueLimit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index b952087..11a18b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -151,8 +151,9 @@ public interface DiscoverySpi extends IgniteSpi {
      * Initiates failure of provided node.
      *
      * @param nodeId Node ID.
+     * @param warning Warning to be shown on all cluster nodes.
      */
-    public void failNode(UUID nodeId);
+    public void failNode(UUID nodeId, @Nullable String warning);
 
     /**
      * Whether or not discovery is started in client mode.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index fef6f4f..e255e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -335,13 +335,15 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
         ClusterNode node = rmtNodes.get(nodeId);
 
         if (node != null) {
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
                 node.id(), node.order());
 
+            msg.warning(warning);
+
             msgWorker.addMessage(msg);
         }
     }
@@ -1432,6 +1434,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                if (msg.warning() != null) {
+                    ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());
+
+                    U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+                        "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+                        ", msg=" + msg.warning() + ']');
+                }
+
                 notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
 
                 spi.stats.onNodeFailed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8eb82ac..2458f85 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -584,13 +584,15 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
         ClusterNode node = ring.node(nodeId);
 
         if (node != null) {
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
                 node.id(), node.order());
 
+            msg.warning(warning);
+
             msgWorker.addMessage(msg);
         }
     }
@@ -3409,6 +3411,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                         worker.interrupt();
                 }
 
+                if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId())) {
+                    ClusterNode creatorNode = ring.node(msg.creatorNodeId());
+
+                    U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+                        "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+                        ", msg=" + msg.warning() + ']');
+                }
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 94097c9..ace917f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -175,8 +175,9 @@ abstract class TcpDiscoveryImpl {
 
     /**
      * @param nodeId Node id.
+     * @param warning Warning message to be shown on all nodes.
      */
-    public abstract void failNode(UUID nodeId);
+    public abstract void failNode(UUID nodeId, @Nullable String warning);
 
     /**
      * @param gridName Grid name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c36ac76..1d1916a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -373,8 +373,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
-        impl.failNode(nodeId);
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        impl.failNode(nodeId, warning);
     }
 
     /** {@inheritDoc} */
@@ -385,7 +385,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** {@inheritDoc} */
     @Override public boolean isClientMode() {
         if (impl == null)
-            throw new IllegalStateException("TcpDiscoverySpi has not started");
+            throw new IllegalStateException("TcpDiscoverySpi has not started.");
 
         return impl instanceof ClientImpl;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
index 8cb8414..93ecdaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -37,6 +38,9 @@ public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
     /** Internal order of the failed node. */
     private final long order;
 
+    /** */
+    private String warning;
+
     /**
      * Constructor.
      *
@@ -55,6 +59,20 @@ public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param warning Warning message to be shown on all nodes.
+     */
+    public void warning(String warning) {
+        this.warning = warning;
+    }
+
+    /**
+     * @return Warning message to be shown on all nodes.
+     */
+    @Nullable public String warning() {
+        return warning;
+    }
+
+    /**
      * Gets ID of the failed node.
      *
      * @return ID of the failed node.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
index 992d7bf..238115d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
@@ -50,7 +50,12 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
 
         disco.setIpFinder(IP_FINDER);
-        disco.setHeartbeatFrequency(10000);
+        disco.setHeartbeatFrequency(10_000);
+
+        // Set parameters for fast ping failure.
+        disco.setSocketTimeout(100);
+        disco.setNetworkTimeout(100);
+        disco.setReconnectCount(2);
 
         cfg.setDiscoverySpi(disco);
 
@@ -66,8 +71,6 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
      * @throws Exception If failed.
      */
     public void testFailFast() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-933");
-
         startGridsMultiThreaded(5);
 
         final CountDownLatch failLatch = new CountDownLatch(4);
@@ -87,6 +90,8 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
         Ignite ignite1 = ignite(0);
         Ignite ignite2 = ignite(1);
 
+        final CountDownLatch evtLatch = new CountDownLatch(1);
+
         ignite1.message().localListen(null, new MessagingListenActor<Object>() {
             @Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable {
                 respond(rcvMsg);
@@ -95,12 +100,18 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
 
         ignite2.message().localListen(null, new MessagingListenActor<Object>() {
             @Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable {
+                evtLatch.countDown();
+
                 respond(rcvMsg);
             }
         });
 
         ignite1.message(ignite1.cluster().forRemotes()).send(null, "Message");
 
+        evtLatch.await(); // Wait when connection is established.
+
+        log.info("Fail node: " + ignite1.cluster().localNode());
+
         failNode(ignite1);
 
         assert failLatch.await(1000, MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 09b4215..27c2a61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -19,24 +19,37 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import javax.cache.event.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
 
 /**
  *
  */
 public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
-
+    /** */
     public static final String PARTITIONED = "partitioned";
 
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
     /**
      * @return Node count.
      */
@@ -48,6 +61,8 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
             cfg.setClientMode(true);
 
@@ -66,7 +81,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(nodeCount());
+        startGrids(nodeCount());
     }
 
     /** {@inheritDoc} */
@@ -82,6 +97,45 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
     public void testSlowClient() throws Exception {
         final IgniteEx slowClient = grid(nodeCount() - 1);
 
+        final ClusterNode slowClientNode = slowClient.localNode();
+
+        final CountDownLatch evtSegmentedLatch = new CountDownLatch(1);
+
+        slowClient.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assertEquals("Unexpected event: " + evt, evt.type(), EventType.EVT_NODE_SEGMENTED);
+
+                DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+                assertEquals(slowClientNode, evt0.eventNode());
+                assertEquals(5L, evt0.topologyVersion());
+
+                evtSegmentedLatch.countDown();
+
+                return false;
+            }
+        }, EventType.EVT_NODE_SEGMENTED);
+
+        final CountDownLatch evtFailedLatch = new CountDownLatch(nodeCount() - 1);
+
+        for (int i = 0; i < nodeCount() - 1; i++) {
+            grid(i).events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    assertEquals("Unexpected event: " + evt, evt.type(), EventType.EVT_NODE_FAILED);
+
+                    DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+                    assertEquals(slowClientNode, evt0.eventNode());
+                    assertEquals(6L, evt0.topologyVersion());
+                    assertEquals(4, evt0.topologyNodes().size());
+
+                    evtFailedLatch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_FAILED);
+        }
+
         assertTrue(slowClient.cluster().localNode().isClient());
 
         IgniteCache<Object, Object> cache = slowClient.getOrCreateCache(PARTITIONED);
@@ -109,14 +163,23 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < 100; i++)
             cache0.put(0, new byte[10 * 1024]);
 
-        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 return Ignition.state(slowClient.name()) == IgniteState.STOPPED_ON_SEGMENTATION;
             }
         }, getTestTimeout());
+
+        assertTrue(wait);
+
+        assertTrue("Failed to wait for client failed event", evtFailedLatch.await(5000, MILLISECONDS));
+        assertTrue("Failed to wait for client segmented event", evtSegmentedLatch.await(5000, MILLISECONDS));
     }
 
+    /**
+     *
+     */
     private static class Listener implements CacheEntryUpdatedListener<Object, Object> {
+        /** {@inheritDoc} */
         @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
             System.out.println(">>>> Received update: " + iterable);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index c20ff2e..08268af 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -497,12 +497,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean tryFailNode(UUID nodeId) {
+    @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/462495f2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index b4977ce..2d14728 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -91,6 +91,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(GridMessageListenSelfTest.class);
         suite.addTestSuite(GridFailFastNodeFailureDetectionSelfTest.class);
         suite.addTestSuite(OffHeapTieredTransactionSelfTest.class);
+        suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class);
 
         return suite;
     }


[2/7] incubator-ignite git commit: # ignite-1034 fixed assert in discovery manager, warning on all nodes, improved test

Posted by sb...@apache.org.
# ignite-1034 fixed assert in discovery manager, warning on all nodes, improved test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/49dc4a50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/49dc4a50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/49dc4a50

Branch: refs/heads/ignite-sprint-6
Commit: 49dc4a50197b8a7203bfc0684d5f77f46bda0297
Parents: 44bbece
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 10:12:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 11:41:00 2015 +0300

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |  8 +--
 .../discovery/GridDiscoveryManager.java         | 19 ++++--
 .../continuous/CacheContinuousQueryHandler.java |  8 +++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  4 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  6 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 20 +++---
 .../tcp/TcpCommunicationSpiMBean.java           | 11 ++++
 .../ignite/spi/discovery/DiscoverySpi.java      |  3 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 12 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 12 +++-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  3 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  6 +-
 .../messages/TcpDiscoveryNodeFailedMessage.java | 18 +++++
 ...ridFailFastNodeFailureDetectionSelfTest.java | 17 ++++-
 .../IgniteSlowClientDetectionSelfTest.java      | 69 +++++++++++++++++++-
 .../testframework/GridSpiTestContext.java       |  4 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |  1 +
 17 files changed, 180 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 885d52c..40a5ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -480,12 +480,12 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.io().messageFactory();
                     }
 
-                    @Override public boolean tryFailNode(UUID nodeId) {
-                        return ctx.discovery().tryFailNode(nodeId);
+                    @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
+                        return ctx.discovery().tryFailNode(nodeId, warning);
                     }
 
-                    @Override public void failNode(UUID nodeId) {
-                        ctx.discovery().failNode(nodeId);
+                    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+                        ctx.discovery().failNode(nodeId, warning);
                     }
 
                     @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 717cdf3..1e4b972 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -386,9 +386,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         verChanged = false;
                 }
                 else {
-                    minorTopVer = 0;
+                    if (type != EVT_NODE_SEGMENTED) {
+                        minorTopVer = 0;
 
-                    verChanged = true;
+                        verChanged = true;
+                    }
+                    else
+                        verChanged = false;
                 }
 
                 AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
@@ -1481,15 +1485,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /**
      * @param nodeId Node ID.
+     * @param warning Warning message to be shown on all nodes.
      * @return Whether node is failed.
      */
-    public boolean tryFailNode(UUID nodeId) {
+    public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
         if (!busyLock.enterBusy())
             return false;
 
         try {
             if (!getSpi().pingNode(nodeId)) {
-                getSpi().failNode(nodeId);
+                getSpi().failNode(nodeId, warning);
 
                 return true;
             }
@@ -1503,13 +1508,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /**
      * @param nodeId Node ID to fail.
+     * @param warning Warning message to be shown on all nodes.
      */
-    public void failNode(UUID nodeId) {
+    public void failNode(UUID nodeId, @Nullable String warning) {
         if (!busyLock.enterBusy())
             return;
 
         try {
-            getSpi().failNode(nodeId);
+            getSpi().failNode(nodeId, warning);
         }
         finally {
             busyLock.leaveBusy();
@@ -1520,6 +1526,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.
+     * @param discoCache Discovery cache.
      * @return {@code True} if topology was updated.
      */
     private boolean updateTopologyVersionIfGreater(AffinityTopologyVersion updated, DiscoCache discoCache) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ad78b92..ff2905f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.managers.deployment.*;
@@ -226,6 +227,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                             ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
                         }
+                        catch (ClusterTopologyCheckedException ex) {
+                            IgniteLogger log = ctx.log(getClass());
+
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to send event notification to node, node left cluster " +
+                                    "[node=" + nodeId + ", err=" + ex + ']');
+                        }
                         catch (IgniteCheckedException ex) {
                             U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 18191a1..5e557bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -754,12 +754,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean tryFailNode(UUID nodeId) {
+        @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
             return false;
         }
 
         /** {@inheritDoc} */
-        @Override public void failNode(UUID nodeId) {
+        @Override public void failNode(UUID nodeId, @Nullable String warning) {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index a655a73..611702b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -307,14 +307,16 @@ public interface IgniteSpiContext {
 
     /**
      * @param nodeId Node ID.
+     * @param warning Warning to be shown on all cluster nodes.
      * @return If node was failed.
      */
-    public boolean tryFailNode(UUID nodeId);
+    public boolean tryFailNode(UUID nodeId, @Nullable String warning);
 
     /**
      * @param nodeId Node ID.
+     * @param warning Warning to be shown on all cluster nodes.
      */
-    public void failNode(UUID nodeId);
+    public void failNode(UUID nodeId, @Nullable String warning);
 
     /**
      * @param c Timeout object.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 538e9a8..84c1a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1153,15 +1153,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return msgQueueLimit;
     }
 
-    /**
-     * Gets slow client queue limit.
-     * <p/>
-     * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
-     * those clients whose queue exceeded this limit.
-     *
-     * @return Slow client queue limit.
-     */
-    public int getSlowClientQueueLimit() {
+    /** {@inheritDoc} */
+    @Override public int getSlowClientQueueLimit() {
         return slowClientQueueLimit;
     }
 
@@ -1923,10 +1916,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ClusterNode node = getSpiContext().node(id);
 
                 if (node != null && node.isClient()) {
-                    LT.warn(log, null, "Client node outbound queue size exceed configured slow client queue limit, " +
-                        "will fail the node (consider changing \'slowClientQueueLimit\'): " + node);
+                    String msg = "Client node outbound queue size exceed configured slow client queue limit, " +
+                        "will fail the node (consider changing \'slowClientQueueLimit\') [clientNode=" + node +
+                        ", slowClientQueueLimit=" + slowClientQueueLimit + ']';
+
+                    LT.warn(log, null, msg);
 
-                    getSpiContext().failNode(id);
+                    getSpiContext().failNode(id, msg);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index fe4f581..1971d99 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -268,4 +268,15 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
      */
     @MXBeanDescription("Maximum number of unacknowledged messages.")
     public int getUnacknowledgedMessagesBufferSize();
+
+    /**
+     * Gets slow client queue limit.
+     * <p/>
+     * When set to a positive number, communication SPI will monitor clients outbound queue sizes and will drop
+     * those clients whose queue exceeded this limit.
+     *
+     * @return Slow client queue limit.
+     */
+    @MXBeanDescription("Slow client queue limit.")
+    public int getSlowClientQueueLimit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index b952087..11a18b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -151,8 +151,9 @@ public interface DiscoverySpi extends IgniteSpi {
      * Initiates failure of provided node.
      *
      * @param nodeId Node ID.
+     * @param warning Warning to be shown on all cluster nodes.
      */
-    public void failNode(UUID nodeId);
+    public void failNode(UUID nodeId, @Nullable String warning);
 
     /**
      * Whether or not discovery is started in client mode.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index fef6f4f..e255e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -335,13 +335,15 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
         ClusterNode node = rmtNodes.get(nodeId);
 
         if (node != null) {
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
                 node.id(), node.order());
 
+            msg.warning(warning);
+
             msgWorker.addMessage(msg);
         }
     }
@@ -1432,6 +1434,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
+                if (msg.warning() != null) {
+                    ClusterNode creatorNode = rmtNodes.get(msg.creatorNodeId());
+
+                    U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+                        "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+                        ", msg=" + msg.warning() + ']');
+                }
+
                 notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, top);
 
                 spi.stats.onNodeFailed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8eb82ac..2458f85 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -584,13 +584,15 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
         ClusterNode node = ring.node(nodeId);
 
         if (node != null) {
             TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(),
                 node.id(), node.order());
 
+            msg.warning(warning);
+
             msgWorker.addMessage(msg);
         }
     }
@@ -3409,6 +3411,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                         worker.interrupt();
                 }
 
+                if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId())) {
+                    ClusterNode creatorNode = ring.node(msg.creatorNodeId());
+
+                    U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+                        "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : msg.creatorNodeId()) +
+                        ", msg=" + msg.warning() + ']');
+                }
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 94097c9..ace917f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -175,8 +175,9 @@ abstract class TcpDiscoveryImpl {
 
     /**
      * @param nodeId Node id.
+     * @param warning Warning message to be shown on all nodes.
      */
-    public abstract void failNode(UUID nodeId);
+    public abstract void failNode(UUID nodeId, @Nullable String warning);
 
     /**
      * @param gridName Grid name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c36ac76..1d1916a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -373,8 +373,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
-        impl.failNode(nodeId);
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        impl.failNode(nodeId, warning);
     }
 
     /** {@inheritDoc} */
@@ -385,7 +385,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** {@inheritDoc} */
     @Override public boolean isClientMode() {
         if (impl == null)
-            throw new IllegalStateException("TcpDiscoverySpi has not started");
+            throw new IllegalStateException("TcpDiscoverySpi has not started.");
 
         return impl instanceof ClientImpl;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
index 8cb8414..93ecdaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeFailedMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -37,6 +38,9 @@ public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
     /** Internal order of the failed node. */
     private final long order;
 
+    /** */
+    private String warning;
+
     /**
      * Constructor.
      *
@@ -55,6 +59,20 @@ public class TcpDiscoveryNodeFailedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param warning Warning message to be shown on all nodes.
+     */
+    public void warning(String warning) {
+        this.warning = warning;
+    }
+
+    /**
+     * @return Warning message to be shown on all nodes.
+     */
+    @Nullable public String warning() {
+        return warning;
+    }
+
+    /**
      * Gets ID of the failed node.
      *
      * @return ID of the failed node.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
index 992d7bf..238115d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridFailFastNodeFailureDetectionSelfTest.java
@@ -50,7 +50,12 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
 
         disco.setIpFinder(IP_FINDER);
-        disco.setHeartbeatFrequency(10000);
+        disco.setHeartbeatFrequency(10_000);
+
+        // Set parameters for fast ping failure.
+        disco.setSocketTimeout(100);
+        disco.setNetworkTimeout(100);
+        disco.setReconnectCount(2);
 
         cfg.setDiscoverySpi(disco);
 
@@ -66,8 +71,6 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
      * @throws Exception If failed.
      */
     public void testFailFast() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-933");
-
         startGridsMultiThreaded(5);
 
         final CountDownLatch failLatch = new CountDownLatch(4);
@@ -87,6 +90,8 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
         Ignite ignite1 = ignite(0);
         Ignite ignite2 = ignite(1);
 
+        final CountDownLatch evtLatch = new CountDownLatch(1);
+
         ignite1.message().localListen(null, new MessagingListenActor<Object>() {
             @Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable {
                 respond(rcvMsg);
@@ -95,12 +100,18 @@ public class GridFailFastNodeFailureDetectionSelfTest extends GridCommonAbstract
 
         ignite2.message().localListen(null, new MessagingListenActor<Object>() {
             @Override protected void receive(UUID nodeId, Object rcvMsg) throws Throwable {
+                evtLatch.countDown();
+
                 respond(rcvMsg);
             }
         });
 
         ignite1.message(ignite1.cluster().forRemotes()).send(null, "Message");
 
+        evtLatch.await(); // Wait when connection is established.
+
+        log.info("Fail node: " + ignite1.cluster().localNode());
+
         failNode(ignite1);
 
         assert failLatch.await(1000, MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 09b4215..27c2a61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -19,24 +19,37 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import javax.cache.event.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
 
 /**
  *
  */
 public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
-
+    /** */
     public static final String PARTITIONED = "partitioned";
 
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
     /**
      * @return Node count.
      */
@@ -48,6 +61,8 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         if (getTestGridName(nodeCount() - 1).equals(gridName) || getTestGridName(nodeCount() - 2).equals(gridName))
             cfg.setClientMode(true);
 
@@ -66,7 +81,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(nodeCount());
+        startGrids(nodeCount());
     }
 
     /** {@inheritDoc} */
@@ -82,6 +97,45 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
     public void testSlowClient() throws Exception {
         final IgniteEx slowClient = grid(nodeCount() - 1);
 
+        final ClusterNode slowClientNode = slowClient.localNode();
+
+        final CountDownLatch evtSegmentedLatch = new CountDownLatch(1);
+
+        slowClient.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assertEquals("Unexpected event: " + evt, evt.type(), EventType.EVT_NODE_SEGMENTED);
+
+                DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+                assertEquals(slowClientNode, evt0.eventNode());
+                assertEquals(5L, evt0.topologyVersion());
+
+                evtSegmentedLatch.countDown();
+
+                return false;
+            }
+        }, EventType.EVT_NODE_SEGMENTED);
+
+        final CountDownLatch evtFailedLatch = new CountDownLatch(nodeCount() - 1);
+
+        for (int i = 0; i < nodeCount() - 1; i++) {
+            grid(i).events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    assertEquals("Unexpected event: " + evt, evt.type(), EventType.EVT_NODE_FAILED);
+
+                    DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+
+                    assertEquals(slowClientNode, evt0.eventNode());
+                    assertEquals(6L, evt0.topologyVersion());
+                    assertEquals(4, evt0.topologyNodes().size());
+
+                    evtFailedLatch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_FAILED);
+        }
+
         assertTrue(slowClient.cluster().localNode().isClient());
 
         IgniteCache<Object, Object> cache = slowClient.getOrCreateCache(PARTITIONED);
@@ -109,14 +163,23 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < 100; i++)
             cache0.put(0, new byte[10 * 1024]);
 
-        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 return Ignition.state(slowClient.name()) == IgniteState.STOPPED_ON_SEGMENTATION;
             }
         }, getTestTimeout());
+
+        assertTrue(wait);
+
+        assertTrue("Failed to wait for client failed event", evtFailedLatch.await(5000, MILLISECONDS));
+        assertTrue("Failed to wait for client segmented event", evtSegmentedLatch.await(5000, MILLISECONDS));
     }
 
+    /**
+     *
+     */
     private static class Listener implements CacheEntryUpdatedListener<Object, Object> {
+        /** {@inheritDoc} */
         @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
             System.out.println(">>>> Received update: " + iterable);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index c20ff2e..08268af 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -497,12 +497,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean tryFailNode(UUID nodeId) {
+    @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public void failNode(UUID nodeId) {
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/49dc4a50/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index b4977ce..2d14728 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -91,6 +91,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(GridMessageListenSelfTest.class);
         suite.addTestSuite(GridFailFastNodeFailureDetectionSelfTest.class);
         suite.addTestSuite(OffHeapTieredTransactionSelfTest.class);
+        suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class);
 
         return suite;
     }


[5/7] incubator-ignite git commit: # ignite-sprint-6 fixed NPE in comm spi unit tests

Posted by sb...@apache.org.
# ignite-sprint-6 fixed NPE in comm spi unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/895771a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/895771a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/895771a7

Branch: refs/heads/ignite-sprint-6
Commit: 895771a79d8acca3baf06c23bf3f82483ec62d6f
Parents: ef4abeb
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 13:06:05 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 13:06:05 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/895771a7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 84c1a57..bbb9b1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1458,8 +1458,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
+                boolean clientMode = Boolean.TRUE.equals(ignite.configuration().isClientMode());
+
                 IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor =
-                    !ignite.configuration().isClientMode() && slowClientQueueLimit > 0 ?
+                    !clientMode && slowClientQueueLimit > 0 ?
                     new CI2<GridNioSession, Integer>() {
                         @Override public void apply(GridNioSession ses, Integer qSize) {
                             checkClientQueueSize(ses, qSize);


[6/7] incubator-ignite git commit: # ignite-1034

Posted by sb...@apache.org.
# ignite-1034


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d34f3294
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d34f3294
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d34f3294

Branch: refs/heads/ignite-sprint-6
Commit: d34f3294fbe120c443448e4df201fc9337725706
Parents: 895771a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 13:27:26 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 13:27:26 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/communication/tcp/TcpCommunicationSpi.java       | 5 -----
 1 file changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d34f3294/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index bbb9b1c..a540b7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -611,11 +611,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @LoggerResource
     private IgniteLogger log;
 
-    /** Ignite. */
-    @IgniteInstanceResource
-    @GridToStringExclude
-    private Ignite ignite;
-
     /** Local IP address. */
     private String locAddr;
 


[7/7] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1034' into ignite-sprint-6

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1034' into ignite-sprint-6


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7a8b572b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7a8b572b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7a8b572b

Branch: refs/heads/ignite-sprint-6
Commit: 7a8b572bf93b2d72aeb19cdad1f904fed49d5d17
Parents: d699faa d34f329
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 19 17:27:55 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 19 17:27:55 2015 +0300

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |   8 +-
 .../discovery/GridDiscoveryManager.java         |  30 ++-
 .../continuous/CacheContinuousQueryHandler.java |   8 +
 .../ignite/internal/util/nio/GridNioServer.java |  64 ++++++-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   7 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |   9 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  66 +++++++
 .../tcp/TcpCommunicationSpiMBean.java           |  11 ++
 .../ignite/spi/discovery/DiscoverySpi.java      |   3 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  12 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  17 +-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   3 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   6 +-
 .../messages/TcpDiscoveryNodeFailedMessage.java |  18 ++
 ...ridFailFastNodeFailureDetectionSelfTest.java |  17 +-
 .../IgniteSlowClientDetectionSelfTest.java      | 187 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       |   7 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   1 +
 18 files changed, 451 insertions(+), 23 deletions(-)
----------------------------------------------------------------------