You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/11 12:35:48 UTC

[15/17] ignite git commit: IGNITE-2466 - Use current NIO back pressure mechanism to limit received messages. Mark them process only when backups acknowledged.

IGNITE-2466 - Use current NIO back pressure mechanism to limit received messages. Mark them process only when backups acknowledged.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/220db882
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/220db882
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/220db882

Branch: refs/heads/ignite-4932
Commit: 220db882b466c03eadd148b3a19a0bf70d82d4a6
Parents: a039260
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Apr 10 10:28:15 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Apr 10 10:28:15 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  12 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  23 ++-
 .../util/nio/GridNioBackPressureControl.java    |  39 ++++-
 .../util/nio/GridNioMessageTracker.java         |   7 +
 .../CacheAtomicPrimarySyncBackPressureTest.java | 151 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 6 files changed, 220 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bb3add4..dbd5db6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -738,7 +738,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Runnable c = new Runnable() {
             @Override public void run() {
                 try {
-                    threadProcessingMessage(true);
+                    threadProcessingMessage(true, msgC);
 
                     GridMessageListener lsnr = listenerGet0(msg.topic());
 
@@ -752,7 +752,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     invokeListener(msg.policy(), lsnr, nodeId, obj);
                 }
                 finally {
-                    threadProcessingMessage(false);
+                    threadProcessingMessage(false, null);
 
                     msgC.run();
                 }
@@ -787,12 +787,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Runnable c = new Runnable() {
             @Override public void run() {
                 try {
-                    threadProcessingMessage(true);
+                    threadProcessingMessage(true, msgC);
 
                     processRegularMessage0(msg, nodeId);
                 }
                 finally {
-                    threadProcessingMessage(false);
+                    threadProcessingMessage(false, null);
 
                     msgC.run();
                 }
@@ -1148,12 +1148,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Runnable c = new Runnable() {
             @Override public void run() {
                 try {
-                    threadProcessingMessage(true);
+                    threadProcessingMessage(true, msgC);
 
                     unwindMessageSet(msgSet0, lsnr);
                 }
                 finally {
-                    threadProcessingMessage(false);
+                    threadProcessingMessage(false, null);
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 463fc57..047be87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -84,6 +84,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.nio.GridNioBackPressureControl;
+import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -92,14 +94,15 @@ import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
@@ -110,6 +113,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -1904,8 +1908,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (req.writeSynchronizationMode() != FULL_ASYNC)
                             req.cleanup(!node.isLocal());
 
-                        if (dhtFut != null)
+                        if (dhtFut != null) {
+                            if (req.writeSynchronizationMode() == PRIMARY_SYNC && !dhtFut.isDone()) {
+                                final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
+
+                                if (tracker != null && tracker instanceof GridNioMessageTracker) {
+                                    ((GridNioMessageTracker)tracker).onMessageReceived();
+
+                                    dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+                                        @Override public void apply(IgniteInternalFuture<Void> fut) {
+                                            ((GridNioMessageTracker)tracker).onMessageProcessed();
+                                        }
+                                    });
+                                }
+                            }
+
                             ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+                        }
                     }
                     else
                         // Should remap all keys.

http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
index 96a1ab3..37d985f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
@@ -17,14 +17,17 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.lang.IgniteRunnable;
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Utility class that allows to ignore back-pressure control for threads that are processing messages.
  */
 public class GridNioBackPressureControl {
     /** Thread local flag indicating that thread is processing message. */
-    private static ThreadLocal<Boolean> threadProcMsg = new ThreadLocal<Boolean>() {
-        @Override protected Boolean initialValue() {
-            return Boolean.FALSE;
+    private static ThreadLocal<Holder> threadProcMsg = new ThreadLocal<Holder>() {
+        @Override protected Holder initialValue() {
+            return new Holder();
         }
     };
 
@@ -32,13 +35,35 @@ public class GridNioBackPressureControl {
      * @return Flag indicating whether current thread is processing message.
      */
     public static boolean threadProcessingMessage() {
-        return threadProcMsg.get();
+        return threadProcMsg.get().procMsg;
     }
 
     /**
      * @param processing Flag indicating whether current thread is processing message.
+     * @param tracker Thread local back pressure tracker of messages, associated with one connection.
+     */
+    public static void threadProcessingMessage(boolean processing, @Nullable IgniteRunnable tracker) {
+        Holder holder = threadProcMsg.get();
+
+        holder.procMsg = processing;
+        holder.tracker = tracker;
+    }
+
+    /**
+     * @return Thread local back pressure tracker of messages, associated with one connection.
      */
-    public static void threadProcessingMessage(boolean processing) {
-        threadProcMsg.set(processing);
+    @Nullable public static IgniteRunnable threadTracker() {
+        return threadProcMsg.get().tracker;
+    }
+
+    /**
+     *
+     */
+    private static class Holder {
+        /** Process message. */
+        private boolean procMsg;
+
+        /** Tracker. */
+        private IgniteRunnable tracker;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
index e02c7ca..f05ee0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
@@ -93,6 +93,13 @@ public class GridNioMessageTracker implements IgniteRunnable {
     }
 
     /**
+     *
+     */
+    public void onMessageProcessed() {
+        run();
+    }
+
+    /**
      */
     public void onMessageReceived() {
         int cnt = msgCnt.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
new file mode 100644
index 0000000..49e3e5c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that back-pressure control restricts uncontrolled growing
+ * of backup message queue. This means, if queue too big - any reads
+ * will be stopped until received acks from backup nodes.
+ */
+public class CacheAtomicPrimarySyncBackPressureTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration("cache");
+
+        ccfg.setBackups(1);
+
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        TestCommunicationSpi spi = new TestCommunicationSpi();
+
+        spi.setMessageQueueLimit(100);
+
+        cfg.setCommunicationSpi(spi);
+        cfg.setClientMode(gridName.contains("client"));
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientPut() throws Exception {
+        Ignite srv1 = startGrid("server1");
+        Ignite srv2 = startGrid("server2");
+
+        final Ignite client = startGrid("client");
+
+        checkBackPressure(client, srv1, srv2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerPut() throws Exception {
+        Ignite srv1 = startGrid("server1");
+        Ignite srv2 = startGrid("server2");
+
+        final Ignite client = startGrid("server3");
+
+        checkBackPressure(client, srv1, srv2);
+    }
+
+    /**
+     * @param client Producer node.
+     * @throws InterruptedException If failed.
+     */
+    private void checkBackPressure(Ignite client, final Ignite srv1, final Ignite srv2) throws Exception {
+        final IgniteCache<Integer, String> cache = client.cache("cache");
+
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < 10000; i++) {
+            cache.put(i, String.valueOf(i));
+
+            if (i % 100 == 0) {
+                int size1 = futuresNum(srv1);
+                int size2 = futuresNum(srv2);
+
+                assert size1 < 150 : size1;
+                assert size2 < 150 : size2;
+            }
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @return Size of the backup queue.
+     */
+    private int futuresNum(Ignite ignite) {
+        return ((IgniteKernal)ignite).context().cache().context().mvcc().atomicFutures().size();
+    }
+
+    /**
+     * Delays backup update acks.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (((GridIoMessage)msg).message() instanceof GridDhtAtomicDeferredUpdateResponse)
+                sleep(100);
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     * @param millis Millis.
+     */
+    private static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 5a09a1c..9fcf31a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction
 import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
 import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
@@ -334,6 +335,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(IgniteCacheNearOnlyTxTest.class);
 
+        suite.addTestSuite(CacheAtomicPrimarySyncBackPressureTest.class);
+
         return suite;
     }
 }
\ No newline at end of file