You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/25 12:50:21 UTC
ignite git commit: IGNITE-2663: Added Protocol version field to
OdbcProcesor. This closes #541.
Repository: ignite
Updated Branches:
refs/heads/ignite-1786 a86cf9ba8 -> 63ed28519
IGNITE-2663: Added Protocol version field to OdbcProcesor. This closes #541.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/63ed2851
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/63ed2851
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/63ed2851
Branch: refs/heads/ignite-1786
Commit: 63ed2851942a6403e256bc404a55aa9faea2c480
Parents: a86cf9b
Author: isapego <is...@gridgain.com>
Authored: Fri Mar 25 14:50:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 25 14:50:13 2016 +0300
----------------------------------------------------------------------
.../processors/odbc/OdbcHandshakeRequest.java | 49 ++++
.../processors/odbc/OdbcHandshakeResult.java | 66 +++++
.../processors/odbc/OdbcMessageParser.java | 277 +++++++++++++++++++
.../processors/odbc/OdbcNioListener.java | 260 ++++-------------
.../internal/processors/odbc/OdbcRequest.java | 13 +-
.../processors/odbc/OdbcRequestHandler.java | 32 ++-
.../cpp/odbc/include/ignite/odbc/connection.h | 55 ++--
.../cpp/odbc/include/ignite/odbc/message.h | 135 ++++++++-
.../cpp/odbc/include/ignite/odbc/parser.h | 3 -
modules/platforms/cpp/odbc/src/connection.cpp | 133 ++++++---
.../odbc/src/query/column_metadata_query.cpp | 10 +-
.../platforms/cpp/odbc/src/query/data_query.cpp | 33 ++-
.../cpp/odbc/src/query/table_metadata_query.cpp | 10 +-
13 files changed, 782 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeRequest.java
new file mode 100644
index 0000000..5e09041
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * ODBC handshake request.
+ */
+public class OdbcHandshakeRequest extends OdbcRequest {
+ /** Protocol version. */
+ private final long ver;
+
+ /**
+ * @param ver Protocol version.
+ */
+ public OdbcHandshakeRequest(long ver) {
+ super(HANDSHAKE);
+
+ this.ver = ver;
+ }
+
+ /**
+ * @return Protocol version.
+ */
+ public long version() {
+ return ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(OdbcHandshakeRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeResult.java
new file mode 100644
index 0000000..bf1c61e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcHandshakeResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * ODBC handshake result.
+ */
+public class OdbcHandshakeResult {
+ /** Handshake accepted. */
+ private final boolean accepted;
+
+ /** Apache Ignite version when protocol version has been introduced. */
+ private final String protoVerSince;
+
+ /** Current Apache Ignite version. */
+ private final String curVer;
+
+ /**
+ * @param accepted Handshake accepted.
+ * @param protoVerSince Apache Ignite version when protocol version has been introduced.
+ * @param curVer Current Apache Ignite version.
+ */
+ public OdbcHandshakeResult(boolean accepted, @Nullable String protoVerSince, @Nullable String curVer) {
+ this.accepted = accepted;
+ this.protoVerSince = protoVerSince;
+ this.curVer = curVer;
+ }
+
+ /**
+ * @return {@code true} if the handshake has been accepted.
+ */
+ public boolean accepted() {
+ return accepted;
+ }
+
+ /**
+ * @return Apache Ignite version when protocol version has been introduced.
+ */
+ @Nullable public String protoVerSince() {
+ return protoVerSince;
+ }
+
+ /**
+ * @return Current Apache Ignite version.
+ */
+ @Nullable public String currentVer() {
+ return curVer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java
new file mode 100644
index 0000000..ba0c4f1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcMessageParser.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+
+import java.util.Collection;
+
+/**
+ * ODBC message parser.
+ */
+public class OdbcMessageParser {
+ /** Current ODBC communication protocol version. */
+ public static final long PROTO_VER = 1;
+
+ /** Apache Ignite version when ODBC communication protocol version has been introduced. */
+ public static final String PROTO_VER_SINCE = "1.6.0";
+
+ /** Initial output stream capacity. */
+ private static final int INIT_CAP = 1024;
+
+ /** Marshaller. */
+ private final GridBinaryMarshaller marsh;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Protocol version confirmation flag. */
+ private boolean verConfirmed = false;
+
+ /**
+ * @param ctx Context.
+ */
+ public OdbcMessageParser(final GridKernalContext ctx) {
+ CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
+
+ this.marsh = cacheObjProc.marshaller();
+
+ this.log = ctx.log(getClass());
+ }
+
+ /**
+ * Decode OdbcRequest from byte array.
+ *
+ * @param msg Message.
+ * @return Assembled ODBC request.
+ */
+ public OdbcRequest decode(byte[] msg) {
+ assert msg != null;
+
+ BinaryInputStream stream = new BinaryHeapInputStream(msg);
+
+ BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null);
+
+ byte cmd = reader.readByte();
+
+ // This is a special case: we cannot decode any other protocol message until
+ // we have confirmed that the remote client uses the same protocol version.
+ if (!verConfirmed) {
+ if (cmd == OdbcRequest.HANDSHAKE)
+ return new OdbcHandshakeRequest(reader.readLong());
+ else
+ throw new IgniteException("Unexpected ODBC command (first message is not a handshake request): [cmd=" +
+ cmd + ']');
+ }
+
+ OdbcRequest res;
+
+ switch (cmd) {
+ case OdbcRequest.EXECUTE_SQL_QUERY: {
+ String cache = reader.readString();
+ String sql = reader.readString();
+ int argsNum = reader.readInt();
+
+ Object[] params = new Object[argsNum];
+
+ for (int i = 0; i < argsNum; ++i)
+ params[i] = reader.readObjectDetached();
+
+ res = new OdbcQueryExecuteRequest(cache, sql, params);
+
+ break;
+ }
+
+ case OdbcRequest.FETCH_SQL_QUERY: {
+ long queryId = reader.readLong();
+ int pageSize = reader.readInt();
+
+ res = new OdbcQueryFetchRequest(queryId, pageSize);
+
+ break;
+ }
+
+ case OdbcRequest.CLOSE_SQL_QUERY: {
+ long queryId = reader.readLong();
+
+ res = new OdbcQueryCloseRequest(queryId);
+
+ break;
+ }
+
+ case OdbcRequest.GET_COLUMNS_META: {
+ String cache = reader.readString();
+ String table = reader.readString();
+ String column = reader.readString();
+
+ res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
+
+ break;
+ }
+
+ case OdbcRequest.GET_TABLES_META: {
+ String catalog = reader.readString();
+ String schema = reader.readString();
+ String table = reader.readString();
+ String tableType = reader.readString();
+
+ res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
+
+ break;
+ }
+
+ default:
+ throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']');
+ }
+
+ return res;
+ }
+
+ /**
+ * Encode OdbcResponse to byte array.
+ *
+ * @param msg Message.
+ * @return Byte array.
+ */
+ public byte[] encode(OdbcResponse msg) {
+ assert msg != null;
+
+ // Creating new binary writer
+ BinaryWriterExImpl writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP));
+
+ // Writing status.
+ writer.writeByte((byte) msg.status());
+
+ if (msg.status() != OdbcResponse.STATUS_SUCCESS) {
+ writer.writeString(msg.error());
+
+ return writer.array();
+ }
+
+ Object res0 = msg.response();
+
+ if (res0 instanceof OdbcHandshakeResult) {
+ OdbcHandshakeResult res = (OdbcHandshakeResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Handshake result: " + (res.accepted() ? "accepted" : "rejected"));
+
+ verConfirmed = res.accepted();
+
+ if (res.accepted()) {
+ verConfirmed = true;
+
+ writer.writeBoolean(true);
+ }
+ else {
+ writer.writeBoolean(false);
+ writer.writeString(res.protoVerSince());
+ writer.writeString(res.currentVer());
+ }
+ }
+ else if (res0 instanceof OdbcQueryExecuteResult) {
+ OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Resulting query ID: " + res.getQueryId());
+
+ writer.writeLong(res.getQueryId());
+
+ Collection<OdbcColumnMeta> metas = res.getColumnsMetadata();
+
+ assert metas != null;
+
+ writer.writeInt(metas.size());
+
+ for (OdbcColumnMeta meta : metas)
+ meta.writeBinary(writer, marsh.context());
+ }
+ else if (res0 instanceof OdbcQueryFetchResult) {
+ OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Resulting query ID: " + res.queryId());
+
+ writer.writeLong(res.queryId());
+
+ Collection<?> items0 = res.items();
+
+ assert items0 != null;
+
+ writer.writeBoolean(res.last());
+
+ writer.writeInt(items0.size());
+
+ for (Object row0 : items0) {
+ if (row0 != null) {
+ Collection<?> row = (Collection<?>)row0;
+
+ writer.writeInt(row.size());
+
+ for (Object obj : row)
+ writer.writeObjectDetached(obj);
+ }
+ }
+ }
+ else if (res0 instanceof OdbcQueryCloseResult) {
+ OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Resulting query ID: " + res.getQueryId());
+
+ writer.writeLong(res.getQueryId());
+ }
+ else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
+ OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
+
+ Collection<OdbcColumnMeta> columnsMeta = res.meta();
+
+ assert columnsMeta != null;
+
+ writer.writeInt(columnsMeta.size());
+
+ for (OdbcColumnMeta columnMeta : columnsMeta)
+ columnMeta.writeBinary(writer, marsh.context());
+ }
+ else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
+ OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
+
+ Collection<OdbcTableMeta> tablesMeta = res.meta();
+
+ assert tablesMeta != null;
+
+ writer.writeInt(tablesMeta.size());
+
+ for (OdbcTableMeta tableMeta : tablesMeta)
+ tableMeta.writeBinary(writer);
+ }
+ else
+ assert false : "Should not reach here.";
+
+ return writer.array();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
index c8b53e7..28b2b5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
@@ -17,34 +17,22 @@
package org.apache.ignite.internal.processors.odbc;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.binary.GridBinaryMarshaller;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.jetbrains.annotations.Nullable;
-import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
/**
* ODBC message listener.
*/
public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
- /** Initial output stream capacity. */
- private static final int INIT_CAP = 1024;
-
- /** Handler metadata key. */
- private static final int HANDLER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+ /** Connection-related metadata key. */
+ private static final int CONNECTION_DATA_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
/** Request ID generator. */
private static final AtomicLong REQ_ID_GEN = new AtomicLong();
@@ -55,9 +43,6 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
/** Kernal context. */
private final GridKernalContext ctx;
- /** Marshaller. */
- private final GridBinaryMarshaller marsh;
-
/** Logger. */
private final IgniteLogger log;
@@ -68,11 +53,6 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock) {
this.ctx = ctx;
this.busyLock = busyLock;
-
- CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
-
- this.marsh = cacheObjProc.marshaller();
-
this.log = ctx.log(getClass());
}
@@ -81,7 +61,7 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
if (log.isDebugEnabled())
log.debug("ODBC client connected: " + ses.remoteAddress());
- ses.addMeta(HANDLER_META_KEY, new OdbcRequestHandler(ctx, busyLock));
+ ses.addMeta(CONNECTION_DATA_META_KEY, new ConnectionData(ctx, busyLock));
}
/** {@inheritDoc} */
@@ -100,10 +80,29 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
long reqId = REQ_ID_GEN.incrementAndGet();
+ ConnectionData connData = ses.meta(CONNECTION_DATA_META_KEY);
+
+ assert connData != null;
+
+ OdbcMessageParser parser = connData.getParser();
+
+ OdbcRequest req;
+
try {
- long startTime = 0;
+ req = parser.decode(msg);
+ }
+ catch (Exception e) {
+ log.error("Failed to parse message [id=" + reqId + ", err=" + e + ']');
- OdbcRequest req = decode(msg);
+ ses.close();
+
+ return;
+ }
+
+ assert req != null;
+
+ try {
+ long startTime = 0;
if (log.isDebugEnabled()) {
startTime = System.nanoTime();
@@ -112,9 +111,7 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
", req=" + req + ']');
}
- OdbcRequestHandler handler = ses.meta(HANDLER_META_KEY);
-
- assert handler != null;
+ OdbcRequestHandler handler = connData.getHandler();
OdbcResponse resp = handler.handle(req);
@@ -125,199 +122,50 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
", resp=" + resp.status() + ']');
}
- byte[] outMsg = encode(resp);
+ byte[] outMsg = parser.encode(resp);
ses.send(outMsg);
}
catch (Exception e) {
log.error("Failed to process ODBC request [id=" + reqId + ", err=" + e + ']');
- ses.send(encode(new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage())));
- }
- }
-
- /**
- * Decode OdbcRequest from byte array.
- *
- * @param msg Message.
- * @return Assembled ODBC request.
- */
- private OdbcRequest decode(byte[] msg) {
- assert msg != null;
-
- BinaryInputStream stream = new BinaryHeapInputStream(msg);
-
- BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null);
-
- OdbcRequest res;
-
- byte cmd = reader.readByte();
-
- switch (cmd) {
- case OdbcRequest.EXECUTE_SQL_QUERY: {
- String cache = reader.readString();
- String sql = reader.readString();
- int argsNum = reader.readInt();
-
- Object[] params = new Object[argsNum];
-
- for (int i = 0; i < argsNum; ++i)
- params[i] = reader.readObjectDetached();
-
- res = new OdbcQueryExecuteRequest(cache, sql, params);
-
- break;
- }
-
- case OdbcRequest.FETCH_SQL_QUERY: {
- long queryId = reader.readLong();
- int pageSize = reader.readInt();
-
- res = new OdbcQueryFetchRequest(queryId, pageSize);
-
- break;
- }
-
- case OdbcRequest.CLOSE_SQL_QUERY: {
- long queryId = reader.readLong();
-
- res = new OdbcQueryCloseRequest(queryId);
-
- break;
- }
-
- case OdbcRequest.GET_COLUMNS_META: {
-
- String cache = reader.readString();
- String table = reader.readString();
- String column = reader.readString();
-
- res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
-
- break;
- }
-
- case OdbcRequest.GET_TABLES_META: {
- String catalog = reader.readString();
- String schema = reader.readString();
- String table = reader.readString();
- String tableType = reader.readString();
-
- res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
-
- break;
- }
-
- default:
- throw new IgniteException("Unknown ODBC command: " + cmd);
+ ses.send(parser.encode(new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage())));
}
-
- return res;
}
/**
- * Encode OdbcResponse to byte array.
- *
- * @param msg Message.
- * @return Byte array.
+ * Connection-related data.
*/
- private byte[] encode(OdbcResponse msg) {
- assert msg != null;
-
- // Creating new binary writer
- BinaryWriterExImpl writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP));
-
- // Writing status
- writer.writeByte((byte) msg.status());
-
- if (msg.status() != OdbcResponse.STATUS_SUCCESS) {
- writer.writeString(msg.error());
-
- return writer.array();
+ private static class ConnectionData {
+ /** Request handler. */
+ private final OdbcRequestHandler handler;
+
+ /** Message parser. */
+ private final OdbcMessageParser parser;
+
+ /**
+ * @param ctx Context.
+ * @param busyLock Shutdown busy lock.
+ */
+ public ConnectionData(GridKernalContext ctx, GridSpinBusyLock busyLock) {
+ handler = new OdbcRequestHandler(ctx, busyLock);
+ parser = new OdbcMessageParser(ctx);
}
- Object res0 = msg.response();
-
- if (res0 instanceof OdbcQueryExecuteResult) {
- OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
-
- if (log.isDebugEnabled())
- log.debug("Resulting query ID: " + res.getQueryId());
-
- writer.writeLong(res.getQueryId());
-
- Collection<OdbcColumnMeta> metas = res.getColumnsMetadata();
-
- assert metas != null;
-
- writer.writeInt(metas.size());
-
- for (OdbcColumnMeta meta : metas)
- meta.writeBinary(writer, marsh.context());
-
+ /**
+ * Handler getter.
+ * @return Request handler for the connection.
+ */
+ public OdbcRequestHandler getHandler() {
+ return handler;
}
- else if (res0 instanceof OdbcQueryFetchResult) {
- OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0;
-
- if (log.isDebugEnabled())
- log.debug("Resulting query ID: " + res.queryId());
- writer.writeLong(res.queryId());
-
- Collection<?> items0 = res.items();
-
- assert items0 != null;
-
- writer.writeBoolean(res.last());
-
- writer.writeInt(items0.size());
-
- for (Object row0 : items0) {
- if (row0 != null) {
- Collection<?> row = (Collection<?>)row0;
-
- writer.writeInt(row.size());
-
- for (Object obj : row)
- writer.writeObjectDetached(obj);
- }
- }
- }
- else if (res0 instanceof OdbcQueryCloseResult) {
- OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0;
-
- if (log.isDebugEnabled())
- log.debug("Resulting query ID: " + res.getQueryId());
-
- writer.writeLong(res.getQueryId());
+ /**
+ * Parser getter.
+ * @return Message parser for the connection.
+ */
+ public OdbcMessageParser getParser() {
+ return parser;
}
- else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
- OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
-
- Collection<OdbcColumnMeta> columnsMeta = res.meta();
-
- assert columnsMeta != null;
-
- writer.writeInt(columnsMeta.size());
-
- for (OdbcColumnMeta columnMeta : columnsMeta)
- columnMeta.writeBinary(writer, marsh.context());
- }
- else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
- OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
-
- Collection<OdbcTableMeta> tablesMeta = res.meta();
-
- assert tablesMeta != null;
-
- writer.writeInt(tablesMeta.size());
-
- for (OdbcTableMeta tableMeta : tablesMeta)
- tableMeta.writeBinary(writer);
- }
- else
- assert false : "Should nor reach here.";
-
- return writer.array();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequest.java
index 7df7474..ebecb60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequest.java
@@ -21,20 +21,23 @@ package org.apache.ignite.internal.processors.odbc;
* ODBC command request.
*/
public class OdbcRequest {
+ /** Handshake request. */
+ public static final int HANDSHAKE = 1;
+
/** Execute sql query. */
- public static final int EXECUTE_SQL_QUERY = 1;
+ public static final int EXECUTE_SQL_QUERY = 2;
/** Fetch query results. */
- public static final int FETCH_SQL_QUERY = 2;
+ public static final int FETCH_SQL_QUERY = 3;
/** Close query. */
- public static final int CLOSE_SQL_QUERY = 3;
+ public static final int CLOSE_SQL_QUERY = 4;
/** Get columns meta query. */
- public static final int GET_COLUMNS_META = 4;
+ public static final int GET_COLUMNS_META = 5;
/** Get columns meta query. */
- public static final int GET_TABLES_META = 5;
+ public static final int GET_TABLES_META = 6;
/** Command. */
private final int cmd;
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
index 1af14b3..9aba295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteProductVersion;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,14 +78,17 @@ public class OdbcRequestHandler {
try {
switch (req.command()) {
+ case HANDSHAKE:
+ return performHandshake((OdbcHandshakeRequest) req);
+
case EXECUTE_SQL_QUERY:
- return executeQuery((OdbcQueryExecuteRequest)req);
+ return executeQuery((OdbcQueryExecuteRequest) req);
case FETCH_SQL_QUERY:
- return fetchQuery((OdbcQueryFetchRequest)req);
+ return fetchQuery((OdbcQueryFetchRequest) req);
case CLOSE_SQL_QUERY:
- return closeQuery((OdbcQueryCloseRequest)req);
+ return closeQuery((OdbcQueryCloseRequest) req);
case GET_COLUMNS_META:
return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req);
@@ -101,6 +105,28 @@ public class OdbcRequestHandler {
}
/**
+ * {@link OdbcHandshakeRequest} command handler.
+ *
+ * @param req Handshake request.
+ * @return Response.
+ */
+ private OdbcResponse performHandshake(OdbcHandshakeRequest req) {
+ OdbcHandshakeResult res;
+
+ if (req.version() == OdbcMessageParser.PROTO_VER)
+ res = new OdbcHandshakeResult(true, null, null);
+ else {
+ IgniteProductVersion ver = ctx.grid().version();
+
+ String verStr = Byte.toString(ver.major()) + '.' + ver.minor() + '.' + ver.maintenance();
+
+ res = new OdbcHandshakeResult(false, OdbcMessageParser.PROTO_VER_SINCE, verStr);
+ }
+
+ return new OdbcResponse(res);
+ }
+
+ /**
* {@link OdbcQueryExecuteRequest} command handler.
*
* @param req Execute query request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
index 56037f5..116fce3 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h
@@ -40,6 +40,15 @@ namespace ignite
{
friend class Environment;
public:
+ /** ODBC communication protocol version. */
+ enum { PROTOCOL_VERSION = 1 };
+
+ /**
+ * Apache Ignite version when the current ODBC communication
+ * protocol version has been introduced.
+ */
+ static const std::string PROTOCOL_VERSION_SINCE;
+
/**
* Destructor.
*/
@@ -97,17 +106,15 @@ namespace ignite
*
* @param data Data buffer.
* @param len Data length.
- * @return True on success.
*/
- bool Send(const int8_t* data, size_t len);
+ void Send(const int8_t* data, size_t len);
/**
* Receive next message.
*
* @param msg Buffer for message.
- * @return True on success.
*/
- bool Receive(std::vector<int8_t>& msg);
+ void Receive(std::vector<int8_t>& msg);
/**
* Get name of the assotiated cache.
@@ -133,28 +140,19 @@ namespace ignite
*
* @param req Request message.
* @param rsp Response message.
- * @return True on success.
*/
template<typename ReqT, typename RspT>
- bool SyncMessage(const ReqT& req, RspT& rsp)
+ void SyncMessage(const ReqT& req, RspT& rsp)
{
std::vector<int8_t> tempBuffer;
parser.Encode(req, tempBuffer);
- bool requestSent = Send(tempBuffer.data(), tempBuffer.size());
-
- if (!requestSent)
- return false;
-
- bool responseReceived = Receive(tempBuffer);
+ Send(tempBuffer.data(), tempBuffer.size());
- if (!responseReceived)
- return false;
+ Receive(tempBuffer);
parser.Decode(rsp, tempBuffer);
-
- return true;
}
/**
@@ -236,6 +234,31 @@ namespace ignite
SqlResult InternalTransactionRollback();
/**
+ * Receive specified number of bytes.
+ *
+ * @param dst Buffer for data.
+ * @param len Number of bytes to receive.
+ * @return Number of successfully received bytes.
+ */
+ size_t ReceiveAll(void* dst, size_t len);
+
+ /**
+ * Send specified number of bytes.
+ *
+ * @param data Data buffer.
+ * @param len Data length.
+ * @return Number of successfully sent bytes.
+ */
+ size_t SendAll(const int8_t* data, size_t len);
+
+ /**
+ * Perform handshake request.
+ *
+ * @return Operation result.
+ */
+ SqlResult MakeRequestHandshake();
+
+ /**
* Constructor.
*/
Connection();
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
index f4d1e3c..94509f3 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
@@ -36,15 +36,17 @@ namespace ignite
{
enum RequestType
{
- REQUEST_TYPE_EXECUTE_SQL_QUERY = 1,
+ REQUEST_TYPE_HANDSHAKE = 1,
- REQUEST_TYPE_FETCH_SQL_QUERY = 2,
+ REQUEST_TYPE_EXECUTE_SQL_QUERY = 2,
- REQUEST_TYPE_CLOSE_SQL_QUERY = 3,
+ REQUEST_TYPE_FETCH_SQL_QUERY = 3,
- REQUEST_TYPE_GET_COLUMNS_METADATA = 4,
+ REQUEST_TYPE_CLOSE_SQL_QUERY = 4,
- REQUEST_TYPE_GET_TABLES_METADATA = 5
+ REQUEST_TYPE_GET_COLUMNS_METADATA = 5,
+
+ REQUEST_TYPE_GET_TABLES_METADATA = 6
};
enum ResponseStatus
@@ -55,6 +57,46 @@ namespace ignite
};
/**
+ * Handshake request. Carries the protocol version passed to the constructor.
+ */
+ class HandshakeRequest
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param version Protocol version to send in the request.
+ */
+ HandshakeRequest(int64_t version) : version(version) // NOTE(review): not declared explicit, so an int64_t converts implicitly.
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ ~HandshakeRequest()
+ {
+ // No-op.
+ }
+
+ /**
+ * Write request using provided writer: the request type code first, then the protocol version.
+ * @param writer Writer.
+ */
+ void Write(ignite::impl::binary::BinaryWriterImpl& writer) const
+ {
+ writer.WriteInt8(REQUEST_TYPE_HANDSHAKE); // Request type code, written as an 8-bit value.
+
+ writer.WriteInt64(version); // Protocol version, written as a 64-bit value.
+ }
+
+ private:
+ /** Protocol version. */
+ int64_t version;
+ };
+
+ /**
* Query execute request.
*/
class QueryExecuteRequest
@@ -367,9 +409,11 @@ namespace ignite
protected:
/**
* Read data if response status is RESPONSE_STATUS_SUCCESS.
- * @param reader Reader.
*/
- virtual void ReadOnSuccess(ignite::impl::binary::BinaryReaderImpl& reader) = 0;
+ virtual void ReadOnSuccess(ignite::impl::binary::BinaryReaderImpl&)
+ {
+ // No-op.
+ }
private:
/** Request processing status. */
@@ -379,6 +423,83 @@ namespace ignite
std::string error;
};
+ /**
+ * Handshake response. Holds the acceptance flag and, for a rejected handshake, two version strings.
+ */
+ class HandshakeResponse : public QueryResponse
+ {
+ public:
+ /**
+ * Constructor. Starts as "not accepted" with empty version strings.
+ */
+ HandshakeResponse() :
+ accepted(false),
+ protoVerSince(),
+ currentVer()
+ {
+ // No-op.
+ }
+
+ /**
+ * Destructor.
+ */
+ ~HandshakeResponse()
+ {
+ // No-op.
+ }
+
+ /**
+ * Check if the handshake has been accepted.
+ * @return True if the handshake has been accepted; false if it was rejected or no response has been read yet.
+ */
+ bool IsAccepted() const
+ {
+ return accepted;
+ }
+
+ /**
+ * Get the host Apache Ignite version in which the host's protocol version was introduced.
+ * @return That version string; it is only read from the response when the handshake was rejected, and is empty otherwise.
+ */
+ const std::string& ProtoVerSince() const
+ {
+ return protoVerSince;
+ }
+
+ /**
+ * Get the current host Apache Ignite version.
+ * @return That version string; it is only read from the response when the handshake was rejected, and is empty otherwise.
+ */
+ const std::string& CurrentVer() const
+ {
+ return currentVer;
+ }
+
+ private:
+ /**
+ * Read response using provided reader: the acceptance flag, then the version strings if rejected.
+ * @param reader Reader.
+ */
+ virtual void ReadOnSuccess(ignite::impl::binary::BinaryReaderImpl& reader)
+ {
+ accepted = reader.ReadBool();
+
+ if (!accepted) // Version strings are present in the payload only for a rejected handshake.
+ {
+ utility::ReadString(reader, protoVerSince);
+ utility::ReadString(reader, currentVer);
+ }
+ }
+
+ /** Handshake accepted. */
+ bool accepted;
+
+ /** Host Apache Ignite version in which the host's protocol version was introduced. */
+ std::string protoVerSince;
+
+ /** Current host Apache Ignite version. */
+ std::string currentVer;
+ };
/**
* Query close response.
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h b/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h
index f589531..a6c80c8 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h
@@ -92,9 +92,6 @@ namespace ignite
{
using namespace ignite::impl::binary;
- //for (size_t i = 0; i < buf.size(); ++i)
- // LOG_MSG("Data[%0.4d] : %0.3d, %c\n", i, (int)buf[i], buf[i] > 64 && buf[i] < 128 ? buf[i] : '.');
-
if (inMem.Capacity() < static_cast<int32_t>(buf.size()))
inMem.Reallocate(static_cast<int32_t>(buf.size()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp
index b58b8f9..59a25f9 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -22,6 +22,7 @@
#include "ignite/odbc/utility.h"
#include "ignite/odbc/statement.h"
#include "ignite/odbc/connection.h"
+#include "ignite/odbc/message.h"
#include "ignite/odbc/config/configuration.h"
// TODO: implement appropriate protocol with de-/serialisation.
@@ -39,6 +40,8 @@ namespace ignite
{
namespace odbc
{
+ const std::string Connection::PROTOCOL_VERSION_SINCE = "1.6.0";
+
Connection::Connection() : socket(), connected(false), cache(), parser()
{
// No-op.
@@ -85,7 +88,7 @@ namespace ignite
if (server != config.GetDsn())
{
- AddStatusRecord(SQL_STATE_HY000_GENERAL_ERROR, "Unknown DNS.");
+ AddStatusRecord(SQL_STATE_HY000_GENERAL_ERROR, "Unknown server.");
return SQL_RESULT_ERROR;
}
@@ -118,7 +121,7 @@ namespace ignite
return SQL_RESULT_ERROR;
}
- return SQL_RESULT_SUCCESS;
+ return MakeRequestHandshake();
}
void Connection::Release()
@@ -165,77 +168,97 @@ namespace ignite
return SQL_RESULT_SUCCESS;
}
- bool Connection::Send(const int8_t* data, size_t len)
+ void Connection::Send(const int8_t* data, size_t len) // Sends a length header, then the payload; failures go through IGNITE_ERROR_1 (presumably throws IgniteError) instead of a bool result.
{
if (!connected)
- return false;
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Connection is not established");
OdbcProtocolHeader hdr;
hdr.len = static_cast<int32_t>(len);
- int sent = socket.Send(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
-
- LOG_MSG("Sent: %d\n", sent);
+ size_t sent = SendAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); // Header goes first; it carries the payload length.
if (sent != sizeof(hdr))
- return false;
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message header");
+
+ sent = SendAll(data, len); // Payload follows the header.
+
+ if (sent != len)
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not send message body");
+ }
- sent = 0;
+ size_t Connection::SendAll(const int8_t* data, size_t len) // Loops over socket.Send until len bytes are sent or a call fails; returns the bytes sent.
+ {
+ int sent = 0; // NOTE(review): signed int is compared with size_t len below and returned as size_t - consider size_t.
- while (sent != len)
+ while (sent != len)
{
int res = socket.Send(data + sent, len - sent);
LOG_MSG("Sent: %d\n", res);
if (res <= 0)
- return false;
+ return sent; // Non-positive result: stop and report how much was sent so far.
sent += res;
}
- return true;
+ return sent; // Equals len here, since the loop only exits normally when sent == len.
}
- bool Connection::Receive(std::vector<int8_t>& msg)
+ void Connection::Receive(std::vector<int8_t>& msg) // Reads a length header, then that many payload bytes into msg; failures go through IGNITE_ERROR_1.
{
if (!connected)
- return false;
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Connection is not established");
msg.clear();
OdbcProtocolHeader hdr;
- int received = socket.Receive(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr));
-
- LOG_MSG("Received: %d\n", received);
+ size_t received = ReceiveAll(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); // Header must arrive completely before its length is trusted.
if (received != sizeof(hdr))
- return false;
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not receive message header");
+
+ if (hdr.len < 0) // Reject a negative length before it is used as a size in resize().
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Message lenght is negative"); // NOTE(review): "lenght" is misspelled in this runtime message.
+
+ if (hdr.len == 0) // Empty payload: msg stays cleared.
+ return;
+
+ msg.resize(hdr.len);
+
+ received = ReceiveAll(&msg[0], hdr.len);
- size_t remain = hdr.len;
- size_t receivedAtAll = 0;
+ if (received != hdr.len)
+ {
+ msg.resize(received); // Keep only the bytes actually read before reporting the failure.
+
+ IGNITE_ERROR_1(IgniteError::IGNITE_ERR_GENERIC, "Can not receive message body");
+ }
+ }
- msg.resize(remain);
+ size_t Connection::ReceiveAll(void* dst, size_t len) // Loops over socket.Receive until len bytes are read or a call fails; returns the bytes read.
+ {
+ size_t remain = len; // Bytes still to be read.
+ int8_t* buffer = reinterpret_cast<int8_t*>(dst); // Byte-typed view of dst for pointer arithmetic.
while (remain)
{
- received = socket.Receive(&msg[receivedAtAll], remain);
- LOG_MSG("Received: %d\n", received);
- LOG_MSG("remain: %d\n", remain);
+ size_t received = len - remain; // Bytes read so far; also the write offset into buffer.
- if (received <= 0)
- {
- msg.resize(receivedAtAll);
+ int res = socket.Receive(buffer + received, remain);
+ LOG_MSG("Receive res: %d\n", res);
+ LOG_MSG("remain: %d\n", remain); // NOTE(review): remain is size_t but formatted with %d - verify if LOG_MSG is printf-style.
- return false;
- }
+ if (res <= 0) // Non-positive result: stop and report how much was read so far.
+ return received;
- remain -= static_cast<size_t>(received);
+ remain -= static_cast<size_t>(res);
}
- return true;
+ return len; // Loop ended with remain == 0, so everything was read.
}
const std::string& Connection::GetCache() const
@@ -271,6 +294,54 @@ namespace ignite
return SQL_RESULT_ERROR;
}
+
+ SqlResult Connection::MakeRequestHandshake() // Sends PROTOCOL_VERSION to the node and converts a failed or rejected handshake into a status record.
+ {
+ HandshakeRequest req(PROTOCOL_VERSION);
+ HandshakeResponse rsp;
+
+ try
+ {
+ SyncMessage(req, rsp); // Sends req and fills rsp from the reply.
+ }
+ catch (const IgniteError& err)
+ {
+ AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, err.GetText()); // NOTE(review): any IgniteError is reported with the connection-timeout state - confirm.
+
+ return SQL_RESULT_ERROR; // NOTE(review): unlike the two error paths below, this one returns without InternalRelease() - confirm intended.
+ }
+
+ if (rsp.GetStatus() != RESPONSE_STATUS_SUCCESS) // The node reported an error while processing the request.
+ {
+ LOG_MSG("Error: %s\n", rsp.GetError().c_str());
+
+ AddStatusRecord(SQL_STATE_08001_CANNOT_CONNECT, rsp.GetError());
+
+ InternalRelease();
+
+ return SQL_RESULT_ERROR;
+ }
+
+ if (!rsp.IsAccepted()) // The node answered but refused the handshake.
+ {
+ LOG_MSG("Hanshake message has been rejected.\n"); // NOTE(review): "Hanshake" is misspelled in this log text.
+
+ std::stringstream constructor; // Builds the diagnostic message text.
+
+ constructor << "Node rejected handshake message. "
+ << "Current node Apache Ignite version: " << rsp.CurrentVer() << ", "
+ << "node protocol version introduced in version: " << rsp.ProtoVerSince() << ", "
+ << "driver protocol version introduced in version: " << PROTOCOL_VERSION_SINCE << ".";
+
+ AddStatusRecord(SQL_STATE_08001_CANNOT_CONNECT, constructor.str());
+
+ InternalRelease();
+
+ return SQL_RESULT_ERROR;
+ }
+
+ return SQL_RESULT_SUCCESS;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp b/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp
index 69a08b1..1fa92b9 100644
--- a/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp
@@ -279,11 +279,13 @@ namespace ignite
QueryGetColumnsMetaRequest req(schema, table, column);
QueryGetColumnsMetaResponse rsp;
- bool success = connection.SyncMessage(req, rsp);
-
- if (!success)
+ try
+ {
+ connection.SyncMessage(req, rsp);
+ }
+ catch (const IgniteError& err)
{
- diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, "Connection terminated.");
+ diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, err.GetText());
return SQL_RESULT_ERROR;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/src/query/data_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/data_query.cpp b/modules/platforms/cpp/odbc/src/query/data_query.cpp
index 6b4e741..8305ac7 100644
--- a/modules/platforms/cpp/odbc/src/query/data_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/data_query.cpp
@@ -173,11 +173,13 @@ namespace ignite
QueryExecuteRequest req(cacheName, sql, params);
QueryExecuteResponse rsp;
- bool success = connection.SyncMessage(req, rsp);
-
- if (!success)
+ try
+ {
+ connection.SyncMessage(req, rsp);
+ }
+ catch (const IgniteError& err)
{
- diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, "Connection terminated.");
+ diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, err.GetText());
return SQL_RESULT_ERROR;
}
@@ -215,11 +217,13 @@ namespace ignite
QueryCloseRequest req(cursor->GetQueryId());
QueryCloseResponse rsp;
- bool success = connection.SyncMessage(req, rsp);
-
- if (!success)
+ try
+ {
+ connection.SyncMessage(req, rsp);
+ }
+ catch (const IgniteError& err)
{
- diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, "Connection terminated.");
+ diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, err.GetText());
return SQL_RESULT_ERROR;
}
@@ -245,14 +249,13 @@ namespace ignite
QueryFetchRequest req(cursor->GetQueryId(), ResultPage::DEFAULT_SIZE);
QueryFetchResponse rsp(*resultPage);
- bool success = connection.SyncMessage(req, rsp);
-
- LOG_MSG("Query id: %lld\n", rsp.GetQueryId());
- LOG_MSG("Request status: %s\n", success ? "Success" : "Failure");
-
- if (!success)
+ try
+ {
+ connection.SyncMessage(req, rsp);
+ }
+ catch (const IgniteError& err)
{
- diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, "Connection terminated.");
+ diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, err.GetText());
return SQL_RESULT_ERROR;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/63ed2851/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp b/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp
index 42850cf..8c0cfd8 100644
--- a/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp
@@ -205,11 +205,13 @@ namespace ignite
QueryGetTablesMetaRequest req(catalog, schema, table, tableType);
QueryGetTablesMetaResponse rsp;
- bool success = connection.SyncMessage(req, rsp);
-
- if (!success)
+ try
+ {
+ connection.SyncMessage(req, rsp);
+ }
+ catch (const IgniteError& err)
{
- diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, "Connection terminated.");
+ diag.AddStatusRecord(SQL_STATE_HYT01_CONNECTIOIN_TIMEOUT, err.GetText());
return SQL_RESULT_ERROR;
}