You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/12 08:26:35 UTC
[21/46] ignite git commit: IGNITE-2629: ODBC: Requests are now
processed asynchronously in separate thread pool. This closes #996.
IGNITE-2629: ODBC: Requests are now processed asynchronously in separate thread pool. This closes #996.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2c397d2f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2c397d2f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2c397d2f
Branch: refs/heads/master
Commit: 2c397d2fb8851b25aa5f0a5589ad1deffbe7eee9
Parents: bf9371a
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Mon Sep 5 17:38:47 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Sep 5 17:38:47 2016 +0300
----------------------------------------------------------------------
.../ignite/configuration/OdbcConfiguration.java | 34 +++++++++++++++-
.../internal/processors/odbc/OdbcProcessor.java | 41 ++++++++++++++++++--
.../odbc/OdbcProcessorValidationSelfTest.java | 16 +++++++-
3 files changed, 83 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2c397d2f/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
index c098e09..8fe1665 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
@@ -38,6 +38,9 @@ public class OdbcConfiguration {
/** Default max number of open cursors per connection. */
public static final int DFLT_MAX_OPEN_CURSORS = 128;
+ /** Default size of thread pool. */
+ public static final int DFLT_THREAD_POOL_SIZE = IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
+
/** Endpoint address. */
private String endpointAddr;
@@ -50,6 +53,9 @@ public class OdbcConfiguration {
/** Max number of opened cursors per connection. */
private int maxOpenCursors = DFLT_MAX_OPEN_CURSORS;
+ /** Thread pool size. */
+ private int threadPoolSize = DFLT_THREAD_POOL_SIZE;
+
/**
* Creates ODBC server configuration with all default values.
*/
@@ -58,8 +64,7 @@ public class OdbcConfiguration {
}
/**
- * Creates ODBC server configuration by copying all properties from
- * given configuration.
+ * Creates ODBC server configuration by copying all properties from given configuration.
*
* @param cfg ODBC server configuration.
*/
@@ -70,6 +75,7 @@ public class OdbcConfiguration {
maxOpenCursors = cfg.getMaxOpenCursors();
sockRcvBufSize = cfg.getSocketReceiveBufferSize();
sockSndBufSize = cfg.getSocketSendBufferSize();
+ threadPoolSize = cfg.getThreadPoolSize();
}
/**
@@ -175,6 +181,30 @@ public class OdbcConfiguration {
return this;
}
+ /**
+ * Size of thread pool that is in charge of processing ODBC tasks.
+ * <p>
+ * Defaults to {@link #DFLT_THREAD_POOL_SIZE}.
+ *
+ * @return Size of thread pool that is in charge of processing ODBC tasks.
+ */
+ public int getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ /**
+ * Sets size of thread pool that is in charge of processing ODBC tasks. See {@link #getThreadPoolSize()} for more
+ * information.
+ *
+ * @param threadPoolSize Size of thread pool that is in charge of processing ODBC tasks.
+ * @return This instance for chaining.
+ */
+ public OdbcConfiguration setThreadPoolSize(int threadPoolSize) {
+ this.threadPoolSize = threadPoolSize;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(OdbcConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2c397d2f/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index adfdc22..a672d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -18,20 +18,27 @@
package org.apache.ignite.internal.processors.odbc;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.OdbcConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.HostAndPortRange;
+import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgnitePortProtocol;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import java.net.InetAddress;
import java.nio.ByteOrder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* ODBC processor.
@@ -49,9 +56,12 @@ public class OdbcProcessor extends GridProcessorAdapter {
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
- /** OBCD TCP Server. */
+ /** ODBC TCP Server. */
private GridNioServer<byte[]> srv;
+ /** ODBC executor service. */
+ private ExecutorService odbcExecSvc;
+
/**
* @param ctx Kernal context.
*/
@@ -61,11 +71,13 @@ public class OdbcProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- OdbcConfiguration odbcCfg = ctx.config().getOdbcConfiguration();
+ IgniteConfiguration cfg = ctx.config();
+
+ OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
if (odbcCfg != null) {
try {
- Marshaller marsh = ctx.config().getMarshaller();
+ Marshaller marsh = cfg.getMarshaller();
if (marsh != null && !(marsh instanceof BinaryMarshaller))
throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " +
@@ -87,6 +99,16 @@ public class OdbcProcessor extends GridProcessorAdapter {
);
}
+ assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
+
+ odbcExecSvc = new IgniteThreadPoolExecutor(
+ "odbc",
+ cfg.getGridName(),
+ odbcCfg.getThreadPoolSize(),
+ odbcCfg.getThreadPoolSize(),
+ 0,
+ new LinkedBlockingQueue<Runnable>());
+
InetAddress host;
try {
@@ -100,6 +122,11 @@ public class OdbcProcessor extends GridProcessorAdapter {
for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
try {
+ GridNioFilter[] filters = new GridNioFilter[] {
+ new GridNioAsyncNotifyFilter(ctx.gridName(), odbcExecSvc, log),
+ new GridNioCodecFilter(new OdbcBufferedParser(), log, false)
+ };
+
GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
.address(host)
.port(port)
@@ -112,7 +139,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
.byteOrder(ByteOrder.nativeOrder())
.socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
.socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
- .filters(new GridNioCodecFilter(new OdbcBufferedParser(), log, false))
+ .filters(filters)
.directMode(false)
.build();
@@ -154,6 +181,12 @@ public class OdbcProcessor extends GridProcessorAdapter {
ctx.ports().deregisterPorts(getClass());
+ if (odbcExecSvc != null) {
+ U.shutdownNow(getClass(), odbcExecSvc, log);
+
+ odbcExecSvc = null;
+ }
+
if (log.isDebugEnabled())
log.debug("ODBC processor stopped.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2c397d2f/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
index bb08c6c..aaee2a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/OdbcProcessorValidationSelfTest.java
@@ -134,11 +134,22 @@ public class OdbcProcessorValidationSelfTest extends GridCommonAbstractTest {
}
/**
+ * Test thread pool size.
+ *
+ * @throws Exception If failed.
+ */
+ public void testThreadPoolSize() throws Exception {
+ check(new OdbcConfiguration().setThreadPoolSize(0), false);
+ check(new OdbcConfiguration().setThreadPoolSize(-1), false);
+
+ check(new OdbcConfiguration().setThreadPoolSize(4), true);
+ }
+
+ /**
* Perform check.
*
* @param odbcCfg ODBC configuration.
- * @param success Success flag.
- * @throws Exception If failed.
+ * @param success Success flag. * @throws Exception If failed.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private void check(OdbcConfiguration odbcCfg, boolean success) throws Exception {
@@ -160,4 +171,5 @@ public class OdbcProcessorValidationSelfTest extends GridCommonAbstractTest {
}, IgniteException.class, null);
}
}
+
}