You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/03/24 14:14:10 UTC

[09/11] ignite git commit: IGNITE-2883: IGFS: Now IPC messages are handled in a dedicated thread pool.

IGNITE-2883: IGFS: Now IPC messages are handled in a dedicated thread pool.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfc7d4ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfc7d4ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfc7d4ee

Branch: refs/heads/ignite-2004
Commit: cfc7d4eec255d0fe720398882d0c058f6821611a
Parents: bc14c80
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 24 14:29:35 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 24 14:29:35 2016 +0300

----------------------------------------------------------------------
 .../igfs/IgfsIpcEndpointConfiguration.java      | 28 +++++++
 .../processors/igfs/IgfsIpcHandler.java         | 81 +++++++++++++++-----
 .../internal/processors/igfs/IgfsProcessor.java | 11 ++-
 .../internal/processors/igfs/IgfsServer.java    |  2 +-
 .../igfs/IgfsProcessorValidationSelfTest.java   | 16 ++++
 5 files changed, 117 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
index 23993a6..1c68d0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
@@ -49,6 +49,9 @@ public class IgfsIpcEndpointConfiguration {
      */
     public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem";
 
+    /** Default thread count. */
+    public static final int DFLT_THREAD_CNT = IgniteConfiguration.AVAILABLE_PROC_CNT;
+
     /** Endpoint type. */
     private IgfsIpcEndpointType type = DFLT_TYPE;
 
@@ -64,6 +67,9 @@ public class IgfsIpcEndpointConfiguration {
     /** Token directory path. */
     private String tokenDirPath = DFLT_TOKEN_DIR_PATH;
 
+    /** Thread count. */
+    private int threadCnt = DFLT_THREAD_CNT;
+
     /**
      * Default constructor.
      */
@@ -236,6 +242,28 @@ public class IgfsIpcEndpointConfiguration {
         this.tokenDirPath = tokenDirPath;
     }
 
+    /**
+     * Get number of threads used by this endpoint to process incoming requests.
+     * <p>
+     * Defaults to {@link #DFLT_THREAD_CNT}.
+     *
+     * @return Number of threads used by this endpoint to process incoming requests.
+     */
+    public int getThreadCount() {
+        return threadCnt;
+    }
+
+    /**
+     * Set number of threads used by this endpoint to process incoming requests.
+     * <p>
+     * The value is stored without validation here. See {@link #getThreadCount()} for more information.
+     *
+     * @param threadCnt Number of threads used by this endpoint to process incoming requests; must be positive.
+     */
+    public void setThreadCount(int threadCnt) {
+        this.threadCnt = threadCnt;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsIpcEndpointConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index eadbdb2..bf87384 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.igfs;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
 import org.apache.ignite.igfs.IgfsOutOfSpaceException;
 import org.apache.ignite.igfs.IgfsOutputStream;
 import org.apache.ignite.igfs.IgfsUserContext;
@@ -31,20 +32,22 @@ import org.apache.ignite.internal.igfs.common.IgfsIpcCommand;
 import org.apache.ignite.internal.igfs.common.IgfsMessage;
 import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
 import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
-import org.apache.ignite.internal.processors.closure.GridClosurePolicy;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.Closeable;
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -70,6 +73,9 @@ class IgfsIpcHandler implements IgfsServerHandler {
     /** Resource ID generator. */
     private final AtomicLong rsrcIdGen = new AtomicLong();
 
+    /** Thread pool. */
+    private volatile IgniteThreadPoolExecutor pool;
+
     /** Stopping flag. */
     private volatile boolean stopping;
 
@@ -77,8 +83,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * Constructs IGFS IPC handler.
      *
      * @param igfsCtx Context.
+     * @param endpointCfg Endpoint configuration.
+     * @param mgmt Management flag.
      */
-    IgfsIpcHandler(IgfsContext igfsCtx) {
+    IgfsIpcHandler(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) {
         assert igfsCtx != null;
 
         ctx = igfsCtx.kernalContext();
@@ -87,12 +95,24 @@ class IgfsIpcHandler implements IgfsServerHandler {
         // Keep buffer size multiple of block size so no extra byte array copies is performed.
         bufSize = igfsCtx.configuration().getBlockSize() * 2;
 
+        // Create thread pool for request handling (thread name prefix: igfs-<name>[-mgmt]-ipc).
+        int threadCnt = endpointCfg.getThreadCount();
+
+        String prefix = "igfs-" + igfsCtx.igfs().name() + (mgmt ? "-mgmt" : "") + "-ipc";
+
+        pool = new IgniteThreadPoolExecutor(prefix, igfsCtx.kernalContext().gridName(), threadCnt, threadCnt,
+            Long.MAX_VALUE, new LinkedBlockingQueue<Runnable>());
+
         log = ctx.log(IgfsIpcHandler.class);
     }
 
     /** {@inheritDoc} */
     @Override public void stop() throws IgniteCheckedException {
         stopping = true;
+
+        // Keep the pool reference after shutdown: a concurrent handleAsync() that has already passed the
+        // "stopping" check must hit RejectedExecutionException (which it handles) instead of an NPE.
+        U.shutdownNow(getClass(), pool, log);
     }
 
     /** {@inheritDoc} */
@@ -114,7 +134,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgfsMessage> handleAsync(final IgfsClientSession ses,
-        final IgfsMessage msg, DataInput in) {
+        final IgfsMessage msg, final DataInput in) {
         try {
             // Even if will be closed right after this call, response write error will be ignored.
             if (stopping)
@@ -130,21 +150,32 @@ class IgfsIpcHandler implements IgfsServerHandler {
                 case MAKE_DIRECTORIES:
                 case LIST_FILES:
                 case LIST_PATHS: {
-                    IgfsMessage res = execute(ses, cmd, msg, in);
-
-                    fut = res == null ? null : new GridFinishedFuture<>(res);
+                    fut = executeSynchronously(ses, cmd, msg, in);
 
                     break;
                 }
 
-                // Execute command asynchronously in user's pool.
+                // Execute command asynchronously in pool.
                 default: {
-                    fut = ctx.closure().callLocalSafe(new GridPlainCallable<IgfsMessage>() {
-                        @Override public IgfsMessage call() throws Exception {
-                            // No need to pass data input for non-write-block commands.
-                            return execute(ses, cmd, msg, null);
-                        }
-                    }, GridClosurePolicy.IGFS_POOL);
+                    try {
+                        final GridFutureAdapter<IgfsMessage> fut0 = new GridFutureAdapter<>();
+
+                        pool.execute(new Runnable() {
+                            @Override public void run()  {
+                                try {
+                                    fut0.onDone(execute(ses, cmd, msg, in));
+                                }
+                                catch (Exception e) {
+                                    fut0.onDone(e);
+                                }
+                            }
+                        });
+
+                        fut = fut0;
+                    }
+                    catch (RejectedExecutionException ignored) {
+                        fut = executeSynchronously(ses, cmd, msg, in);
+                    }
                 }
             }
 
@@ -157,6 +188,23 @@ class IgfsIpcHandler implements IgfsServerHandler {
     }
 
     /**
+     * Execute operation synchronously in the calling thread.
+     *
+     * @param ses Session.
+     * @param cmd Command.
+     * @param msg Message.
+     * @param in Input.
+     * @return Finished future holding the response, or {@code null} if the command produced no response.
+     * @throws Exception If failed.
+     */
+    @Nullable private IgniteInternalFuture<IgfsMessage> executeSynchronously(IgfsClientSession ses,
+        IgfsIpcCommand cmd, IgfsMessage msg, DataInput in) throws Exception {
+        IgfsMessage res = execute(ses, cmd, msg, in);
+
+        return res == null ? null : new GridFinishedFuture<>(res);
+    }
+
+    /**
      * Execute IGFS command.
      *
      * @param ses Client connection session.
@@ -167,8 +215,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @throws Exception If failed.
      */
     private IgfsMessage execute(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg,
-        @Nullable DataInput in)
-        throws Exception {
+        @Nullable DataInput in) throws Exception {
         switch (cmd) {
             case HANDSHAKE:
                 return processHandshakeRequest((IgfsHandshakeRequest)msg);
@@ -495,8 +542,6 @@ class IgfsIpcHandler implements IgfsServerHandler {
             }
 
             case WRITE_BLOCK: {
-                assert rsrcId != null : "Missing stream ID";
-
                 IgfsOutputStream out = (IgfsOutputStream)resource(ses, rsrcId);
 
                 if (out == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 44f6e44..778de99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -26,6 +26,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
 import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.mapreduce.IgfsJob;
@@ -316,12 +317,18 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
                 throw new IgniteCheckedException("Invalid IGFS data cache configuration (key affinity mapper class should be " +
                     IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg);
 
-            if (cfg.getIpcEndpointConfiguration() != null) {
-                final int tcpPort = cfg.getIpcEndpointConfiguration().getPort();
+            IgfsIpcEndpointConfiguration ipcCfg = cfg.getIpcEndpointConfiguration();
+
+            if (ipcCfg != null) {
+                final int tcpPort = ipcCfg.getPort();
 
                 if (!(tcpPort >= MIN_TCP_PORT && tcpPort <= MAX_TCP_PORT))
                     throw new IgniteCheckedException("IGFS endpoint TCP port is out of range [" + MIN_TCP_PORT +
                         ".." + MAX_TCP_PORT + "]: " + tcpPort);
+
+                if (ipcCfg.getThreadCount() <= 0)
+                    throw new IgniteCheckedException("IGFS endpoint thread count must be positive: " +
+                        ipcCfg.getThreadCount());
             }
 
             long maxSpaceSize = cfg.getMaxSpaceSize();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 314baac..aa4b115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -139,7 +139,7 @@ public class IgfsServer {
         if (srvEndpoint.getPort() >= 0)
             igfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass());
 
-        hnd = new IgfsIpcHandler(igfsCtx);
+        hnd = new IgfsIpcHandler(igfsCtx, endpointCfg, mgmt);
 
         // Start client accept worker.
         acceptWorker = new AcceptWorker();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
index 27f47e8..0ce1036 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
@@ -469,6 +469,22 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testInvalidEndpointThreadCount() throws Exception {
+        final String failMsg = "IGFS endpoint thread count must be positive";
+        g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class));
+
+        final String igfsCfgName = "igfs-cfg";
+        final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration();
+        igfsEndpointCfg.setThreadCount(0); // Zero is not a positive thread count.
+        g1IgfsCfg1.setName(igfsCfgName);
+        g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
+
+        checkGridStartFails(g1Cfg, failMsg, true); // Start is expected to fail with the message above.
+    }
+
+    /**
      * Checks that the given grid configuration will lead to {@link IgniteCheckedException} upon grid startup.
      *
      * @param cfg Grid configuration to check.