You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/14 11:10:36 UTC

[20/50] ignite git commit: IGNITE-2629: ODBC: Requests are now processed asynchronously in separate thread pool. This closes #996.

IGNITE-2629: ODBC: Requests are now processed asynchronously in separate thread pool. This closes #996.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2c397d2f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2c397d2f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2c397d2f

Branch: refs/heads/ignite-3661
Commit: 2c397d2fb8851b25aa5f0a5589ad1deffbe7eee9
Parents: bf9371a
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Mon Sep 5 17:38:47 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Sep 5 17:38:47 2016 +0300

----------------------------------------------------------------------
 .../ignite/configuration/OdbcConfiguration.java | 34 +++++++++++++++-
 .../internal/processors/odbc/OdbcProcessor.java | 41 ++++++++++++++++++--
 .../odbc/OdbcProcessorValidationSelfTest.java   | 16 +++++++-
 3 files changed, 83 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2c397d2f/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
index c098e09..8fe1665 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
@@ -38,6 +38,9 @@ public class OdbcConfiguration {
     /** Default max number of open cursors per connection. */
     public static final int DFLT_MAX_OPEN_CURSORS = 128;
 
+    /** Default size of thread pool. */
+    public static final int DFLT_THREAD_POOL_SIZE = IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
+
     /** Endpoint address. */
     private String endpointAddr;
 
@@ -50,6 +53,9 @@ public class OdbcConfiguration {
     /** Max number of opened cursors per connection. */
     private int maxOpenCursors = DFLT_MAX_OPEN_CURSORS;
 
+    /** Thread pool size. */
+    private int threadPoolSize = DFLT_THREAD_POOL_SIZE;
+
     /**
      * Creates ODBC server configuration with all default values.
      */
@@ -58,8 +64,7 @@ public class OdbcConfiguration {
     }
 
     /**
-     * Creates ODBC server configuration by copying all properties from
-     * given configuration.
+     * Creates ODBC server configuration by copying all properties from given configuration.
      *
      * @param cfg ODBC server configuration.
      */
@@ -70,6 +75,7 @@ public class OdbcConfiguration {
         maxOpenCursors = cfg.getMaxOpenCursors();
         sockRcvBufSize = cfg.getSocketReceiveBufferSize();
         sockSndBufSize = cfg.getSocketSendBufferSize();
+        threadPoolSize = cfg.getThreadPoolSize();
     }
 
     /**
@@ -175,6 +181,30 @@ public class OdbcConfiguration {
         return this;
     }
 
+    /**
+     * Size of thread pool that is in charge of processing ODBC tasks.
+     * <p>
+     * Defaults to {@link #DFLT_THREAD_POOL_SIZE}.
+     *
+     * @return Size of thread pool that is in charge of processing ODBC tasks.
+     */
+    public int getThreadPoolSize() {
+        return threadPoolSize;
+    }
+
+    /**
+     * Sets size of thread pool that is in charge of processing ODBC tasks. See {@link #getThreadPoolSize()} for
+     * more information.
+     *
+     * @param threadPoolSize Size of thread pool that is in charge of processing ODBC tasks.
+     * @return This instance for chaining.
+     */
+    public OdbcConfiguration setThreadPoolSize(int threadPoolSize) {
+        this.threadPoolSize = threadPoolSize;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(OdbcConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2c397d2f/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index adfdc22..a672d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -18,20 +18,27 @@
 package org.apache.ignite.internal.processors.odbc;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.OdbcConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.HostAndPortRange;
+import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.spi.IgnitePortProtocol;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 
 import java.net.InetAddress;
 import java.nio.ByteOrder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * ODBC processor.
@@ -49,9 +56,12 @@ public class OdbcProcessor extends GridProcessorAdapter {
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
-    /** OBCD TCP Server. */
+    /** ODBC TCP Server. */
     private GridNioServer<byte[]> srv;
 
+    /** ODBC executor service. */
+    private ExecutorService odbcExecSvc;
+
     /**
      * @param ctx Kernal context.
      */
@@ -61,11 +71,13 @@ public class OdbcProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        OdbcConfiguration odbcCfg = ctx.config().getOdbcConfiguration();
+        IgniteConfiguration cfg = ctx.config();
+
+        OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
 
         if (odbcCfg != null) {
             try {
-                Marshaller marsh = ctx.config().getMarshaller();
+                Marshaller marsh = cfg.getMarshaller();
 
                 if (marsh != null && !(marsh instanceof BinaryMarshaller))
                     throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " +
@@ -87,6 +99,16 @@ public class OdbcProcessor extends GridProcessorAdapter {
                     );
                 }
 
+                assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
+
+                odbcExecSvc = new IgniteThreadPoolExecutor(
+                    "odbc",
+                    cfg.getGridName(),
+                    odbcCfg.getThreadPoolSize(),
+                    odbcCfg.getThreadPoolSize(),
+                    0,
+                    new LinkedBlockingQueue<Runnable>());
+
                 InetAddress host;
 
                 try {
@@ -100,6 +122,11 @@ public class OdbcProcessor extends GridProcessorAdapter {
 
                 for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
                     try {
+                        GridNioFilter[] filters = new GridNioFilter[] {
+                            new GridNioAsyncNotifyFilter(ctx.gridName(), odbcExecSvc, log),
+                            new GridNioCodecFilter(new OdbcBufferedParser(), log, false)
+                        };
+
                         GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
                             .address(host)
                             .port(port)
@@ -112,7 +139,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
                             .byteOrder(ByteOrder.nativeOrder())
                             .socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
                             .socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
-                            .filters(new GridNioCodecFilter(new OdbcBufferedParser(), log, false))
+                            .filters(filters)
                             .directMode(false)
                             .build();
 
@@ -154,6 +181,12 @@ public class OdbcProcessor extends GridProcessorAdapter {
 
             ctx.ports().deregisterPorts(getClass());
 
+            if (odbcExecSvc != null) {
+                U.shutdownNow(getClass(), odbcExecSvc, log);
+
+                odbcExecSvc = null;
+            }
+
             if (log.isDebugEnabled())
                 log.debug("ODBC processor stopped.");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2c397d2f/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
index bb08c6c..aaee2a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
@@ -134,11 +134,22 @@ public class OdbcProcessorValidationSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Test thread pool size.
+     *
+     * @throws Exception If failed.
+     */
+    public void testThreadPoolSize() throws Exception {
+        check(new OdbcConfiguration().setThreadPoolSize(0), false);
+        check(new OdbcConfiguration().setThreadPoolSize(-1), false);
+
+        check(new OdbcConfiguration().setThreadPoolSize(4), true);
+    }
+
+    /**
      * Perform check.
      *
      * @param odbcCfg ODBC configuration.
-     * @param success Success flag.
-     * @throws Exception If failed.
+     * @param success Success flag. * @throws Exception If failed.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     private void check(OdbcConfiguration odbcCfg, boolean success) throws Exception {
@@ -160,4 +171,5 @@ public class OdbcProcessorValidationSelfTest extends GridCommonAbstractTest {
             }, IgniteException.class, null);
         }
     }
+
 }