You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/11 12:35:48 UTC
[15/17] ignite git commit: IGNITE-2466 - Use current NIO back
pressure mechanism to limit received messages. Mark them process only when
backups acknowledged.
IGNITE-2466 - Use current NIO back pressure mechanism to limit received messages. Mark them process only when backups acknowledged.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/220db882
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/220db882
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/220db882
Branch: refs/heads/ignite-4932
Commit: 220db882b466c03eadd148b3a19a0bf70d82d4a6
Parents: a039260
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Apr 10 10:28:15 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Apr 10 10:28:15 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 12 +-
.../dht/atomic/GridDhtAtomicCache.java | 23 ++-
.../util/nio/GridNioBackPressureControl.java | 39 ++++-
.../util/nio/GridNioMessageTracker.java | 7 +
.../CacheAtomicPrimarySyncBackPressureTest.java | 151 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 3 +
6 files changed, 220 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bb3add4..dbd5db6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -738,7 +738,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Runnable c = new Runnable() {
@Override public void run() {
try {
- threadProcessingMessage(true);
+ threadProcessingMessage(true, msgC);
GridMessageListener lsnr = listenerGet0(msg.topic());
@@ -752,7 +752,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
invokeListener(msg.policy(), lsnr, nodeId, obj);
}
finally {
- threadProcessingMessage(false);
+ threadProcessingMessage(false, null);
msgC.run();
}
@@ -787,12 +787,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Runnable c = new Runnable() {
@Override public void run() {
try {
- threadProcessingMessage(true);
+ threadProcessingMessage(true, msgC);
processRegularMessage0(msg, nodeId);
}
finally {
- threadProcessingMessage(false);
+ threadProcessingMessage(false, null);
msgC.run();
}
@@ -1148,12 +1148,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Runnable c = new Runnable() {
@Override public void run() {
try {
- threadProcessingMessage(true);
+ threadProcessingMessage(true, msgC);
unwindMessageSet(msgSet0, lsnr);
}
finally {
- threadProcessingMessage(false);
+ threadProcessingMessage(false, null);
}
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 463fc57..047be87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -84,6 +84,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.nio.GridNioBackPressureControl;
+import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -92,14 +94,15 @@ import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -110,6 +113,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -1904,8 +1908,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (req.writeSynchronizationMode() != FULL_ASYNC)
req.cleanup(!node.isLocal());
- if (dhtFut != null)
+ if (dhtFut != null) {
+ if (req.writeSynchronizationMode() == PRIMARY_SYNC && !dhtFut.isDone()) {
+ final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
+
+ if (tracker != null && tracker instanceof GridNioMessageTracker) {
+ ((GridNioMessageTracker)tracker).onMessageReceived();
+
+ dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> fut) {
+ ((GridNioMessageTracker)tracker).onMessageProcessed();
+ }
+ });
+ }
+ }
+
ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+ }
}
else
// Should remap all keys.
http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
index 96a1ab3..37d985f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java
@@ -17,14 +17,17 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.jetbrains.annotations.Nullable;
+
/**
* Utility class that allows to ignore back-pressure control for threads that are processing messages.
*/
public class GridNioBackPressureControl {
/** Thread local flag indicating that thread is processing message. */
- private static ThreadLocal<Boolean> threadProcMsg = new ThreadLocal<Boolean>() {
- @Override protected Boolean initialValue() {
- return Boolean.FALSE;
+ private static ThreadLocal<Holder> threadProcMsg = new ThreadLocal<Holder>() {
+ @Override protected Holder initialValue() {
+ return new Holder();
}
};
@@ -32,13 +35,35 @@ public class GridNioBackPressureControl {
* @return Flag indicating whether current thread is processing message.
*/
public static boolean threadProcessingMessage() {
- return threadProcMsg.get();
+ return threadProcMsg.get().procMsg;
}
/**
* @param processing Flag indicating whether current thread is processing message.
+ * @param tracker Thread local back pressure tracker of messages, associated with one connection.
+ */
+ public static void threadProcessingMessage(boolean processing, @Nullable IgniteRunnable tracker) {
+ Holder holder = threadProcMsg.get();
+
+ holder.procMsg = processing;
+ holder.tracker = tracker;
+ }
+
+ /**
+ * @return Thread local back pressure tracker of messages, associated with one connection.
*/
- public static void threadProcessingMessage(boolean processing) {
- threadProcMsg.set(processing);
+ @Nullable public static IgniteRunnable threadTracker() {
+ return threadProcMsg.get().tracker;
+ }
+
+ /**
+ *
+ */
+ private static class Holder {
+ /** Process message. */
+ private boolean procMsg;
+
+ /** Tracker. */
+ private IgniteRunnable tracker;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
index e02c7ca..f05ee0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java
@@ -93,6 +93,13 @@ public class GridNioMessageTracker implements IgniteRunnable {
}
/**
+ *
+ */
+ public void onMessageProcessed() {
+ run();
+ }
+
+ /**
*/
public void onMessageReceived() {
int cnt = msgCnt.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
new file mode 100644
index 0000000..49e3e5c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that back-pressure control restricts uncontrolled growing
+ * of backup message queue. This means, if queue too big - any reads
+ * will be stopped until received acks from backup nodes.
+ */
+public class CacheAtomicPrimarySyncBackPressureTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration("cache");
+
+ ccfg.setBackups(1);
+
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+ ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+ TestCommunicationSpi spi = new TestCommunicationSpi();
+
+ spi.setMessageQueueLimit(100);
+
+ cfg.setCommunicationSpi(spi);
+ cfg.setClientMode(gridName.contains("client"));
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientPut() throws Exception {
+ Ignite srv1 = startGrid("server1");
+ Ignite srv2 = startGrid("server2");
+
+ final Ignite client = startGrid("client");
+
+ checkBackPressure(client, srv1, srv2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerPut() throws Exception {
+ Ignite srv1 = startGrid("server1");
+ Ignite srv2 = startGrid("server2");
+
+ final Ignite client = startGrid("server3");
+
+ checkBackPressure(client, srv1, srv2);
+ }
+
+ /**
+ * @param client Producer node.
+ * @throws InterruptedException If failed.
+ */
+ private void checkBackPressure(Ignite client, final Ignite srv1, final Ignite srv2) throws Exception {
+ final IgniteCache<Integer, String> cache = client.cache("cache");
+
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < 10000; i++) {
+ cache.put(i, String.valueOf(i));
+
+ if (i % 100 == 0) {
+ int size1 = futuresNum(srv1);
+ int size2 = futuresNum(srv2);
+
+ assert size1 < 150 : size1;
+ assert size2 < 150 : size2;
+ }
+ }
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @return Size of the backup queue.
+ */
+ private int futuresNum(Ignite ignite) {
+ return ((IgniteKernal)ignite).context().cache().context().mvcc().atomicFutures().size();
+ }
+
+ /**
+ * Delays backup update acks.
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ if (((GridIoMessage)msg).message() instanceof GridDhtAtomicDeferredUpdateResponse)
+ sleep(100);
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+
+ /**
+ * @param millis Millis.
+ */
+ private static void sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteSpiException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 5a09a1c..9fcf31a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction
import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
@@ -334,6 +335,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheNearOnlyTxTest.class);
+ suite.addTestSuite(CacheAtomicPrimarySyncBackPressureTest.class);
+
return suite;
}
}
\ No newline at end of file