You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/18 14:54:56 UTC
ignite git commit: IGNITE-2643: Fixed.
Repository: ignite
Updated Branches:
refs/heads/ignite-1786 fe63a9086 -> 6a7cb0a9c
IGNITE-2643: Fixed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a7cb0a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a7cb0a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a7cb0a9
Branch: refs/heads/ignite-1786
Commit: 6a7cb0a9c242708c0e267465e812a0ca90bd8459
Parents: fe63a90
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Feb 18 16:54:45 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Feb 18 16:54:45 2016 +0300
----------------------------------------------------------------------
.../ignite/configuration/OdbcConfiguration.java | 27 +++++++
.../processors/odbc/OdbcNioListener.java | 76 +++++---------------
.../internal/processors/odbc/OdbcProcessor.java | 2 +-
.../processors/odbc/OdbcRequestHandler.java | 69 ++++++++++++------
4 files changed, 95 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
index d6f0500..8f0a0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
@@ -38,6 +38,9 @@ public class OdbcConfiguration {
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
+ /** Default max number of open cursors per connection. */
+ public static final int DFLT_MAX_OPEN_CURSORS = 128;
+
/** TCP port. */
private int port = DFLT_TCP_PORT;
@@ -65,6 +68,9 @@ public class OdbcConfiguration {
/** Idle timeout. */
private long idleTimeout = DFLT_IDLE_TIMEOUT;
+ /** Max number of opened cursors per connection. */
+ private int maxOpenCursors = DFLT_MAX_OPEN_CURSORS;
+
/**
* Creates ODBC server configuration with all default values.
*/
@@ -84,6 +90,7 @@ public class OdbcConfiguration {
directBuf = cfg.isDirectBuffer();
host = cfg.getHost();
idleTimeout = cfg.getIdleTimeout();
+ maxOpenCursors = cfg.getMaxOpenCursors();
noDelay = cfg.isNoDelay();
port = cfg.getPort();
rcvBufSize = cfg.getReceiveBufferSize();
@@ -289,4 +296,24 @@ public class OdbcConfiguration {
public void setIdleTimeout(long idleTimeout) {
this.idleTimeout = idleTimeout;
}
+
+ /**
+ * Gets maximum number of opened cursors per connection.
+ * <p>
+ * Defaults to {@link #DFLT_MAX_OPEN_CURSORS}.
+ *
+ * @return Maximum number of opened cursors.
+ */
+ public int getMaxOpenCursors() {
+ return maxOpenCursors;
+ }
+
+ /**
+ * Sets maximum number of opened cursors per connection. See {@link #getMaxOpenCursors()}.
+ *
+ * @param maxOpenCursors Maximum number of opened cursors.
+ */
+ public void setMaxOpenCursors(int maxOpenCursors) {
+ this.maxOpenCursors = maxOpenCursors;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
index 23560b1..e25ae5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
@@ -42,14 +43,17 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
/** Initial output stream capacity. */
private static final int INIT_CAP = 1024;
+ /** Handler metadata key. */
+ private static final int HANDLER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
/** Request ID generator. */
private static final AtomicLong REQ_ID_GEN = new AtomicLong();
/** Busy lock. */
private final GridSpinBusyLock busyLock;
- /** Request handler. */
- private final OdbcRequestHandler handler;
+ /** Kernal context. */
+ private final GridKernalContext ctx;
/** Marshaller. */
private final GridBinaryMarshaller marsh;
@@ -60,15 +64,14 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
/**
* @param ctx Context.
* @param busyLock Shutdown busy lock.
- * @param handler Request handler.
*/
- public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, OdbcRequestHandler handler) {
+ public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock) {
+ this.ctx = ctx;
this.busyLock = busyLock;
- this.handler = handler;
CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
- marsh = cacheObjProc.marshaller();
+ this.marsh = cacheObjProc.marshaller();
this.log = ctx.log(getClass());
}
@@ -77,6 +80,8 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
@Override public void onConnected(GridNioSession ses) {
if (log.isDebugEnabled())
log.debug("ODBC client connected: " + ses.remoteAddress());
+
+ ses.addMeta(HANDLER_META_KEY, new OdbcRequestHandler(ctx, busyLock));
}
/** {@inheritDoc} */
@@ -102,10 +107,15 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
if (log.isDebugEnabled()) {
startTime = System.nanoTime();
- log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() + ", req=" + req + ']');
+ log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() +
+ ", req=" + req + ']');
}
- OdbcResponse resp = handle(req);
+ OdbcRequestHandler handler = ses.meta(HANDLER_META_KEY);
+
+ assert handler != null;
+
+ OdbcResponse resp = handler.handle(req);
if (log.isDebugEnabled()) {
long dur = (System.nanoTime() - startTime) / 1000;
@@ -160,16 +170,10 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
switch (cmd) {
case OdbcRequest.EXECUTE_SQL_QUERY: {
-
String cache = reader.readString();
String sql = reader.readString();
int argsNum = reader.readInt();
- if (log.isDebugEnabled()) {
- log.debug("Message: [cmd=EXECUTE_SQL_QUERY, cache=" + cache +
- ", query=" + sql + ", argsNum=" + argsNum + ']');
- }
-
Object[] params = new Object[argsNum];
for (int i = 0; i < argsNum; ++i)
@@ -181,26 +185,17 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
}
case OdbcRequest.FETCH_SQL_QUERY: {
-
long queryId = reader.readLong();
int pageSize = reader.readInt();
- if (log.isDebugEnabled())
- log.debug("Message: [cmd=FETCH_SQL_QUERY, queryId=" + queryId + ", pageSize=" + pageSize + ']');
-
res = new OdbcQueryFetchRequest(queryId, pageSize);
break;
}
case OdbcRequest.CLOSE_SQL_QUERY: {
-
long queryId = reader.readLong();
- if (log.isDebugEnabled()) {
- log.debug("Message: [cmd=CLOSE_SQL_QUERY, queryId=" + queryId + ']');
- }
-
res = new OdbcQueryCloseRequest(queryId);
break;
@@ -212,36 +207,24 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
String table = reader.readString();
String column = reader.readString();
- if (log.isDebugEnabled()) {
- log.debug("Message: [cmd=GET_COLUMNS_META, cache=" + cache +
- ", table=" + table + ", column: " + column + ']');
- }
-
res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
break;
}
case OdbcRequest.GET_TABLES_META: {
-
String catalog = reader.readString();
String schema = reader.readString();
String table = reader.readString();
String tableType = reader.readString();
- if (log.isDebugEnabled()) {
- log.debug("Message: [cmd=GET_COLUMNS_META, catalog=" + catalog +
- ", schema=" + schema + ", table=" + table + ", tableType=" + tableType + ']');
- }
-
res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
break;
}
default:
- throw new IOException("Failed to parse incoming packet (unknown command type) " +
- "[cmd=[" + Byte.toString(cmd) + ']');
+ throw new IOException("Unknown ODBC command: " + cmd);
}
return res;
@@ -352,25 +335,4 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
return writer.array();
}
-
- /**
- * Handle request.
- *
- * @param req Request.
- * @return Response.
- */
- private OdbcResponse handle(OdbcRequest req) {
- assert req != null;
-
- if (!busyLock.enterBusy())
- return new OdbcResponse(OdbcResponse.STATUS_FAILED,
- "Failed to handle ODBC request because node is stopping: " + req);
-
- try {
- return handler.handle(req);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index 9d22e4a..87be686 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -73,7 +73,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
srv = GridNioServer.<byte[]>builder()
.address(host)
.port(port)
- .listener(new OdbcNioListener(ctx, busyLock, new OdbcRequestHandler(ctx)))
+ .listener(new OdbcNioListener(ctx, busyLock))
.logger(log)
.selectorCount(odbcCfg.getSelectorCount())
.gridName(ctx.gridName())
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
index 332a5cc..1af14b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.odbc;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.OdbcConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -43,16 +45,21 @@ public class OdbcRequestHandler {
/** Kernel context. */
private final GridKernalContext ctx;
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock;
+
/** Current queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>();
/**
* Constructor.
*
* @param ctx Context.
+ * @param busyLock Shutdown busy lock.
*/
- public OdbcRequestHandler(final GridKernalContext ctx) {
+ public OdbcRequestHandler(final GridKernalContext ctx, final GridSpinBusyLock busyLock) {
this.ctx = ctx;
+ this.busyLock = busyLock;
}
/**
@@ -64,24 +71,33 @@ public class OdbcRequestHandler {
public OdbcResponse handle(OdbcRequest req) {
assert req != null;
- switch (req.command()) {
- case EXECUTE_SQL_QUERY:
- return executeQuery((OdbcQueryExecuteRequest)req);
+ if (!busyLock.enterBusy())
+ return new OdbcResponse(OdbcResponse.STATUS_FAILED,
+ "Failed to handle ODBC request because node is stopping: " + req);
- case FETCH_SQL_QUERY:
- return fetchQuery((OdbcQueryFetchRequest)req);
+ try {
+ switch (req.command()) {
+ case EXECUTE_SQL_QUERY:
+ return executeQuery((OdbcQueryExecuteRequest)req);
- case CLOSE_SQL_QUERY:
- return closeQuery((OdbcQueryCloseRequest)req);
+ case FETCH_SQL_QUERY:
+ return fetchQuery((OdbcQueryFetchRequest)req);
- case GET_COLUMNS_META:
- return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req);
+ case CLOSE_SQL_QUERY:
+ return closeQuery((OdbcQueryCloseRequest)req);
- case GET_TABLES_META:
- return getTablesMeta((OdbcQueryGetTablesMetaRequest) req);
- }
+ case GET_COLUMNS_META:
+ return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req);
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req);
+ case GET_TABLES_META:
+ return getTablesMeta((OdbcQueryGetTablesMetaRequest) req);
+ }
+
+ return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -91,6 +107,17 @@ public class OdbcRequestHandler {
* @return Response.
*/
private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) {
+ OdbcConfiguration cfg = ctx.config().getOdbcConfiguration();
+
+ assert cfg != null;
+
+ int cursorCnt = qryCursors.size();
+
+ if (cursorCnt >= cfg.getMaxOpenCursors())
+ return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
+ "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
+ "[maximum=" + cfg.getMaxOpenCursors() + ", current=" + cursorCnt + ']');
+
long qryId = QRY_ID_GEN.getAndIncrement();
try {
@@ -108,7 +135,7 @@ public class OdbcRequestHandler {
Iterator iter = qryCur.iterator();
- qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, iter));
+ qryCursors.put(qryId, new IgniteBiTuple<>(qryCur, iter));
List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
@@ -117,7 +144,7 @@ public class OdbcRequestHandler {
return new OdbcResponse(res);
}
catch (Exception e) {
- qryCurs.remove(qryId);
+ qryCursors.remove(qryId);
return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
}
@@ -131,21 +158,21 @@ public class OdbcRequestHandler {
*/
private OdbcResponse closeQuery(OdbcQueryCloseRequest req) {
try {
- QueryCursor cur = qryCurs.get(req.queryId()).get1();
+ QueryCursor cur = qryCursors.get(req.queryId()).get1();
if (cur == null)
return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());
cur.close();
- qryCurs.remove(req.queryId());
+ qryCursors.remove(req.queryId());
OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId());
return new OdbcResponse(res);
}
catch (Exception e) {
- qryCurs.remove(req.queryId());
+ qryCursors.remove(req.queryId());
return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
}
@@ -159,7 +186,7 @@ public class OdbcRequestHandler {
*/
private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) {
try {
- Iterator cur = qryCurs.get(req.queryId()).get2();
+ Iterator cur = qryCursors.get(req.queryId()).get2();
if (cur == null)
return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());