You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/18 14:54:56 UTC

ignite git commit: IGNITE-2643: Fixed.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1786 fe63a9086 -> 6a7cb0a9c


IGNITE-2643: Fixed.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a7cb0a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a7cb0a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a7cb0a9

Branch: refs/heads/ignite-1786
Commit: 6a7cb0a9c242708c0e267465e812a0ca90bd8459
Parents: fe63a90
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Feb 18 16:54:45 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Feb 18 16:54:45 2016 +0300

----------------------------------------------------------------------
 .../ignite/configuration/OdbcConfiguration.java | 27 +++++++
 .../processors/odbc/OdbcNioListener.java        | 76 +++++---------------
 .../internal/processors/odbc/OdbcProcessor.java |  2 +-
 .../processors/odbc/OdbcRequestHandler.java     | 69 ++++++++++++------
 4 files changed, 95 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
index d6f0500..8f0a0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java
@@ -38,6 +38,9 @@ public class OdbcConfiguration {
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
+    /** Default max number of open cursors per connection. */
+    public static final int DFLT_MAX_OPEN_CURSORS = 128;
+
     /** TCP port. */
     private int port = DFLT_TCP_PORT;
 
@@ -65,6 +68,9 @@ public class OdbcConfiguration {
     /** Idle timeout. */
     private long idleTimeout = DFLT_IDLE_TIMEOUT;
 
+    /** Max number of opened cursors per connection. */
+    private int maxOpenCursors = DFLT_MAX_OPEN_CURSORS;
+
     /**
      * Creates ODBC server configuration with all default values.
      */
@@ -84,6 +90,7 @@ public class OdbcConfiguration {
         directBuf = cfg.isDirectBuffer();
         host = cfg.getHost();
         idleTimeout = cfg.getIdleTimeout();
+        maxOpenCursors = cfg.getMaxOpenCursors();
         noDelay = cfg.isNoDelay();
         port = cfg.getPort();
         rcvBufSize = cfg.getReceiveBufferSize();
@@ -289,4 +296,24 @@ public class OdbcConfiguration {
     public void setIdleTimeout(long idleTimeout) {
         this.idleTimeout = idleTimeout;
     }
+
+    /**
+     * Gets maximum number of opened cursors per connection.
+     * <p>
+     * Defaults to {@link #DFLT_MAX_OPEN_CURSORS}.
+     *
+     * @return Maximum number of opened cursors.
+     */
+    public int getMaxOpenCursors() {
+        return maxOpenCursors;
+    }
+
+    /**
+     * Sets maximum number of opened cursors per connection. See {@link #getMaxOpenCursors()}.
+     *
+     * @param maxOpenCursors Maximum number of opened cursors.
+     */
+    public void setMaxOpenCursors(int maxOpenCursors) {
+        this.maxOpenCursors = maxOpenCursors;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
index 23560b1..e25ae5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.IOException;
@@ -42,14 +43,17 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
     /** Initial output stream capacity. */
     private static final int INIT_CAP = 1024;
 
+    /** Handler metadata key. */
+    private static final int HANDLER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
     /** Request ID generator. */
     private static final AtomicLong REQ_ID_GEN = new AtomicLong();
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock;
 
-    /** Request handler. */
-    private final OdbcRequestHandler handler;
+    /** Kernal context. */
+    private final GridKernalContext ctx;
 
     /** Marshaller. */
     private final GridBinaryMarshaller marsh;
@@ -60,15 +64,14 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
     /**
      * @param ctx Context.
      * @param busyLock Shutdown busy lock.
-     * @param handler Request handler.
      */
-    public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, OdbcRequestHandler handler) {
+    public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock) {
+        this.ctx = ctx;
         this.busyLock = busyLock;
-        this.handler = handler;
 
         CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
 
-        marsh = cacheObjProc.marshaller();
+        this.marsh = cacheObjProc.marshaller();
 
         this.log = ctx.log(getClass());
     }
@@ -77,6 +80,8 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
     @Override public void onConnected(GridNioSession ses) {
         if (log.isDebugEnabled())
             log.debug("ODBC client connected: " + ses.remoteAddress());
+
+        ses.addMeta(HANDLER_META_KEY, new OdbcRequestHandler(ctx, busyLock));
     }
 
     /** {@inheritDoc} */
@@ -102,10 +107,15 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
             if (log.isDebugEnabled()) {
                 startTime = System.nanoTime();
 
-                log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() + ", req=" + req + ']');
+                log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() +
+                    ", req=" + req + ']');
             }
 
-            OdbcResponse resp = handle(req);
+            OdbcRequestHandler handler = ses.meta(HANDLER_META_KEY);
+
+            assert handler != null;
+
+            OdbcResponse resp = handler.handle(req);
 
             if (log.isDebugEnabled()) {
                 long dur = (System.nanoTime() - startTime) / 1000;
@@ -160,16 +170,10 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
 
         switch (cmd) {
             case OdbcRequest.EXECUTE_SQL_QUERY: {
-
                 String cache = reader.readString();
                 String sql = reader.readString();
                 int argsNum = reader.readInt();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Message: [cmd=EXECUTE_SQL_QUERY, cache=" + cache +
-                            ", query=" + sql + ", argsNum=" + argsNum + ']');
-                }
-
                 Object[] params = new Object[argsNum];
 
                 for (int i = 0; i < argsNum; ++i)
@@ -181,26 +185,17 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
             }
 
             case OdbcRequest.FETCH_SQL_QUERY: {
-
                 long queryId = reader.readLong();
                 int pageSize = reader.readInt();
 
-                if (log.isDebugEnabled())
-                    log.debug("Message: [cmd=FETCH_SQL_QUERY, queryId=" + queryId + ", pageSize=" + pageSize + ']');
-
                 res = new OdbcQueryFetchRequest(queryId, pageSize);
 
                 break;
             }
 
             case OdbcRequest.CLOSE_SQL_QUERY: {
-
                 long queryId = reader.readLong();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Message: [cmd=CLOSE_SQL_QUERY, queryId=" + queryId + ']');
-                }
-
                 res = new OdbcQueryCloseRequest(queryId);
 
                 break;
@@ -212,36 +207,24 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
                 String table = reader.readString();
                 String column = reader.readString();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Message: [cmd=GET_COLUMNS_META, cache=" + cache +
-                            ", table=" + table + ", column: " + column + ']');
-                }
-
                 res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
 
                 break;
             }
 
             case OdbcRequest.GET_TABLES_META: {
-
                 String catalog = reader.readString();
                 String schema = reader.readString();
                 String table = reader.readString();
                 String tableType = reader.readString();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Message: [cmd=GET_COLUMNS_META, catalog=" + catalog +
-                            ", schema=" + schema + ", table=" + table + ", tableType=" + tableType + ']');
-                }
-
                 res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
 
                 break;
             }
 
             default:
-                throw new IOException("Failed to parse incoming packet (unknown command type) " +
-                        "[cmd=[" + Byte.toString(cmd) + ']');
+                throw new IOException("Unknown ODBC command: " + cmd);
         }
 
         return res;
@@ -352,25 +335,4 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
 
         return writer.array();
     }
-
-    /**
-     * Handle request.
-     *
-     * @param req Request.
-     * @return Response.
-     */
-    private OdbcResponse handle(OdbcRequest req) {
-        assert req != null;
-
-        if (!busyLock.enterBusy())
-            return new OdbcResponse(OdbcResponse.STATUS_FAILED,
-                "Failed to handle ODBC request because node is stopping: " + req);
-
-        try {
-            return handler.handle(req);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index 9d22e4a..87be686 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -73,7 +73,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
                 srv = GridNioServer.<byte[]>builder()
                     .address(host)
                     .port(port)
-                    .listener(new OdbcNioListener(ctx, busyLock, new OdbcRequestHandler(ctx)))
+                    .listener(new OdbcNioListener(ctx, busyLock))
                     .logger(log)
                     .selectorCount(odbcCfg.getSelectorCount())
                     .gridName(ctx.gridName())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
index 332a5cc..1af14b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.odbc;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.OdbcConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiTuple;
 
@@ -43,16 +45,21 @@ public class OdbcRequestHandler {
     /** Kernel context. */
     private final GridKernalContext ctx;
 
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock;
+
     /** Current queries cursors. */
-    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>();
 
     /**
      * Constructor.
      *
      * @param ctx Context.
+     * @param busyLock Shutdown latch.
      */
-    public OdbcRequestHandler(final GridKernalContext ctx) {
+    public OdbcRequestHandler(final GridKernalContext ctx, final GridSpinBusyLock busyLock) {
         this.ctx = ctx;
+        this.busyLock = busyLock;
     }
 
     /**
@@ -64,24 +71,33 @@ public class OdbcRequestHandler {
     public OdbcResponse handle(OdbcRequest req) {
         assert req != null;
 
-        switch (req.command()) {
-            case EXECUTE_SQL_QUERY:
-                return executeQuery((OdbcQueryExecuteRequest)req);
+        if (!busyLock.enterBusy())
+            return new OdbcResponse(OdbcResponse.STATUS_FAILED,
+                    "Failed to handle ODBC request because node is stopping: " + req);
 
-            case FETCH_SQL_QUERY:
-                return fetchQuery((OdbcQueryFetchRequest)req);
+        try {
+            switch (req.command()) {
+                case EXECUTE_SQL_QUERY:
+                    return executeQuery((OdbcQueryExecuteRequest)req);
 
-            case CLOSE_SQL_QUERY:
-                return closeQuery((OdbcQueryCloseRequest)req);
+                case FETCH_SQL_QUERY:
+                    return fetchQuery((OdbcQueryFetchRequest)req);
 
-            case GET_COLUMNS_META:
-                return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req);
+                case CLOSE_SQL_QUERY:
+                    return closeQuery((OdbcQueryCloseRequest)req);
 
-            case GET_TABLES_META:
-                return getTablesMeta((OdbcQueryGetTablesMetaRequest) req);
-        }
+                case GET_COLUMNS_META:
+                    return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req);
 
-        return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req);
+                case GET_TABLES_META:
+                    return getTablesMeta((OdbcQueryGetTablesMetaRequest) req);
+            }
+
+            return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -91,6 +107,17 @@ public class OdbcRequestHandler {
      * @return Response.
      */
     private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) {
+        OdbcConfiguration cfg = ctx.config().getOdbcConfiguration();
+
+        assert cfg != null;
+
+        int cursorCnt = qryCursors.size();
+
+        if (cursorCnt >= cfg.getMaxOpenCursors())
+            return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
+                "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
+                "[maximum=" + cfg.getMaxOpenCursors() + ", current=" + cursorCnt + ']');
+
         long qryId = QRY_ID_GEN.getAndIncrement();
 
         try {
@@ -108,7 +135,7 @@ public class OdbcRequestHandler {
 
             Iterator iter = qryCur.iterator();
 
-            qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, iter));
+            qryCursors.put(qryId, new IgniteBiTuple<>(qryCur, iter));
 
             List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
 
@@ -117,7 +144,7 @@ public class OdbcRequestHandler {
             return new OdbcResponse(res);
         }
         catch (Exception e) {
-            qryCurs.remove(qryId);
+            qryCursors.remove(qryId);
 
             return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
         }
@@ -131,21 +158,21 @@ public class OdbcRequestHandler {
      */
     private OdbcResponse closeQuery(OdbcQueryCloseRequest req) {
         try {
-            QueryCursor cur = qryCurs.get(req.queryId()).get1();
+            QueryCursor cur = qryCursors.get(req.queryId()).get1();
 
             if (cur == null)
                 return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());
 
             cur.close();
 
-            qryCurs.remove(req.queryId());
+            qryCursors.remove(req.queryId());
 
             OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId());
 
             return new OdbcResponse(res);
         }
         catch (Exception e) {
-            qryCurs.remove(req.queryId());
+            qryCursors.remove(req.queryId());
 
             return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
         }
@@ -159,7 +186,7 @@ public class OdbcRequestHandler {
      */
     private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) {
         try {
-            Iterator cur = qryCurs.get(req.queryId()).get2();
+            Iterator cur = qryCursors.get(req.queryId()).get2();
 
             if (cur == null)
                 return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());