You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/24 13:53:41 UTC
[1/3] ignite git commit: IGNITE-2876: Fixed possible starvation in
system pool caused by IgfsBlockMessage. This closes #575.
Repository: ignite
Updated Branches:
refs/heads/ignite-gg-10994 5a7cb70f6 -> d27eee8cc
IGNITE-2876: Fixed possible starvation in system pool caused by IgfsBlockMessage. This closes #575.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc14c802
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc14c802
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc14c802
Branch: refs/heads/ignite-gg-10994
Commit: bc14c802f8d404392901abb7fb2da6088d638d36
Parents: 88c65b8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 24 13:28:30 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 24 13:28:30 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 11 +
.../managers/communication/GridIoPolicy.java | 3 +
.../processors/igfs/IgfsDataManager.java | 29 +-
.../processors/igfs/IgfsDeleteWorker.java | 2 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
...lockMessageSystemPoolStarvationSelfTest.java | 299 +++++++++++++++++++
.../ignite/testsuites/IgniteIgfsTestSuite.java | 3 +
7 files changed, 329 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9ffbf4e..4577dc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -88,6 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
@@ -150,6 +151,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Marshaller cache pool. */
private ExecutorService marshCachePool;
+ /** IGFS pool. */
+ private ExecutorService igfsPool;
+
/** Discovery listener. */
private GridLocalEventListener discoLsnr;
@@ -252,6 +256,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
mgmtPool = ctx.getManagementExecutorService();
utilityCachePool = ctx.utilityCachePool();
marshCachePool = ctx.marshallerCachePool();
+ igfsPool = ctx.getIgfsExecutorService();
affPool = new IgniteThreadPoolExecutor(
"aff",
ctx.gridName(),
@@ -643,6 +648,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case AFFINITY_POOL:
case UTILITY_CACHE_POOL:
case MARSH_CACHE_POOL:
+ case IGFS_POOL:
{
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
@@ -703,6 +709,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return marshCachePool;
+ case IGFS_POOL:
+ assert igfsPool != null : "IGFS pool is not configured.";
+
+ return igfsPool;
+
default: {
assert plc >= 0 : "Negative policy: " + plc;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 57622c9..00590ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -43,6 +43,9 @@ public class GridIoPolicy {
/** Marshaller cache execution pool. */
public static final byte MARSH_CACHE_POOL = 6;
+ /** Marshaller cache execution pool. */
+ public static final byte IGFS_POOL = 7;
+
/**
* Defines the range of reserved pools that are not available for plugins.
* @param key The key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 3825086..16fbeb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -91,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -1014,26 +1014,18 @@ public class IgfsDataManager extends IgfsManager {
if (!node.isLocal()) {
final IgfsBlocksMessage msg = new IgfsBlocksMessage(fileId, batchId, blocks);
- callIgfsLocalSafe(new GridPlainCallable<Object>() {
- @Override @Nullable public Object call() throws Exception {
- try {
- igfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);
- } catch (IgniteCheckedException e) {
- completionFut.onError(nodeId, e);
- }
-
- return null;
- }
- });
+ try {
+ igfsCtx.send(nodeId, topic, msg, IGFS_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ completionFut.onError(nodeId, e);
+ }
}
else {
callIgfsLocalSafe(new GridPlainCallable<Object>() {
- @Override
- @Nullable
- public Object call() throws Exception {
+ @Override @Nullable public Object call() throws Exception {
storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() {
- @Override
- public void apply(IgniteInternalFuture<?> fut) {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
try {
fut.get();
@@ -1276,8 +1268,7 @@ public class IgfsDataManager extends IgfsManager {
try {
// Send reply back to node.
- igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err),
- SYSTEM_POOL);
+ igfsCtx.send(nodeId, topic, new IgfsAckMessage(blocksMsg.fileId(), blocksMsg.id(), err), IGFS_POOL);
}
catch (IgniteCheckedException e) {
U.warn(log, "Failed to send batch acknowledgement (did node leave the grid?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index 7e4dac8..e5914e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -357,7 +357,7 @@ public class IgfsDeleteWorker extends IgfsThread {
for (ClusterNode node : nodes) {
try {
- igfsCtx.send(node, topic, msg, GridIoPolicy.SYSTEM_POOL);
+ igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
}
catch (IgniteCheckedException e) {
U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 99e7cd6..d64c64a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -59,7 +59,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.RANGE_STATUS_INITIAL;
import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.RANGE_STATUS_MOVED;
import static org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange.RANGE_STATUS_MOVING;
@@ -186,7 +186,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
private void sendWithRetries(UUID nodeId, IgfsCommunicationMessage msg) throws IgniteCheckedException {
for (int i = 0; i < MESSAGE_SEND_RETRY_COUNT; i++) {
try {
- igfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);
+ igfsCtx.send(nodeId, topic, msg, IGFS_POOL);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
new file mode 100644
index 0000000..ec3b808
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Test to check for system pool starvation due to {@link IgfsBlocksMessage}.
+ */
+public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbstractTest {
+ /** First node name. */
+ private static final String NODE_1_NAME = "node1";
+
+ /** Second node name. */
+ private static final String NODE_2_NAME = "node2";
+
+ /** Data cache name. */
+ private static final String DATA_CACHE_NAME = "data";
+
+ /** Meta cache name. */
+ private static final String META_CACHE_NAME = "meta";
+
+ /** Key in data caceh we will use to reproduce the issue. */
+ private static final Integer DATA_KEY = 1;
+
+ /** First node. */
+ private Ignite victim;
+
+ /** Second node. */
+ private Ignite attacker;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Start nodes.
+ TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ victim = Ignition.start(config(NODE_1_NAME, ipFinder));
+ attacker = Ignition.start(config(NODE_2_NAME, ipFinder));
+
+ // Check if we selected victim correctly.
+ if (F.eq(dataCache(victim).affinity().mapKeyToNode(DATA_KEY).id(), attacker.cluster().localNode().id())) {
+ Ignite tmp = victim;
+
+ victim = attacker;
+
+ attacker = tmp;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ G.stopAll(true);
+
+ victim = null;
+ attacker = null;
+
+ super.afterTest();
+ }
+
+ /**
+ * Test starvation.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testStarvation() throws Exception {
+ // 1. Create two IGFS file to make all system threads busy.
+ CountDownLatch fileWriteLatch = new CountDownLatch(1);
+
+ final IgniteInternalFuture fileFut1 = createFileAsync(new IgfsPath("/file1"), fileWriteLatch);
+ final IgniteInternalFuture fileFut2 = createFileAsync(new IgfsPath("/file2"), fileWriteLatch);
+
+ // 2. Start transaction and keep it opened.
+ final CountDownLatch txStartLatch = new CountDownLatch(1);
+ final CountDownLatch txCommitLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<Void> txFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ GridCacheAdapter dataCache = dataCache(attacker);
+
+ try (IgniteInternalTx tx =
+ dataCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ dataCache.put(DATA_KEY, 0);
+
+ txStartLatch.countDown();
+
+ txCommitLatch.await();
+
+ tx.commit();
+ }
+
+ return null;
+ }
+ });
+
+ txStartLatch.await();
+
+ // 3. Start async operation to drain semaphore permits.
+ final IgniteInternalFuture putFut = dataCache(victim).putAsync(DATA_KEY, 1);
+
+ assert !awaitFuture(putFut);
+
+ // 4. Write data to files and ensure we stuck.
+ fileWriteLatch.countDown();
+
+ assert !awaitFuture(fileFut1);
+ assert !awaitFuture(fileFut2);
+
+ // 5. Finish transaction.
+ txCommitLatch.countDown();
+
+ assert awaitFuture(txFut);
+
+ // 6. Async put must succeed.
+ assert awaitFuture(putFut);
+
+ // 7. Writes must succeed.
+ assert awaitFuture(fileFut1);
+ assert awaitFuture(fileFut2);
+ }
+
+ /**
+ * Await future completion.
+ *
+ * @param fut Future.
+ * @return {@code True} if future completed.
+ * @throws Exception If failed.
+ */
+ private static boolean awaitFuture(final IgniteInternalFuture fut) throws Exception {
+ return GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ return fut.isDone();
+ }
+ }, 1000);
+ }
+
+ /**
+ * Create IGFS file asynchronously.
+ *
+ * @param path Path.
+ * @return Future.
+ */
+ private IgniteInternalFuture<Void> createFileAsync(final IgfsPath path, final CountDownLatch writeStartLatch) {
+ return GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteFileSystem igfs = attacker.fileSystem(null);
+
+ try (IgfsOutputStream out = igfs.create(path, true)) {
+ writeStartLatch.await();
+
+ out.write(new byte[1024]);
+
+ out.flush();
+ }
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Get data cache for node.
+ *
+ * @param node Node.
+ * @return Data cache.
+ * @throws Exception If failed.
+ */
+ private GridCacheAdapter dataCache(Ignite node) throws Exception {
+ return ((IgniteKernal)node).internalCache(DATA_CACHE_NAME);
+ }
+
+ /**
+ * Create node configuration.
+ *
+ * @param name Node name.
+ * @return Configuration.
+ * @throws Exception If failed.
+ */
+ private IgniteConfiguration config(String name, TcpDiscoveryVmIpFinder ipFinder) throws Exception {
+ // Data cache configuration.
+ CacheConfiguration dataCcfg = new CacheConfiguration();
+
+ dataCcfg.setName(DATA_CACHE_NAME);
+ dataCcfg.setCacheMode(CacheMode.REPLICATED);
+ dataCcfg.setAtomicityMode(TRANSACTIONAL);
+ dataCcfg.setWriteSynchronizationMode(FULL_SYNC);
+ dataCcfg.setAffinityMapper(new DummyAffinityMapper(1));
+ dataCcfg.setMaxConcurrentAsyncOperations(1);
+
+ // Meta cache configuration.
+ CacheConfiguration metaCcfg = new CacheConfiguration();
+
+ metaCcfg.setName(META_CACHE_NAME);
+ metaCcfg.setCacheMode(CacheMode.REPLICATED);
+ metaCcfg.setAtomicityMode(TRANSACTIONAL);
+ metaCcfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ // File system configuration.
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDefaultMode(IgfsMode.PRIMARY);
+ igfsCfg.setDataCacheName(DATA_CACHE_NAME);
+ igfsCfg.setMetaCacheName(META_CACHE_NAME);
+ igfsCfg.setFragmentizerEnabled(false);
+ igfsCfg.setBlockSize(1024);
+
+ // Ignite configuration.
+ IgniteConfiguration cfg = getConfiguration(name);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoSpi);
+ cfg.setCacheConfiguration(dataCcfg, metaCcfg);
+ cfg.setFileSystemConfiguration(igfsCfg);
+
+ cfg.setLocalHost("127.0.0.1");
+ cfg.setConnectorConfiguration(null);
+
+ cfg.setSystemThreadPoolSize(2);
+ cfg.setRebalanceThreadPoolSize(1);
+ cfg.setPublicThreadPoolSize(1);
+
+ return cfg;
+ }
+
+ /**
+ * Dimmy affinity mapper.
+ */
+ private static class DummyAffinityMapper extends IgfsGroupDataBlocksKeyMapper {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Dummy affinity key. */
+ private static final Integer KEY = 1;
+
+ /**
+ * Constructor.
+ *
+ * @param grpSize Group size.
+ */
+ public DummyAffinityMapper(int grpSize) {
+ super(grpSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object affinityKey(Object key) {
+ return KEY;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc14c802/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
index aff3ad7..25c54e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsAttributesSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsBackupsDualAsyncSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsBackupsDualSyncSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsBackupsPrimarySelfTest;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockMessageSystemPoolStarvationSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsCachePerBlockLruEvictionPolicySelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsCacheSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsClientCacheSelfTest;
@@ -118,6 +119,8 @@ public class IgniteIgfsTestSuite extends TestSuite {
suite.addTestSuite(IgfsBackupsDualSyncSelfTest.class);
suite.addTestSuite(IgfsBackupsDualAsyncSelfTest.class);
+ suite.addTestSuite(IgfsBlockMessageSystemPoolStarvationSelfTest.class);
+
// TODO: Enable when IGFS failover is fixed.
//suite.addTestSuite(IgfsBackupFailoverSelfTest.class);
[2/3] ignite git commit: IGNITE-2883: IGFS: Now IPC messages are
handled in a dedicated thread pool.
Posted by vo...@apache.org.
IGNITE-2883: IGFS: Now IPC messages are handled in a dedicated thread pool.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfc7d4ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfc7d4ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfc7d4ee
Branch: refs/heads/ignite-gg-10994
Commit: cfc7d4eec255d0fe720398882d0c058f6821611a
Parents: bc14c80
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 24 14:29:35 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 24 14:29:35 2016 +0300
----------------------------------------------------------------------
.../igfs/IgfsIpcEndpointConfiguration.java | 28 +++++++
.../processors/igfs/IgfsIpcHandler.java | 81 +++++++++++++++-----
.../internal/processors/igfs/IgfsProcessor.java | 11 ++-
.../internal/processors/igfs/IgfsServer.java | 2 +-
.../igfs/IgfsProcessorValidationSelfTest.java | 16 ++++
5 files changed, 117 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
index 23993a6..1c68d0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
@@ -49,6 +49,9 @@ public class IgfsIpcEndpointConfiguration {
*/
public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem";
+ /** Default threads count. */
+ public static final int DFLT_THREAD_CNT = IgniteConfiguration.AVAILABLE_PROC_CNT;
+
/** Endpoint type. */
private IgfsIpcEndpointType type = DFLT_TYPE;
@@ -64,6 +67,9 @@ public class IgfsIpcEndpointConfiguration {
/** Token directory path. */
private String tokenDirPath = DFLT_TOKEN_DIR_PATH;
+ /** Thread count. */
+ private int threadCnt = DFLT_THREAD_CNT;
+
/**
* Default constructor.
*/
@@ -236,6 +242,28 @@ public class IgfsIpcEndpointConfiguration {
this.tokenDirPath = tokenDirPath;
}
+ /**
+ * Get number of threads used by this endpoint to process incoming requests.
+ * <p>
+ * Defaults to {@link #DFLT_THREAD_CNT}.
+ *
+ * @return Number of threads used by this endpoint to process incoming requests.
+ */
+ public int getThreadCount() {
+ return threadCnt;
+ }
+
+ /**
+ * Set number of threads used by this endpoint to process incoming requests.
+ * <p>
+ * See {@link #getThreadCount()} for more information.
+ *
+ * @param threadCnt Number of threads used by this endpoint to process incoming requests.
+ */
+ public void setThreadCount(int threadCnt) {
+ this.threadCnt = threadCnt;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgfsIpcEndpointConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index eadbdb2..bf87384 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsUserContext;
@@ -31,20 +32,22 @@ import org.apache.ignite.internal.igfs.common.IgfsIpcCommand;
import org.apache.ignite.internal.igfs.common.IgfsMessage;
import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
-import org.apache.ignite.internal.processors.closure.GridClosurePolicy;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -70,6 +73,9 @@ class IgfsIpcHandler implements IgfsServerHandler {
/** Resource ID generator. */
private final AtomicLong rsrcIdGen = new AtomicLong();
+ /** Thread pool. */
+ private volatile IgniteThreadPoolExecutor pool;
+
/** Stopping flag. */
private volatile boolean stopping;
@@ -77,8 +83,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
* Constructs IGFS IPC handler.
*
* @param igfsCtx Context.
+ * @param endpointCfg Endpoint configuration.
+ * @param mgmt Management flag.
*/
- IgfsIpcHandler(IgfsContext igfsCtx) {
+ IgfsIpcHandler(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) {
assert igfsCtx != null;
ctx = igfsCtx.kernalContext();
@@ -87,12 +95,24 @@ class IgfsIpcHandler implements IgfsServerHandler {
// Keep buffer size multiple of block size so no extra byte array copies is performed.
bufSize = igfsCtx.configuration().getBlockSize() * 2;
+ // Create thread pool for request handling.
+ int threadCnt = endpointCfg.getThreadCount();
+
+ String prefix = "igfs-" + igfsCtx.igfs().name() + (mgmt ? "mgmt-" : "") + "-ipc";
+
+ pool = new IgniteThreadPoolExecutor(prefix, igfsCtx.kernalContext().gridName(), threadCnt, threadCnt,
+ Long.MAX_VALUE, new LinkedBlockingQueue<Runnable>());
+
log = ctx.log(IgfsIpcHandler.class);
}
/** {@inheritDoc} */
@Override public void stop() throws IgniteCheckedException {
stopping = true;
+
+ U.shutdownNow(getClass(), pool, log);
+
+ pool = null;
}
/** {@inheritDoc} */
@@ -114,7 +134,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgfsMessage> handleAsync(final IgfsClientSession ses,
- final IgfsMessage msg, DataInput in) {
+ final IgfsMessage msg, final DataInput in) {
try {
// Even if will be closed right after this call, response write error will be ignored.
if (stopping)
@@ -130,21 +150,32 @@ class IgfsIpcHandler implements IgfsServerHandler {
case MAKE_DIRECTORIES:
case LIST_FILES:
case LIST_PATHS: {
- IgfsMessage res = execute(ses, cmd, msg, in);
-
- fut = res == null ? null : new GridFinishedFuture<>(res);
+ fut = executeSynchronously(ses, cmd, msg, in);
break;
}
- // Execute command asynchronously in user's pool.
+ // Execute command asynchronously in pool.
default: {
- fut = ctx.closure().callLocalSafe(new GridPlainCallable<IgfsMessage>() {
- @Override public IgfsMessage call() throws Exception {
- // No need to pass data input for non-write-block commands.
- return execute(ses, cmd, msg, null);
- }
- }, GridClosurePolicy.IGFS_POOL);
+ try {
+ final GridFutureAdapter<IgfsMessage> fut0 = new GridFutureAdapter<>();
+
+ pool.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ fut0.onDone(execute(ses, cmd, msg, in));
+ }
+ catch (Exception e) {
+ fut0.onDone(e);
+ }
+ }
+ });
+
+ fut = fut0;
+ }
+ catch (RejectedExecutionException ignored) {
+ fut = executeSynchronously(ses, cmd, msg, in);
+ }
}
}
@@ -157,6 +188,23 @@ class IgfsIpcHandler implements IgfsServerHandler {
}
/**
+ * Execute operation synchronously.
+ *
+ * @param ses Session.
+ * @param cmd Command.
+ * @param msg Message.
+ * @param in Input.
+ * @return Result.
+ * @throws Exception If failed.
+ */
+ @Nullable private IgniteInternalFuture<IgfsMessage> executeSynchronously(IgfsClientSession ses,
+ IgfsIpcCommand cmd, IgfsMessage msg, DataInput in) throws Exception {
+ IgfsMessage res = execute(ses, cmd, msg, in);
+
+ return res == null ? null : new GridFinishedFuture<>(res);
+ }
+
+ /**
* Execute IGFS command.
*
* @param ses Client connection session.
@@ -167,8 +215,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
* @throws Exception If failed.
*/
private IgfsMessage execute(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg,
- @Nullable DataInput in)
- throws Exception {
+ @Nullable DataInput in) throws Exception {
switch (cmd) {
case HANDSHAKE:
return processHandshakeRequest((IgfsHandshakeRequest)msg);
@@ -495,8 +542,6 @@ class IgfsIpcHandler implements IgfsServerHandler {
}
case WRITE_BLOCK: {
- assert rsrcId != null : "Missing stream ID";
-
IgfsOutputStream out = (IgfsOutputStream)resource(ses, rsrcId);
if (out == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 44f6e44..778de99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.mapreduce.IgfsJob;
@@ -316,12 +317,18 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
throw new IgniteCheckedException("Invalid IGFS data cache configuration (key affinity mapper class should be " +
IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg);
- if (cfg.getIpcEndpointConfiguration() != null) {
- final int tcpPort = cfg.getIpcEndpointConfiguration().getPort();
+ IgfsIpcEndpointConfiguration ipcCfg = cfg.getIpcEndpointConfiguration();
+
+ if (ipcCfg != null) {
+ final int tcpPort = ipcCfg.getPort();
if (!(tcpPort >= MIN_TCP_PORT && tcpPort <= MAX_TCP_PORT))
throw new IgniteCheckedException("IGFS endpoint TCP port is out of range [" + MIN_TCP_PORT +
".." + MAX_TCP_PORT + "]: " + tcpPort);
+
+ if (ipcCfg.getThreadCount() <= 0)
+ throw new IgniteCheckedException("IGFS endpoint thread count must be positive: " +
+ ipcCfg.getThreadCount());
}
long maxSpaceSize = cfg.getMaxSpaceSize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 314baac..aa4b115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -139,7 +139,7 @@ public class IgfsServer {
if (srvEndpoint.getPort() >= 0)
igfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
- hnd = new IgfsIpcHandler(igfsCtx);
+ hnd = new IgfsIpcHandler(igfsCtx, endpointCfg, mgmt);
// Start client accept worker.
acceptWorker = new AcceptWorker();
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
index 27f47e8..0ce1036 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
@@ -469,6 +469,22 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testInvalidEndpointThreadCount() throws Exception {
+ final String failMsg = "IGFS endpoint thread count must be positive";
+ g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class));
+
+ final String igfsCfgName = "igfs-cfg";
+ final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration();
+ igfsEndpointCfg.setThreadCount(0);
+ g1IgfsCfg1.setName(igfsCfgName);
+ g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+ checkGridStartFails(g1Cfg, failMsg, true);
+ }
+
+ /**
* Checks that the given grid configuration will lead to {@link IgniteCheckedException} upon grid startup.
*
* @param cfg Grid configuration to check.
[3/3] ignite git commit: Merge branch 'master' into ignite-gg-10994
Posted by vo...@apache.org.
Merge branch 'master' into ignite-gg-10994
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d27eee8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d27eee8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d27eee8c
Branch: refs/heads/ignite-gg-10994
Commit: d27eee8cc1b4339907784230d2653768349b9638
Parents: 5a7cb70 cfc7d4e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 24 15:53:30 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 24 15:53:30 2016 +0300
----------------------------------------------------------------------
.../igfs/IgfsIpcEndpointConfiguration.java | 28 ++
.../managers/communication/GridIoManager.java | 11 +
.../managers/communication/GridIoPolicy.java | 3 +
.../processors/igfs/IgfsDataManager.java | 29 +-
.../processors/igfs/IgfsDeleteWorker.java | 2 +-
.../igfs/IgfsFragmentizerManager.java | 4 +-
.../processors/igfs/IgfsIpcHandler.java | 81 +++--
.../internal/processors/igfs/IgfsProcessor.java | 11 +-
.../internal/processors/igfs/IgfsServer.java | 2 +-
...lockMessageSystemPoolStarvationSelfTest.java | 299 +++++++++++++++++++
.../igfs/IgfsProcessorValidationSelfTest.java | 16 +
.../ignite/testsuites/IgniteIgfsTestSuite.java | 3 +
12 files changed, 446 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d27eee8c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------