You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/11 14:39:32 UTC
ignite git commit: Refactored ODBC processor.
Repository: ignite
Updated Branches:
refs/heads/ignite-1786 70d5323b2 -> 9453392cf
Refactored ODBC processor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9453392c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9453392c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9453392c
Branch: refs/heads/ignite-1786
Commit: 9453392cf2b22cacadc260a4da372eb4222cb8e1
Parents: 70d5323
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Feb 11 16:39:29 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Feb 11 16:39:29 2016 +0300
----------------------------------------------------------------------
.../processors/odbc/OdbcCommandHandler.java | 392 ------------------
.../processors/odbc/OdbcNioListener.java | 400 +++++++++++++++++++
.../internal/processors/odbc/OdbcNioParser.java | 381 ++++++++++++++++++
.../internal/processors/odbc/OdbcParser.java | 378 ------------------
.../internal/processors/odbc/OdbcProcessor.java | 172 ++------
5 files changed, 824 insertions(+), 899 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9453392c/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcCommandHandler.java
deleted file mode 100644
index 91f84b9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcCommandHandler.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.ignite.internal.processors.odbc.OdbcRequest.*;
-
-/**
- * SQL query handler.
- */
-public class OdbcCommandHandler extends GridNioServerListenerAdapter<OdbcRequest> {
- /** Query ID sequence. */
- private static final AtomicLong qryIdGen = new AtomicLong();
-
- /** Kernel context. */
- private final GridKernalContext ctx;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock;
-
- /** Logger. */
- private final IgniteLogger log;
-
- /** Current queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
-
- /**
- * Constructor.
- *
- * @param ctx Context.
- */
- public OdbcCommandHandler(final GridKernalContext ctx, final GridSpinBusyLock busyLock) {
- this.ctx = ctx;
- this.busyLock = busyLock;
-
- this.log = ctx.log(getClass());
- }
-
- /** {@inheritDoc} */
- @Override public void onConnected(GridNioSession ses) {
- if (log.isDebugEnabled())
- log.debug("Driver connected");
- }
-
- /** {@inheritDoc} */
- @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- if (log.isDebugEnabled())
- log.debug("Driver disconnected");
-
- if (e != null) {
- if (e instanceof RuntimeException)
- U.error(log, "Failed to process request from remote client: " + ses, e);
- else
- U.warn(log, "Closed client session due to exception [ses=" + ses + ", msg=" + e.getMessage() + ']');
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onMessage(GridNioSession ses, OdbcRequest msg) {
- assert msg != null;
-
- if (log.isDebugEnabled())
- log.debug("Received request from client: [msg=" + msg + ']');
-
- OdbcResponse res = handle(msg);
-
- if (log.isDebugEnabled())
- log.debug("Handling result: [res=" + res.status() + ']');
-
- ses.send(res);
- }
-
- /**
- * Handle request.
- *
- * @param req Request.
- * @return Response.
- */
- public OdbcResponse handle(OdbcRequest req) {
- assert req != null;
-
- if (!busyLock.enterBusy()) {
- String errMsg = "Failed to handle request [req=" + req +
- ", err=Received request while stopping grid]";
-
- U.error(log, errMsg);
-
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, errMsg);
- }
-
- try {
- switch (req.command()) {
- case EXECUTE_SQL_QUERY:
- return executeQuery((OdbcQueryExecuteRequest)req);
-
- case FETCH_SQL_QUERY:
- return fetchQuery((OdbcQueryFetchRequest)req);
-
- case CLOSE_SQL_QUERY:
- return closeQuery((OdbcQueryCloseRequest)req);
-
- case GET_COLUMNS_META:
- return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req);
-
- case GET_TABLES_META:
- return getTablesMeta((OdbcQueryGetTablesMetaRequest) req);
- }
-
- return new OdbcResponse(OdbcResponse.STATUS_FAILED,
- "Failed to find registered handler for command: " + req.command());
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * {@link OdbcQueryExecuteRequest} command handler.
- *
- * @param req Execute query request.
- * @return Response.
- */
- private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) {
- long qryId = qryIdGen.getAndIncrement();
-
- try {
- SqlFieldsQuery qry = new SqlFieldsQuery(req.sqlQuery());
-
- qry.setArgs(req.arguments());
-
- IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName());
-
- if (cache == null)
- return new OdbcResponse(OdbcResponse.STATUS_FAILED,
- "Failed to find cache with name: " + req.cacheName());
-
- QueryCursor qryCur = cache.query(qry);
-
- Iterator cur = qryCur.iterator();
-
- qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur));
-
- List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
-
- if (log.isDebugEnabled())
- log.debug("Field meta: " + fieldsMeta);
-
- OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta));
-
- return new OdbcResponse(res);
- }
- catch (Exception e) {
- qryCurs.remove(qryId);
-
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
- }
- }
-
- /**
- * {@link OdbcQueryCloseRequest} command handler.
- *
- * @param req Execute query request.
- * @return Response.
- */
- private OdbcResponse closeQuery(OdbcQueryCloseRequest req) {
- try {
- QueryCursor cur = qryCurs.get(req.queryId()).get1();
-
- if (cur == null)
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());
-
- cur.close();
-
- qryCurs.remove(req.queryId());
-
- OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId());
-
- return new OdbcResponse(res);
- }
- catch (Exception e) {
- qryCurs.remove(req.queryId());
-
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
- }
- }
-
- /**
- * {@link OdbcQueryFetchRequest} command handler.
- *
- * @param req Execute query request.
- * @return Response.
- */
- private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) {
- try {
- Iterator cur = qryCurs.get(req.queryId()).get2();
-
- if (cur == null)
- return new OdbcResponse(OdbcResponse.STATUS_FAILED,
- "Failed to find query with ID: " + req.queryId());
-
- List<Object> items = new ArrayList<>();
-
- for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
- items.add(cur.next());
-
- OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !cur.hasNext());
-
- return new OdbcResponse(res);
- }
- catch (Exception e) {
- qryCurs.remove(req.queryId());
-
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
- }
- }
-
- /**
- * {@link OdbcQueryGetColumnsMetaRequest} command handler.
- *
- * @param req Get columns metadata request.
- * @return Response.
- */
- private OdbcResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) {
- try {
- List<OdbcColumnMeta> meta = new ArrayList<>();
-
- String cacheName;
- String tableName;
-
- if (req.tableName().contains(".")) {
- // Parsing two-part table name.
- String[] parts = req.tableName().split("\\.");
-
- cacheName = removeQuotationMarksIfNeeded(parts[0]);
-
- tableName = parts[1];
- }
- else {
- cacheName = removeQuotationMarksIfNeeded(req.cacheName());
-
- tableName = req.tableName();
- }
-
- Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
-
- for (GridQueryTypeDescriptor table : tablesMeta) {
- if (!matches(table.name(), tableName))
- continue;
-
- for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
- if (!matches(field.getKey(), req.columnName()))
- continue;
-
- OdbcColumnMeta columnMeta = new OdbcColumnMeta(req.cacheName(),
- table.name(), field.getKey(), field.getValue());
-
- if (!meta.contains(columnMeta))
- meta.add(columnMeta);
- }
- }
- OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta);
-
- return new OdbcResponse(res);
- }
- catch (Exception e) {
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
- }
- }
-
- /**
- * {@link OdbcQueryGetTablesMetaRequest} command handler.
- *
- * @param req Get tables metadata request.
- * @return Response.
- */
- private OdbcResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) {
- try {
- List<OdbcTableMeta> meta = new ArrayList<>();
-
- String realSchema = removeQuotationMarksIfNeeded(req.schema());
-
- for (String cacheName : ctx.cache().cacheNames())
- {
- if (!matches(cacheName, realSchema))
- continue;
-
- Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
-
- for (GridQueryTypeDescriptor table : tablesMeta) {
- if (!matches(table.name(), req.table()))
- continue;
-
- if (!matches("TABLE", req.tableType()))
- continue;
-
- OdbcTableMeta tableMeta = new OdbcTableMeta(req.catalog(), cacheName,
- table.name(), "TABLE");
-
- if (!meta.contains(tableMeta))
- meta.add(tableMeta);
- }
- }
-
- OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta);
-
- return new OdbcResponse(res);
- }
- catch (Exception e) {
- return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
- }
- }
-
- /**
- * Convert metadata in collection from {@link GridQueryFieldMetadata} to
- * {@link OdbcColumnMeta}.
- *
- * @param meta Internal query field metadata.
- * @return Odbc query field metadata.
- */
- private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) {
- List<OdbcColumnMeta> res = new ArrayList<>();
-
- if (meta != null) {
- for (Object info : meta) {
- assert info instanceof GridQueryFieldMetadata;
-
- res.add(new OdbcColumnMeta((GridQueryFieldMetadata)info));
- }
- }
-
- return res;
- }
-
- /**
- * Checks whether string matches SQL pattern.
- *
- * @param str String.
- * @param ptrn Pattern.
- * @return Whether string matches pattern.
- */
- private static boolean matches(String str, String ptrn) {
- return str != null && (F.isEmpty(ptrn) ||
- str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", ".")));
- }
-
- /**
- * Remove quotation marks at the beginning and end of the string if present.
- *
- * @param str Input string.
- * @return String without leading and trailing quotation marks.
- */
- private static String removeQuotationMarksIfNeeded(String str) {
- if (str.startsWith("\"") && str.endsWith("\""))
- return str.substring(1, str.length() - 1);
-
- return str;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9453392c/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
new file mode 100644
index 0000000..0a5ae0e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.ignite.internal.processors.odbc.OdbcRequest.CLOSE_SQL_QUERY;
+import static org.apache.ignite.internal.processors.odbc.OdbcRequest.EXECUTE_SQL_QUERY;
+import static org.apache.ignite.internal.processors.odbc.OdbcRequest.FETCH_SQL_QUERY;
+import static org.apache.ignite.internal.processors.odbc.OdbcRequest.GET_COLUMNS_META;
+import static org.apache.ignite.internal.processors.odbc.OdbcRequest.GET_TABLES_META;
+
+/**
+ * SQL query handler.
+ */
+public class OdbcNioListener extends GridNioServerListenerAdapter<OdbcRequest> {
+ /** Query ID sequence. Static, so generated IDs are unique across all listener instances in this JVM. */
+ private static final AtomicLong qryIdGen = new AtomicLong();
+
+ /** Kernel context, used to look up caches and query type descriptors. */
+ private final GridKernalContext ctx;
+
+ /** Busy lock entered for the duration of every request handled by {@code handle()}. */
+ private final GridSpinBusyLock busyLock;
+
+ /** Logger obtained from the kernel context for this class. */
+ private final IgniteLogger log;
+
+ /**
+ * Currently open queries keyed by query ID. Each value pairs the query cursor with the
+ * iterator obtained from it, so that fetch requests continue from where the previous one stopped.
+ */
+ private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
+
+ /**
+ * Creates a listener bound to the given kernal context.
+ *
+ * @param ctx Kernal context; also used to obtain the logger for this class.
+ * @param busyLock Busy lock that is entered while a request is being handled.
+ */
+ public OdbcNioListener(final GridKernalContext ctx, final GridSpinBusyLock busyLock) {
+ this.ctx = ctx;
+ this.busyLock = busyLock;
+
+ this.log = ctx.log(getClass());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onConnected(GridNioSession ses) {
+     // Nothing to set up per session; the connection is only traced.
+     if (!log.isDebugEnabled())
+         return;
+
+     log.debug("Driver connected");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ if (log.isDebugEnabled())
+ log.debug("Driver disconnected");
+
+ if (e != null) {
+ if (e instanceof RuntimeException)
+ U.error(log, "Failed to process request from remote client: " + ses, e);
+ else
+ U.warn(log, "Closed client session due to exception [ses=" + ses + ", msg=" + e.getMessage() + ']');
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(GridNioSession ses, OdbcRequest msg) {
+     assert msg != null;
+
+     if (log.isDebugEnabled()) {
+         log.debug("Received request from client: [msg=" + msg + ']');
+     }
+
+     // Process the request, then send the response back over the same session.
+     OdbcResponse rsp = handle(msg);
+
+     if (log.isDebugEnabled()) {
+         log.debug("Handling result: [res=" + rsp.status() + ']');
+     }
+
+     ses.send(rsp);
+ }
+
+ /**
+  * Dispatches a request to the handler matching its command code.
+  * <p>
+  * The busy lock is held while the request is processed. If the lock cannot be entered,
+  * a failure response is returned instead.
+  *
+  * @param req Request.
+  * @return Response.
+  */
+ public OdbcResponse handle(OdbcRequest req) {
+     assert req != null;
+
+     boolean entered = busyLock.enterBusy();
+
+     if (!entered) {
+         String errMsg = "Failed to handle request [req=" + req +
+             ", err=Received request while stopping grid]";
+
+         U.error(log, errMsg);
+
+         return new OdbcResponse(OdbcResponse.STATUS_FAILED, errMsg);
+     }
+
+     try {
+         switch (req.command()) {
+             case EXECUTE_SQL_QUERY:
+                 return executeQuery((OdbcQueryExecuteRequest)req);
+
+             case FETCH_SQL_QUERY:
+                 return fetchQuery((OdbcQueryFetchRequest)req);
+
+             case CLOSE_SQL_QUERY:
+                 return closeQuery((OdbcQueryCloseRequest)req);
+
+             case GET_COLUMNS_META:
+                 return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req);
+
+             case GET_TABLES_META:
+                 return getTablesMeta((OdbcQueryGetTablesMetaRequest)req);
+
+             default:
+                 // No handler is registered for this command code.
+                 return new OdbcResponse(OdbcResponse.STATUS_FAILED,
+                     "Failed to find registered handler for command: " + req.command());
+         }
+     }
+     finally {
+         busyLock.leaveBusy();
+     }
+ }
+
+ /**
+  * {@link OdbcQueryExecuteRequest} command handler.
+  * <p>
+  * Runs the SQL fields query against the requested cache and registers the resulting cursor
+  * under a newly generated query ID, so that later fetch and close requests can refer to it.
+  *
+  * @param req Execute query request.
+  * @return Response carrying the query ID and column metadata, or a failure response.
+  */
+ private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) {
+     long qryId = qryIdGen.getAndIncrement();
+
+     // Kept outside the try block so the cursor can be released if a later step fails.
+     QueryCursor qryCur = null;
+
+     try {
+         SqlFieldsQuery qry = new SqlFieldsQuery(req.sqlQuery());
+
+         qry.setArgs(req.arguments());
+
+         IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName());
+
+         if (cache == null)
+             return new OdbcResponse(OdbcResponse.STATUS_FAILED,
+                 "Failed to find cache with name: " + req.cacheName());
+
+         qryCur = cache.query(qry);
+
+         Iterator cur = qryCur.iterator();
+
+         qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur));
+
+         List<?> fieldsMeta = ((QueryCursorImpl)qryCur).fieldsMeta();
+
+         if (log.isDebugEnabled())
+             log.debug("Field meta: " + fieldsMeta);
+
+         OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta));
+
+         return new OdbcResponse(res);
+     }
+     catch (Exception e) {
+         qryCurs.remove(qryId);
+
+         // The client never learns this query ID, so nobody else can close the cursor.
+         if (qryCur != null) {
+             try {
+                 qryCur.close();
+             }
+             catch (Exception ignored) {
+                 // Best effort: the original failure is the one reported to the client.
+             }
+         }
+
+         return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
+     }
+ }
+
+ /**
+  * {@link OdbcQueryCloseRequest} command handler.
+  * <p>
+  * Unregisters the query and closes its cursor. The entry is removed from the map before
+  * the cursor is closed, so a failing {@code close()} cannot leave a stale entry behind.
+  *
+  * @param req Close query request.
+  * @return Response carrying the closed query ID, or a failure response if the ID is unknown
+  *      or closing the cursor failed.
+  */
+ private OdbcResponse closeQuery(OdbcQueryCloseRequest req) {
+     try {
+         // Look up the map entry first: dereferencing the result of get() directly
+         // throws a NullPointerException for an unknown query ID.
+         IgniteBiTuple<QueryCursor, Iterator> tuple = qryCurs.remove(req.queryId());
+
+         if (tuple == null)
+             return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());
+
+         tuple.get1().close();
+
+         OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId());
+
+         return new OdbcResponse(res);
+     }
+     catch (Exception e) {
+         // The entry (if any) has already been removed above.
+         return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
+     }
+ }
+
+ /**
+  * {@link OdbcQueryFetchRequest} command handler.
+  * <p>
+  * Reads up to {@code req.pageSize()} rows from the iterator registered for the query and
+  * reports whether the iterator is exhausted afterwards.
+  *
+  * @param req Fetch query request.
+  * @return Response carrying the fetched page, or a failure response if the ID is unknown
+  *      or iteration failed.
+  */
+ private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) {
+     try {
+         // Look up the map entry first: dereferencing the result of get() directly
+         // throws a NullPointerException for an unknown query ID.
+         IgniteBiTuple<QueryCursor, Iterator> tuple = qryCurs.get(req.queryId());
+
+         if (tuple == null)
+             return new OdbcResponse(OdbcResponse.STATUS_FAILED,
+                 "Failed to find query with ID: " + req.queryId());
+
+         Iterator cur = tuple.get2();
+
+         List<Object> items = new ArrayList<>();
+
+         for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
+             items.add(cur.next());
+
+         OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !cur.hasNext());
+
+         return new OdbcResponse(res);
+     }
+     catch (Exception e) {
+         IgniteBiTuple<QueryCursor, Iterator> broken = qryCurs.remove(req.queryId());
+
+         // Once unregistered the cursor is unreachable for the client, so release it here.
+         if (broken != null) {
+             try {
+                 broken.get1().close();
+             }
+             catch (Exception ignored) {
+                 // Best effort: the original failure is the one reported to the client.
+             }
+         }
+
+         return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
+     }
+ }
+
+ /**
+  * {@link OdbcQueryGetColumnsMetaRequest} command handler.
+  * <p>
+  * The table name may be given in two-part form ({@code cache.table}); in that case the cache
+  * name is taken from the first part instead of the request's cache name. Table and column
+  * names are treated as SQL patterns (see {@code matches}).
+  *
+  * @param req Get columns metadata request.
+  * @return Response carrying the matching column metadata, or a failure response.
+  */
+ private OdbcResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) {
+     try {
+         List<OdbcColumnMeta> meta = new ArrayList<>();
+
+         String cacheName;
+         String tableName = req.tableName();
+
+         // A null table name is a valid "match everything" pattern, so it must not be dereferenced.
+         if (tableName != null && tableName.contains(".")) {
+             // Parsing two-part table name. split() drops trailing empty parts, so names such as
+             // "cache." or "." yield fewer than two parts; missing parts are treated as empty.
+             String[] parts = tableName.split("\\.");
+
+             cacheName = removeQuotationMarksIfNeeded(parts.length > 0 ? parts[0] : "");
+
+             tableName = parts.length > 1 ? parts[1] : "";
+         }
+         else
+             cacheName = removeQuotationMarksIfNeeded(req.cacheName());
+
+         Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
+
+         for (GridQueryTypeDescriptor table : tablesMeta) {
+             if (!matches(table.name(), tableName))
+                 continue;
+
+             for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
+                 if (!matches(field.getKey(), req.columnName()))
+                     continue;
+
+                 // NOTE(review): the reported cache name is the request's one, even when the cache
+                 // was resolved from a two-part table name - confirm this is intended.
+                 OdbcColumnMeta columnMeta = new OdbcColumnMeta(req.cacheName(),
+                     table.name(), field.getKey(), field.getValue());
+
+                 // Skip duplicates.
+                 if (!meta.contains(columnMeta))
+                     meta.add(columnMeta);
+             }
+         }
+
+         OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta);
+
+         return new OdbcResponse(res);
+     }
+     catch (Exception e) {
+         return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
+     }
+ }
+
+ /**
+  * {@link OdbcQueryGetTablesMetaRequest} command handler.
+  * <p>
+  * Walks all cache names matching the requested schema pattern and collects the query types
+  * whose name matches the table pattern. Every reported entry has the type {@code "TABLE"}.
+  *
+  * @param req Get tables metadata request.
+  * @return Response carrying the matching table metadata, or a failure response.
+  */
+ private OdbcResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) {
+     try {
+         List<OdbcTableMeta> found = new ArrayList<>();
+
+         String schemaPtrn = removeQuotationMarksIfNeeded(req.schema());
+
+         for (String cacheName : ctx.cache().cacheNames()) {
+             if (matches(cacheName, schemaPtrn)) {
+                 for (GridQueryTypeDescriptor type : ctx.query().types(cacheName)) {
+                     // Both the table name and the (only supported) table type must match.
+                     boolean accepted = matches(type.name(), req.table()) && matches("TABLE", req.tableType());
+
+                     if (!accepted)
+                         continue;
+
+                     OdbcTableMeta candidate = new OdbcTableMeta(req.catalog(), cacheName,
+                         type.name(), "TABLE");
+
+                     // Skip duplicates.
+                     if (!found.contains(candidate))
+                         found.add(candidate);
+                 }
+             }
+         }
+
+         return new OdbcResponse(new OdbcQueryGetTablesMetaResult(found));
+     }
+     catch (Exception e) {
+         return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage());
+     }
+ }
+
+ /**
+  * Converts a collection of {@link GridQueryFieldMetadata} into a collection of
+  * {@link OdbcColumnMeta}.
+  *
+  * @param meta Internal query field metadata, may be {@code null}.
+  * @return ODBC query field metadata; empty if {@code meta} is {@code null}.
+  */
+ private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) {
+     List<OdbcColumnMeta> converted = new ArrayList<>();
+
+     if (meta == null)
+         return converted;
+
+     for (Object item : meta) {
+         assert item instanceof GridQueryFieldMetadata;
+
+         GridQueryFieldMetadata fieldMeta = (GridQueryFieldMetadata)item;
+
+         converted.add(new OdbcColumnMeta(fieldMeta));
+     }
+
+     return converted;
+ }
+
+ /**
+  * Checks whether string matches SQL pattern. The comparison is case-insensitive.
+  * <p>
+  * In the pattern {@code %} stands for any sequence of characters and {@code _} for any single
+  * character. All other characters are matched literally, so regular expression metacharacters
+  * in the pattern (for example {@code .}, {@code $} or {@code (}) carry no special meaning.
+  *
+  * @param str String.
+  * @param ptrn Pattern; {@code null} or empty pattern matches any non-null string.
+  * @return Whether string matches pattern.
+  */
+ private static boolean matches(String str, String ptrn) {
+     if (str == null)
+         return false;
+
+     if (F.isEmpty(ptrn))
+         return true;
+
+     String upperPtrn = ptrn.toUpperCase();
+
+     StringBuilder regex = new StringBuilder();
+
+     // Start of the current run of literal (non-wildcard) characters.
+     int litStart = 0;
+
+     for (int i = 0; i < upperPtrn.length(); i++) {
+         char c = upperPtrn.charAt(i);
+
+         if (c == '%' || c == '_') {
+             // Quote the literal run so that it cannot be interpreted as a regular expression.
+             if (i > litStart)
+                 regex.append(java.util.regex.Pattern.quote(upperPtrn.substring(litStart, i)));
+
+             regex.append(c == '%' ? ".*" : ".");
+
+             litStart = i + 1;
+         }
+     }
+
+     if (litStart < upperPtrn.length())
+         regex.append(java.util.regex.Pattern.quote(upperPtrn.substring(litStart)));
+
+     return str.toUpperCase().matches(regex.toString());
+ }
+
+ /**
+  * Remove quotation marks at the beginning and end of the string if present.
+  * <p>
+  * The string is only unquoted when it holds both an opening and a closing quotation mark,
+  * i.e. is at least two characters long. A string consisting of a single {@code "} is
+  * returned unchanged.
+  *
+  * @param str Input string, may be {@code null}.
+  * @return String without leading and trailing quotation marks, or {@code null} if the input
+  *      is {@code null}.
+  */
+ private static String removeQuotationMarksIfNeeded(String str) {
+     // The length check prevents substring(1, 0) for the one-character string "\"".
+     if (str != null && str.length() >= 2 && str.startsWith("\"") && str.endsWith("\""))
+         return str.substring(1, str.length() - 1);
+
+     return str;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9453392c/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java
new file mode 100644
index 0000000..60c059a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
+ * ODBC protocol parser.
+ */
+public class OdbcNioParser implements GridNioParser {
+ /** Initial capacity, in bytes, of the output stream used to encode a response. */
+ private static final int INIT_CAP = 1024;
+
+ /**
+ * Length in bytes of the remaining message part. Zero means that no message is being assembled.
+ * NOTE(review): this reassembly state is kept per parser instance, not per session - confirm that
+ * one parser is never shared between concurrently active sessions.
+ */
+ private int leftToReceive = 0;
+
+ /** Already received bytes of current message; {@code null} when no partial message is pending. */
+ private ByteBuffer currentMessage = null;
+
+ /** Marshaller used to create the writer for outgoing responses. */
+ private final GridBinaryMarshaller marsh;
+
+ /** Logger. */
+ protected final IgniteLogger log;
+
+ /**
+  * Creates a parser that takes its marshaller and logger from the given kernal context.
+  *
+  * @param ctx Kernel context.
+  */
+ public OdbcNioParser(GridKernalContext ctx) {
+     // The cache object processor is expected to be the binary implementation.
+     marsh = ((CacheObjectBinaryProcessorImpl)ctx.cacheObjects()).marshaller();
+
+     log = ctx.log(getClass());
+ }
+
+ /**
+  * Process data chunk and try to construct new message using stored and
+  * freshly received data.
+  * <p>
+  * Every message starts with a four-byte length followed by that many bytes of payload. All
+  * reads go through the buffer's relative {@code get} methods, so the current position of the
+  * buffer is honoured (several messages may share one buffer) and a backing array is not required.
+  * <p>
+  * NOTE(review): a length header split across two chunks is not supported; in that case the
+  * bulk read of the header throws {@link java.nio.BufferUnderflowException}.
+  *
+  * @param buf Fresh data buffer. Its position is advanced past all consumed bytes.
+  * @return Instance of the {@link BinaryReaderExImpl} positioned to read
+  *      from the beginning of the message payload on success and null if more data is needed.
+  * @throws IOException If the message length read from the header is negative.
+  */
+ private BinaryRawReaderEx tryConstructMessage(ByteBuffer buf) throws IOException {
+     if (leftToReceive != 0) {
+         // Still receiving message.
+         int toConsume = Math.min(leftToReceive, buf.remaining());
+
+         // Temporarily narrow the buffer so that the bulk put copies exactly 'toConsume' bytes
+         // starting at the current position; the put advances the position of 'buf'.
+         int oldLimit = buf.limit();
+
+         buf.limit(buf.position() + toConsume);
+
+         currentMessage.put(buf);
+
+         buf.limit(oldLimit);
+
+         leftToReceive -= toConsume;
+
+         if (leftToReceive != 0)
+             return null;
+
+         BinaryInputStream stream = new BinaryHeapInputStream(currentMessage.array());
+
+         currentMessage = null;
+
+         return new BinaryReaderExImpl(null, stream, null);
+     }
+
+     // Receiving new message. Its length is in the first four bytes; they are decoded with the
+     // binary reader so that the byte order matches the rest of the protocol.
+     byte[] hdr = new byte[4];
+
+     buf.get(hdr);
+
+     int messageLen = new BinaryReaderExImpl(null, new BinaryHeapInputStream(hdr), null).readInt();
+
+     if (messageLen < 0)
+         throw new IOException("Failed to parse incoming packet (negative message length): " + messageLen);
+
+     int remaining = buf.remaining();
+
+     // Checking if we have not entire message in buffer.
+     if (messageLen > remaining) {
+         leftToReceive = messageLen - remaining;
+
+         currentMessage = ByteBuffer.allocate(messageLen);
+         currentMessage.put(buf);
+
+         return null;
+     }
+
+     // Whole message is available: copy the payload out so the reader does not depend on 'buf'.
+     byte[] body = new byte[messageLen];
+
+     buf.get(body);
+
+     return new BinaryReaderExImpl(null, new BinaryHeapInputStream(body), null);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public OdbcRequest decode(GridNioSession ses, ByteBuffer buf) throws IOException,
+     IgniteCheckedException {
+     BinaryRawReaderEx reader = tryConstructMessage(buf);
+
+     // A null reader means the message is not complete yet.
+     if (reader == null)
+         return null;
+
+     return readRequest(ses, reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+     assert msg != null;
+     assert msg instanceof OdbcResponse;
+
+     if (log.isDebugEnabled())
+         log.debug("Encoding query processing result");
+
+     BinaryRawWriterEx writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP));
+
+     // Placeholder for the length header; it is filled in once the payload size is known.
+     int lenPos = writer.reserveInt();
+
+     writeResponse(ses, writer, (OdbcResponse)msg);
+
+     // Total size on the wire: four-byte length header plus payload.
+     int totalLen = writer.out().position() - lenPos;
+
+     // The header carries the payload length only.
+     writer.writeInt(lenPos, totalLen - 4);
+
+     ByteBuffer res = ByteBuffer.allocate(totalLen);
+
+     res.put(writer.out().array(), lenPos, totalLen);
+     res.flip();
+
+     return res;
+ }
+
+ /**
+  * Read ODBC request from the raw data using provided {@link BinaryReaderExImpl} instance.
+  * <p>
+  * The first byte is the command code; the remaining layout depends on the command.
+  *
+  * @param ses Current session; only used in the error message.
+  * @param reader Reader positioned to read the request.
+  * @return Instance of the {@link OdbcRequest}.
+  * @throws IOException if the type of the request is unknown to the parser or the request
+  *      declares a negative number of query arguments.
+  */
+ private OdbcRequest readRequest(GridNioSession ses, BinaryRawReaderEx reader) throws IOException {
+     byte cmd = reader.readByte();
+
+     switch (cmd) {
+         case OdbcRequest.EXECUTE_SQL_QUERY: {
+             String cache = reader.readString();
+             String sql = reader.readString();
+             int argsNum = reader.readInt();
+
+             if (log.isDebugEnabled()) {
+                 log.debug("Message EXECUTE_SQL_QUERY:");
+                 log.debug("cache: " + cache);
+                 log.debug("query: " + sql);
+                 log.debug("argsNum: " + argsNum);
+             }
+
+             // The count comes from the remote side; reject it before it is used as an array size.
+             if (argsNum < 0)
+                 throw new IOException("Failed to parse incoming packet (negative arguments number) [ses=" + ses +
+                     ", argsNum=" + argsNum + ']');
+
+             Object[] params = new Object[argsNum];
+
+             for (int i = 0; i < argsNum; ++i)
+                 params[i] = reader.readObjectDetached();
+
+             return new OdbcQueryExecuteRequest(cache, sql, params);
+         }
+
+         case OdbcRequest.FETCH_SQL_QUERY: {
+             long queryId = reader.readLong();
+             int pageSize = reader.readInt();
+
+             if (log.isDebugEnabled()) {
+                 log.debug("Message FETCH_SQL_QUERY:");
+                 log.debug("queryId: " + queryId);
+                 log.debug("pageSize: " + pageSize);
+             }
+
+             return new OdbcQueryFetchRequest(queryId, pageSize);
+         }
+
+         case OdbcRequest.CLOSE_SQL_QUERY: {
+             long queryId = reader.readLong();
+
+             if (log.isDebugEnabled()) {
+                 log.debug("Message CLOSE_SQL_QUERY:");
+                 log.debug("queryId: " + queryId);
+             }
+
+             return new OdbcQueryCloseRequest(queryId);
+         }
+
+         case OdbcRequest.GET_COLUMNS_META: {
+             String cache = reader.readString();
+             String table = reader.readString();
+             String column = reader.readString();
+
+             if (log.isDebugEnabled()) {
+                 log.debug("Message GET_COLUMNS_META:");
+                 log.debug("cache: " + cache);
+                 log.debug("table: " + table);
+                 log.debug("column: " + column);
+             }
+
+             return new OdbcQueryGetColumnsMetaRequest(cache, table, column);
+         }
+
+         case OdbcRequest.GET_TABLES_META: {
+             String catalog = reader.readString();
+             String schema = reader.readString();
+             String table = reader.readString();
+             String tableType = reader.readString();
+
+             if (log.isDebugEnabled()) {
+                 log.debug("Message GET_TABLES_META:");
+                 log.debug("catalog: " + catalog);
+                 log.debug("schema: " + schema);
+                 log.debug("table: " + table);
+                 log.debug("tableType: " + tableType);
+             }
+
+             return new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
+         }
+
+         default:
+             throw new IOException("Failed to parse incoming packet (unknown command type) [ses=" + ses +
+                 ", cmd=" + cmd + ']');
+     }
+ }
+
+ /**
+  * Serializes an ODBC response using the provided {@link BinaryRawWriterEx} instance.
+  * <p>
+  * The status byte is always written first. For a non-successful status only the error message
+  * follows; otherwise the payload layout depends on the concrete type of the result object.
+  *
+  * @param ses Current session (used only for error reporting).
+  * @param writer Writer.
+  * @param rsp ODBC response that should be written.
+  * @throws IOException If the type of the response is unknown to the parser.
+  */
+ private void writeResponse(GridNioSession ses, BinaryRawWriterEx writer, OdbcResponse rsp) throws IOException {
+     // Writing status.
+     writer.writeByte((byte)rsp.status());
+
+     if (rsp.status() != OdbcResponse.STATUS_SUCCESS) {
+         writer.writeString(rsp.error());
+
+         return;
+     }
+
+     Object res0 = rsp.response();
+
+     if (res0 instanceof OdbcQueryExecuteResult) {
+         OdbcQueryExecuteResult res = (OdbcQueryExecuteResult)res0;
+
+         if (log.isDebugEnabled())
+             log.debug("Resulting query ID: " + res.getQueryId());
+
+         writer.writeLong(res.getQueryId());
+
+         Collection<OdbcColumnMeta> metas = res.getColumnsMetadata();
+
+         assert metas != null;
+
+         // Element count first, then the elements themselves.
+         writer.writeInt(metas.size());
+
+         for (OdbcColumnMeta meta : metas)
+             meta.writeBinary(writer, marsh.context());
+     }
+     else if (res0 instanceof OdbcQueryFetchResult) {
+         OdbcQueryFetchResult res = (OdbcQueryFetchResult)res0;
+
+         if (log.isDebugEnabled())
+             log.debug("Resulting query ID: " + res.queryId());
+
+         writer.writeLong(res.queryId());
+
+         Collection<?> items0 = res.items();
+
+         assert items0 != null;
+
+         writer.writeBoolean(res.last());
+
+         writer.writeInt(items0.size());
+
+         for (Object row0 : items0) {
+             if (row0 == null) {
+                 // The row count has already been written, so a missing row must still occupy a slot:
+                 // encode it as a row with zero columns to keep the stream consistent with that count.
+                 writer.writeInt(0);
+
+                 continue;
+             }
+
+             Collection<?> row = (Collection<?>)row0;
+
+             writer.writeInt(row.size());
+
+             for (Object obj : row)
+                 writer.writeObjectDetached(obj);
+         }
+     }
+     else if (res0 instanceof OdbcQueryCloseResult) {
+         OdbcQueryCloseResult res = (OdbcQueryCloseResult)res0;
+
+         if (log.isDebugEnabled())
+             log.debug("Resulting query ID: " + res.getQueryId());
+
+         writer.writeLong(res.getQueryId());
+     }
+     else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
+         OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult)res0;
+
+         Collection<OdbcColumnMeta> columnsMeta = res.meta();
+
+         assert columnsMeta != null;
+
+         writer.writeInt(columnsMeta.size());
+
+         for (OdbcColumnMeta columnMeta : columnsMeta)
+             columnMeta.writeBinary(writer, marsh.context());
+     }
+     else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
+         OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult)res0;
+
+         Collection<OdbcTableMeta> tablesMeta = res.meta();
+
+         assert tablesMeta != null;
+
+         writer.writeInt(tablesMeta.size());
+
+         for (OdbcTableMeta tableMeta : tablesMeta)
+             tableMeta.writeBinary(writer);
+     }
+     else
+         throw new IOException("Failed to serialize response packet (unknown response type) [ses=" + ses + "]");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9453392c/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcParser.java
deleted file mode 100644
index 4078063..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcParser.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.*;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.util.nio.GridNioParser;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-/**
- * ODBC protocol parser.
- */
-public class OdbcParser implements GridNioParser {
- /** Initial output stream capacity. */
- private static final int INIT_CAP = 1024;
-
- /** Length in bytes of the remaining message part. */
- private int leftToReceive = 0;
-
- /** Already received bytes of current message. */
- private ByteBuffer currentMessage = null;
-
- /** Marshaller. */
- private final GridBinaryMarshaller marsh;
-
- /** Logger. */
- protected final IgniteLogger log;
-
- /**
- * @param ctx Kernel context.
- */
- public OdbcParser(GridKernalContext ctx) {
- CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
-
- marsh = cacheObjProc.marshaller();
-
- log = ctx.log(getClass());
- }
-
- /**
- * Process data chunk and try to construct new message using stored and
- * freshly received data.
- *
- * @param buf Fresh data buffer.
- * @return Instance of the {@link BinaryReaderExImpl} positioned to read
- * from the beginning of the message on success and null otherwise.
- */
- private BinaryRawReaderEx tryConstructMessage(ByteBuffer buf) {
- if (leftToReceive != 0) {
- // Still receiving message
- int toConsume = Math.min(leftToReceive, buf.remaining());
-
- currentMessage.put(buf.array(), buf.arrayOffset(), toConsume);
- leftToReceive -= toConsume;
-
- buf.position(buf.position() + toConsume);
-
- if (leftToReceive != 0)
- return null;
-
- BinaryInputStream stream = new BinaryHeapInputStream(currentMessage.array());
-
- BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null);
-
- currentMessage = null;
-
- return reader;
- }
-
- // Receiving new message
- BinaryInputStream stream = new BinaryHeapInputStream(buf.array());
-
- BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null);
-
- // Getting message length. It's in the first four bytes of the message.
- int messageLen = reader.readInt();
-
- // Just skipping int here to sync position.
- buf.getInt();
-
- int remaining = buf.remaining();
-
- // Checking if we have not entire message in buffer.
- if (messageLen > remaining) {
- leftToReceive = messageLen - remaining;
-
- currentMessage = ByteBuffer.allocate(messageLen);
- currentMessage.put(buf);
-
- return null;
- }
-
- buf.position(buf.position() + messageLen);
-
- return reader;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public OdbcRequest decode(GridNioSession ses, ByteBuffer buf) throws IOException,
- IgniteCheckedException {
- BinaryRawReaderEx messageReader = tryConstructMessage(buf);
-
- return messageReader == null ? null : readRequest(ses, messageReader);
- }
-
- /** {@inheritDoc} */
- @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
- assert msg != null;
- assert msg instanceof OdbcResponse;
-
- if (log.isDebugEnabled())
- log.debug("Encoding query processing result");
-
- BinaryRawWriterEx writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP));
-
- // Reserving space for the message length.
- int msgLenPos = writer.reserveInt();
-
- writeResponse(ses, writer, (OdbcResponse)msg);
-
- int msgLenWithHdr = writer.out().position() - msgLenPos;
-
- int msgLen = msgLenWithHdr - 4;
-
- writer.writeInt(msgLenPos, msgLen);
-
- ByteBuffer buf = ByteBuffer.allocate(msgLenWithHdr);
-
- buf.put(writer.out().array(), msgLenPos, msgLenWithHdr);
-
- buf.flip();
-
- return buf;
- }
-
- /**
- * Read ODBC request from the raw data using provided {@link BinaryReaderExImpl} instance.
- *
- * @param ses Current session.
- * @param reader Reader positioned to read the request.
- * @return Instance of the {@link OdbcRequest}.
- * @throws IOException if the type of the request is unknown to the parser.
- */
- private OdbcRequest readRequest(GridNioSession ses, BinaryRawReaderEx reader) throws IOException {
- OdbcRequest res;
-
- byte cmd = reader.readByte();
-
- switch (cmd) {
- case OdbcRequest.EXECUTE_SQL_QUERY: {
-
- String cache = reader.readString();
- String sql = reader.readString();
- int argsNum = reader.readInt();
-
- if (log.isDebugEnabled()) {
- log.debug("Message EXECUTE_SQL_QUERY:");
- log.debug("cache: " + cache);
- log.debug("query: " + sql);
- log.debug("argsNum: " + argsNum);
- }
-
- Object[] params = new Object[argsNum];
-
- for (int i = 0; i < argsNum; ++i)
- params[i] = reader.readObjectDetached();
-
- res = new OdbcQueryExecuteRequest(cache, sql, params);
-
- break;
- }
-
- case OdbcRequest.FETCH_SQL_QUERY: {
-
- long queryId = reader.readLong();
- int pageSize = reader.readInt();
-
- if (log.isDebugEnabled()) {
- log.debug("Message FETCH_SQL_QUERY:");
- log.debug("queryId: " + queryId);
- log.debug("pageSize: " + pageSize);
- }
-
- res = new OdbcQueryFetchRequest(queryId, pageSize);
-
- break;
- }
-
- case OdbcRequest.CLOSE_SQL_QUERY: {
-
- long queryId = reader.readLong();
-
- if (log.isDebugEnabled()) {
- log.debug("Message CLOSE_SQL_QUERY:");
- log.debug("queryId: " + queryId);
- }
-
- res = new OdbcQueryCloseRequest(queryId);
-
- break;
- }
-
- case OdbcRequest.GET_COLUMNS_META: {
-
- String cache = reader.readString();
- String table = reader.readString();
- String column = reader.readString();
-
- if (log.isDebugEnabled()) {
- log.debug("Message GET_COLUMNS_META:");
- log.debug("cache: " + cache);
- log.debug("table: " + table);
- log.debug("column: " + column);
- }
-
- res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
-
- break;
- }
-
- case OdbcRequest.GET_TABLES_META: {
-
- String catalog = reader.readString();
- String schema = reader.readString();
- String table = reader.readString();
- String tableType = reader.readString();
-
- if (log.isDebugEnabled()) {
- log.debug("Message GET_COLUMNS_META:");
- log.debug("catalog: " + catalog);
- log.debug("schema: " + schema);
- log.debug("table: " + table);
- log.debug("tableType: " + tableType);
- }
-
- res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
-
- break;
- }
-
- default:
- throw new IOException("Failed to parse incoming packet (unknown command type) [ses=" + ses +
- ", cmd=[" + Byte.toString(cmd) + ']');
- }
-
- return res;
- }
-
- /**
- * Write ODBC response using provided {@link BinaryRawWriterEx} instance.
- *
- * @param ses Current session.
- * @param writer Writer.
- * @param rsp ODBC response that should be written.
- * @throws IOException if the type of the response is unknown to the parser.
- */
- private void writeResponse(GridNioSession ses, BinaryRawWriterEx writer, OdbcResponse rsp) throws IOException {
- // Writing status
- writer.writeByte((byte)rsp.status());
-
- if (rsp.status() != OdbcResponse.STATUS_SUCCESS) {
- writer.writeString(rsp.error());
-
- return;
- }
-
- Object res0 = rsp.response();
-
- if (res0 instanceof OdbcQueryExecuteResult) {
- OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
-
- if (log.isDebugEnabled())
- log.debug("Resulting query ID: " + res.getQueryId());
-
- writer.writeLong(res.getQueryId());
-
- Collection<OdbcColumnMeta> metas = res.getColumnsMetadata();
-
- assert metas != null;
-
- writer.writeInt(metas.size());
-
- for (OdbcColumnMeta meta : metas)
- meta.writeBinary(writer, marsh.context());
-
- }
- else if (res0 instanceof OdbcQueryFetchResult) {
- OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0;
-
- if (log.isDebugEnabled())
- log.debug("Resulting query ID: " + res.queryId());
-
- writer.writeLong(res.queryId());
-
- Collection<?> items0 = res.items();
-
- assert items0 != null;
-
- writer.writeBoolean(res.last());
-
- writer.writeInt(items0.size());
-
- for (Object row0 : items0) {
- if (row0 != null) {
- Collection<?> row = (Collection<?>)row0;
-
- writer.writeInt(row.size());
-
- for (Object obj : row)
- writer.writeObjectDetached(obj);
- }
- }
- }
- else if (res0 instanceof OdbcQueryCloseResult) {
- OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0;
-
- if (log.isDebugEnabled())
- log.debug("Resulting query ID: " + res.getQueryId());
-
- writer.writeLong(res.getQueryId());
-
- }
- else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
- OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
-
- Collection<OdbcColumnMeta> columnsMeta = res.meta();
-
- assert columnsMeta != null;
-
- writer.writeInt(columnsMeta.size());
-
- for (OdbcColumnMeta columnMeta : columnsMeta)
- columnMeta.writeBinary(writer, marsh.context());
-
- }
- else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
- OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
-
- Collection<OdbcTableMeta> tablesMeta = res.meta();
-
- assert tablesMeta != null;
-
- writer.writeInt(tablesMeta.size());
-
- for (OdbcTableMeta tableMeta : tablesMeta)
- tableMeta.writeBinary(writer);
-
- }
- else
- throw new IOException("Failed to serialize response packet (unknown response type) [ses=" + ses + "]");
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9453392c/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index d19aef8..5d9e01f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -18,18 +18,17 @@
package org.apache.ignite.internal.processors.odbc;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.OdbcConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgnitePortProtocol;
-import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteOrder;
@@ -37,12 +36,12 @@ import java.nio.ByteOrder;
* ODBC processor.
*/
public class OdbcProcessor extends GridProcessorAdapter {
- /** OBCD TCP Server. */
- private GridNioServer<OdbcRequest> srv;
-
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** ODBC TCP server. */
+ private GridNioServer<OdbcRequest> srv;
+
/**
* @param ctx Kernal context.
*/
@@ -52,151 +51,66 @@ public class OdbcProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- if (isOdbcEnabled()) {
- Marshaller marsh = ctx.config().getMarshaller();
+ OdbcConfiguration odbcCfg = ctx.config().getOdbcConfiguration();
- if (marsh != null && !(marsh instanceof BinaryMarshaller))
- throw new IgniteCheckedException("ODBC may only be used with BinaryMarshaller.");
+ if (odbcCfg != null) {
+ try {
+ Marshaller marsh = ctx.config().getMarshaller();
- OdbcConfiguration cfg = ctx.config().getOdbcConfiguration();
+ if (marsh != null && !(marsh instanceof BinaryMarshaller))
+ throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " +
+ "through IgniteConfiguration.setMarshaller())");
- assert cfg != null;
+ String hostStr = odbcCfg.getHost();
- GridNioServerListener<OdbcRequest> listener = new OdbcCommandHandler(ctx, busyLock);
+ if (hostStr == null)
+ hostStr = ctx.config().getLocalHost();
- GridNioParser parser = new OdbcParser(ctx);
+ InetAddress host = U.resolveLocalHost(hostStr);
- try {
- InetAddress host = resolveOdbcTcpHost(ctx.config());
+ int port = odbcCfg.getPort();
- int port = cfg.getPort();
+ srv = GridNioServer.<OdbcRequest>builder()
+ .address(host)
+ .port(port)
+ .listener(new OdbcNioListener(ctx, busyLock))
+ .logger(log)
+ .selectorCount(odbcCfg.getSelectorCount())
+ .gridName(ctx.gridName())
+ .tcpNoDelay(odbcCfg.isNoDelay())
+ .directBuffer(odbcCfg.isDirectBuffer())
+ .byteOrder(ByteOrder.nativeOrder())
+ .socketSendBufferSize(odbcCfg.getSendBufferSize())
+ .socketReceiveBufferSize(odbcCfg.getReceiveBufferSize())
+ .sendQueueLimit(odbcCfg.getSendQueueLimit())
+ .filters(new GridNioCodecFilter(new OdbcNioParser(ctx), log, false))
+ .directMode(false)
+ .idleTimeout(odbcCfg.getIdleTimeout())
+ .build();
- if (startTcpServer(host, port, listener, parser, cfg)) {
- if (log.isDebugEnabled())
- log.debug("ODBC Server has started on TCP port " + port);
+ srv.start();
- return;
- }
+ ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
- U.warn(log, "Failed to start ODBC server (possibly all ports in range are in use) " +
- "[port=" + port + ", host=" + host + ']');
+ log.info("ODBC processor has started on TCP port " + port);
}
- catch (IOException e) {
- U.warn(log, "Failed to start ODBC server: " + e.getMessage(),
- "Failed to start ODBC server. Check odbcTcpHost configuration property.");
+ catch (Exception e) {
+ throw new IgniteCheckedException("Failed to start ODBC processor.", e);
}
}
}
/** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- if (isOdbcEnabled()) {
- if (srv != null) {
- ctx.ports().deregisterPorts(getClass());
-
- srv.stop();
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- if (isOdbcEnabled()) {
- if (log.isDebugEnabled())
- log.debug("ODBC processor started.");
- }
- }
-
- /** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
- if (isOdbcEnabled()) {
+ if (srv != null) {
busyLock.block();
- if (log.isDebugEnabled())
- log.debug("ODBC processor stopped.");
- }
- }
-
- /**
- * Check if the ODBC is enabled.
- *
- * @return Whether or not ODBC is enabled.
- */
- public boolean isOdbcEnabled() {
- return ctx.config().getOdbcConfiguration() != null;
- }
-
- /**
- * Resolves host for server using grid configuration.
- *
- * @param cfg Grid configuration.
- * @return Host address.
- * @throws IOException If failed to resolve host.
- */
- private static InetAddress resolveOdbcTcpHost(IgniteConfiguration cfg) throws IOException {
- String host = null;
-
- OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
-
- if (odbcCfg != null)
- host = odbcCfg.getHost();
-
- if (host == null)
- host = cfg.getLocalHost();
-
- return U.resolveLocalHost(host);
- }
-
- /**
- * Tries to start server with given parameters.
- *
- * @param hostAddr Host on which server should be bound.
- * @param port Port on which server should be bound.
- * @param listener Server message listener.
- * @param parser Server message parser.
- * @param cfg Configuration for other parameters.
- * @return {@code True} if server successfully started, {@code false} if port is used and
- * server was unable to start.
- */
- private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<OdbcRequest> listener,
- GridNioParser parser, OdbcConfiguration cfg) {
- try {
- GridNioFilter codec = new GridNioCodecFilter(parser, log, false);
+ srv.stop();
- GridNioFilter[] filters;
+ ctx.ports().deregisterPorts(getClass());
- filters = new GridNioFilter[] { codec };
-
- srv = GridNioServer.<OdbcRequest>builder()
- .address(hostAddr)
- .port(port)
- .listener(listener)
- .logger(log)
- .selectorCount(cfg.getSelectorCount())
- .gridName(ctx.gridName())
- .tcpNoDelay(cfg.isNoDelay())
- .directBuffer(cfg.isDirectBuffer())
- .byteOrder(ByteOrder.nativeOrder())
- .socketSendBufferSize(cfg.getSendBufferSize())
- .socketReceiveBufferSize(cfg.getReceiveBufferSize())
- .sendQueueLimit(cfg.getSendQueueLimit())
- .filters(filters)
- .directMode(false)
- .build();
-
- srv.idleTimeout(cfg.getIdleTimeout());
-
- srv.start();
-
- ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
-
- return true;
- }
- catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
- log.debug("Failed to start ODBC server on port " + port + ": " + e.getMessage());
-
- return false;
+ log.debug("ODBC processor stopped.");
}
}
}