You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/26 09:46:42 UTC
[03/28] ignite git commit: IGNITE-5163: Implemented infrastructure
for the new JDBC driver. This closes #1912.
IGNITE-5163: Implemented infrastructure for the new JDBC driver. This closes #1912.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f1dc3ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f1dc3ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f1dc3ac
Branch: refs/heads/ignite-5075-cc
Commit: 6f1dc3ac65d403a634331515cd1f279010d0d092
Parents: c04b39a
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Tue May 23 15:55:48 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue May 23 15:55:48 2017 +0300
----------------------------------------------------------------------
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 3 +
.../jdbc/thin/JdbcConnectionSelfTest.java | 195 +++++++
.../org/apache/ignite/IgniteJdbcThinDriver.java | 312 +++++++++++
.../ignite/internal/GridKernalContext.java | 8 +-
.../ignite/internal/GridKernalContextImpl.java | 12 +-
.../apache/ignite/internal/IgniteKernal.java | 4 +-
.../internal/binary/BinaryWriterExImpl.java | 16 +-
.../internal/jdbc/thin/JdbcConnection.java | 529 +++++++++++++++++++
.../ignite/internal/jdbc/thin/JdbcTcpIo.java | 207 ++++++++
.../processors/odbc/OdbcNioListener.java | 242 ---------
.../internal/processors/odbc/OdbcProcessor.java | 199 -------
.../odbc/SqlListenerAbstractMessageParser.java | 265 ++++++++++
.../odbc/SqlListenerAbstractObjectReader.java | 137 +++++
.../odbc/SqlListenerAbstractObjectWriter.java | 111 ++++
.../processors/odbc/SqlListenerNioListener.java | 263 +++++++++
.../processors/odbc/SqlListenerProcessor.java | 191 +++++++
.../odbc/SqlListenerRequestHandlerImpl.java | 494 +++++++++++++++++
.../processors/odbc/jdbc/JdbcMessageParser.java | 50 ++
.../processors/odbc/jdbc/JdbcObjectReader.java | 33 ++
.../processors/odbc/jdbc/JdbcObjectWriter.java | 33 ++
.../processors/odbc/odbc/OdbcMessageParser.java | 249 +--------
.../processors/odbc/odbc/OdbcObjectReader.java | 33 ++
.../processors/odbc/odbc/OdbcObjectWriter.java | 32 ++
.../odbc/odbc/OdbcRequestHandler.java | 513 ------------------
.../odbc/OdbcProcessorValidationSelfTest.java | 182 -------
.../SqlListenerProcessorValidationSelfTest.java | 184 +++++++
.../ignite/testframework/GridTestUtils.java | 2 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 4 +-
.../cpp/odbc/include/ignite/odbc/message.h | 10 +
29 files changed, 3124 insertions(+), 1389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 75671de..e2f09ba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -81,6 +81,9 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalPartitionedSelfTest.class));
suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalReplicatedSelfTest.class));
+ // New thin JDBC
+ suite.addTest(new TestSuite(org.apache.ignite.jdbc.thin.JdbcConnectionSelfTest.class));
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java
new file mode 100644
index 0000000..d7e2bef
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.concurrent.Callable;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.OdbcConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Connection test for the thin JDBC driver: checks default URLs, rejection of invalid URLs
+ * and the state of a connection after {@link Connection#close()}.
+ */
+public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** URL prefix. */
+    private static final String URL_PREFIX = "jdbc:ignite:thin://";
+
+    /** Host. */
+    private static final String HOST = "127.0.0.1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME));
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+
+        cfg.setOdbcConfiguration(new OdbcConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * Creates a default cache configuration with the given name.
+     *
+     * @param name Cache name.
+     * @return Cache configuration.
+     * @throws Exception In case of error.
+     */
+    private CacheConfiguration cacheConfiguration(@NotNull String name) throws Exception {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setName(name);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // Deregister whichever driver currently handles "jdbc:ignite://" URLs, if any.
+        try {
+            Driver drv = DriverManager.getDriver("jdbc:ignite://");
+
+            if (drv != null)
+                DriverManager.deregisterDriver(drv);
+        }
+        catch (SQLException ignored) {
+            // No-op: no driver is registered for that URL.
+        }
+
+        startGridsMultiThreaded(2);
+
+        // Loading the class runs its static initializer, which registers the thin driver.
+        Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Checks that a connection can be opened with host only, with and without a trailing slash.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaults() throws Exception {
+        String url = URL_PREFIX + HOST;
+
+        // Connections are closed explicitly so that the test does not leak them.
+        try (Connection conn = DriverManager.getConnection(url)) {
+            assert conn != null;
+        }
+
+        try (Connection conn = DriverManager.getConnection(url + "/")) {
+            assert conn != null;
+        }
+    }
+
+    /**
+     * Checks error messages produced for malformed or unreachable URLs.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInvalidUrls() throws Exception {
+        assertInvalidUrl(URL_PREFIX + "127.0.0.1:80",
+            "Failed to connect to Ignite cluster [host=127.0.0.1, port=80]");
+
+        assertInvalidUrl("q", "URL is invalid");
+
+        assertInvalidUrl(URL_PREFIX + "127.0.0.1:-1", "Invalid port:");
+
+        assertInvalidUrl(URL_PREFIX + "127.0.0.1:0", "Invalid port:");
+
+        assertInvalidUrl(URL_PREFIX + "127.0.0.1:100000", "Invalid port:");
+
+        assertInvalidUrl(URL_PREFIX + " :10000", "Host name is empty");
+    }
+
+    /**
+     * Checks that a closed connection reports itself as closed and invalid,
+     * and that a negative validation timeout is rejected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClose() throws Exception {
+        String url = URL_PREFIX + HOST;
+
+        final Connection conn = DriverManager.getConnection(url);
+
+        assert conn != null;
+        assert !conn.isClosed();
+
+        conn.close();
+
+        assert conn.isClosed();
+
+        assert !conn.isValid(2): "Connection must be closed";
+
+        GridTestUtils.assertThrows(
+            log,
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    conn.isValid(-2);
+
+                    return null;
+                }
+            },
+            SQLException.class,
+            "Invalid timeout"
+        );
+    }
+
+    /**
+     * Tries to open a connection with the given URL and passes the attempt to
+     * {@code GridTestUtils.assertThrowsAnyCause} expecting {@link SQLException} with the given message.
+     *
+     * @param url Connection URL to try.
+     * @param errMsg Expected error message.
+     * @throws Exception If failed.
+     */
+    private void assertInvalidUrl(final String url, String errMsg) throws Exception {
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection(url);
+
+                return null;
+            }
+        }, SQLException.class, errMsg);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
new file mode 100644
index 0000000..19e1edd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Logger;
+import org.apache.ignite.cache.affinity.AffinityKey;
+import org.apache.ignite.configuration.OdbcConfiguration;
+import org.apache.ignite.internal.IgniteVersionUtils;
+import org.apache.ignite.internal.jdbc.JdbcDriverPropertyInfo;
+import org.apache.ignite.internal.jdbc.thin.JdbcConnection;
+
+/**
+ * JDBC driver thin implementation for In-Memory Data Grid.
+ * <p>
+ * Driver allows to get distributed data from Ignite cache using standard
+ * SQL queries and standard JDBC API. It will automatically get only fields that
+ * you actually need from objects stored in cache.
+ * <h1 class="header">Limitations</h1>
+ * Data in Ignite cache is usually distributed across several nodes,
+ * so some queries may not work as expected since the query will be sent to each
+ * individual node and results will then be collected and returned as JDBC result set.
+ * Keep in mind following limitations (not applied if data is queried from one node only,
+ * or data is fully co-located or fully replicated on multiple nodes):
+ * <ul>
+ * <li>
+ * Joins will work correctly only if joined objects are stored in
+ * collocated mode. Refer to
+ * {@link AffinityKey}
+ * javadoc for more details.
+ * </li>
+ * <li>
+ * Note that if you are connected to local or replicated cache, all data will
+ * be queried only on one node, not depending on what caches participate in
+ * the query (some data from partitioned cache can be lost). And vice versa,
+ * if you are connected to partitioned cache, data from replicated caches
+ * will be duplicated.
+ * </li>
+ * </ul>
+ * <h1 class="header">SQL Notice</h1>
+ * Driver allows to query data from several caches. Cache that driver is connected to is
+ * treated as default schema in this case. Other caches can be referenced by their names.
+ *
+ * <h1 class="header">Dependencies</h1>
+ * JDBC driver is located in main Ignite JAR in {@code IGNITE_HOME/libs} folder.
+ * <h1 class="header">Configuration</h1>
+ *
+ * <p>
+ * JDBC connection URL has the following pattern:
+ * {@code jdbc:ignite:thin://<hostname>:<port>/}<br>
+ * Note the following:
+ * <ul>
+ * <li>Hostname is required.</li>
+ * <li>If port is not defined, {@code 10800} is used (default for Ignite thin client).</li>
+ * </ul>
+ * Other properties can be defined in {@link Properties} object passed to
+ * {@link DriverManager#getConnection(String, Properties)} method:
+ * <table class="doctable">
+ * <tr>
+ * <th>Name</th>
+ * <th>Description</th>
+ * <th>Default</th>
+ * <th>Optional</th>
+ * </tr>
+ * <tr>
+ * <td><b>ignite.jdbc.distributedJoins</b></td>
+ * <td>Flag to enable distributed joins.</td>
+ * <td>{@code false} (distributed joins are disabled)</td>
+ * <td>Yes</td>
+ * </tr>
+ * <tr>
+ * <td><b>ignite.jdbc.enforceJoinOrder</b></td>
+ * <td>Flag to enforce join order of tables in the query.</td>
+ * <td>{@code false} (enforcing join order is disabled)</td>
+ * <td>Yes</td>
+ * </tr>
+ * </table>
+ * <h1 class="header">Example</h1>
+ * <pre name="code" class="java">
+ * // Register JDBC driver.
+ * Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+ *
+ * // Open JDBC connection.
+ * Connection conn = DriverManager.getConnection("jdbc:ignite:thin://localhost:10800");
+ *
+ * // Query persons' names
+ * ResultSet rs = conn.createStatement().executeQuery("select name from Person");
+ *
+ * while (rs.next()) {
+ * String name = rs.getString(1);
+ *
+ * ...
+ * }
+ *
+ * // Query persons with specific age
+ * PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
+ *
+ * stmt.setInt(1, 30);
+ *
+ * ResultSet rs = stmt.executeQuery();
+ *
+ * while (rs.next()) {
+ * String name = rs.getString("name");
+ * int age = rs.getInt("age");
+ *
+ * ...
+ * }
+ * </pre>
+ */
+@SuppressWarnings("JavadocReference")
+public class IgniteJdbcThinDriver implements Driver {
+    /**
+     * Prefix for property names. Ends with a dot so that composed names have
+     * the {@code ignite.jdbc.<name>} form (for example {@code ignite.jdbc.distributedJoins}).
+     */
+    private static final String PROP_PREFIX = "ignite.jdbc.";
+
+    /** Distributed joins parameter name. */
+    private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins";
+
+    /** Enforce join order parameter name. */
+    private static final String ENFORCE_JOIN_ORDER = "enforceJoinOrder";
+
+    /** Hostname property name. */
+    public static final String PROP_HOST = PROP_PREFIX + "host";
+
+    /** Port number property name. */
+    public static final String PROP_PORT = PROP_PREFIX + "port";
+
+    /** Distributed joins property name. */
+    public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS;
+
+    /** Enforce join order property name. */
+    public static final String PROP_ENFORCE_JOIN_ORDER = PROP_PREFIX + ENFORCE_JOIN_ORDER;
+
+    /** URL prefix. */
+    public static final String URL_PREFIX = "jdbc:ignite:thin://";
+
+    /** Default port. */
+    public static final int DFLT_PORT = OdbcConfiguration.DFLT_TCP_PORT_FROM;
+
+    /** Major version. */
+    private static final int MAJOR_VER = IgniteVersionUtils.VER.major();
+
+    /** Minor version. */
+    private static final int MINOR_VER = IgniteVersionUtils.VER.minor();
+
+    /*
+     * Register driver.
+     */
+    static {
+        try {
+            DriverManager.registerDriver(new IgniteJdbcThinDriver());
+        }
+        catch (SQLException e) {
+            throw new RuntimeException("Failed to register Ignite JDBC driver.", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Host, port and URL query parameters parsed from {@code url} are written into {@code props}
+     * before the connection is created. A {@code null} {@code props} is treated as empty.
+     */
+    @Override public Connection connect(String url, Properties props) throws SQLException {
+        if (props == null)
+            props = new Properties();
+
+        if (!parseUrl(url, props))
+            throw new SQLException("URL is invalid: " + url);
+
+        return new JdbcConnection(url, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean acceptsURL(String url) throws SQLException {
+        // Report null URL as SQLException (the declared failure type) rather than NullPointerException.
+        if (url == null)
+            throw new SQLException("URL is null.");
+
+        return url.startsWith(URL_PREFIX);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Values are taken from {@code info} after the URL has been parsed into it.
+     * A {@code null} {@code info} is treated as empty.
+     */
+    @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+        if (info == null)
+            info = new Properties();
+
+        if (!parseUrl(url, info))
+            throw new SQLException("URL is invalid: " + url);
+
+        List<DriverPropertyInfo> props = Arrays.<DriverPropertyInfo>asList(
+            new JdbcDriverPropertyInfo("Hostname", info.getProperty(PROP_HOST), ""),
+            new JdbcDriverPropertyInfo("Port number", info.getProperty(PROP_PORT), ""),
+            new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(PROP_DISTRIBUTED_JOINS), ""),
+            new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(PROP_ENFORCE_JOIN_ORDER), "")
+        );
+
+        return props.toArray(new DriverPropertyInfo[0]);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMajorVersion() {
+        return MAJOR_VER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMinorVersion() {
+        return MINOR_VER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean jdbcCompliant() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        throw new SQLFeatureNotSupportedException("java.util.logging is not used.");
+    }
+
+    /**
+     * Validates and parses connection URL.
+     *
+     * @param url URL.
+     * @param props Properties to store parsed values into.
+     * @return Whether URL is valid. {@code null}, a URL without the {@link #URL_PREFIX}
+     *      or a URL consisting of the prefix only is invalid.
+     */
+    private boolean parseUrl(String url, Properties props) {
+        if (url == null)
+            return false;
+
+        if (url.startsWith(URL_PREFIX) && url.length() > URL_PREFIX.length())
+            return parseJdbcUrl(url, props);
+
+        return false;
+    }
+
+    /**
+     * Parses the part of the URL that follows {@link #URL_PREFIX}: {@code host[:port][/][?name=value&...]}.
+     * Host and port (or {@link #DFLT_PORT} when the port is omitted) are stored into {@code props}.
+     * The port is only checked to be an integer here; its range is not validated by this method.
+     *
+     * @param url Url.
+     * @param props Properties.
+     * @return Whether URL is valid.
+     */
+    private boolean parseJdbcUrl(String url, Properties props) {
+        url = url.substring(URL_PREFIX.length());
+
+        // String.split() drops trailing empty strings, so inputs consisting only of a delimiter
+        // ("?", "/", ":") produce an empty array. Such URLs are rejected instead of failing on parts[0].
+        String[] parts = url.split("\\?");
+
+        if (parts.length == 0 || parts.length > 2)
+            return false;
+
+        if (parts.length == 2 && !parseParameters(parts[1], "&", props))
+            return false;
+
+        parts = parts[0].split("/");
+
+        // Exactly one path segment (the host[:port] part) is allowed.
+        if (parts.length != 1)
+            return false;
+
+        parts = parts[0].split(":");
+
+        if (parts.length == 0 || parts.length > 2)
+            return false;
+
+        props.setProperty(PROP_HOST, parts[0]);
+
+        try {
+            props.setProperty(PROP_PORT, String.valueOf(parts.length == 2 ? Integer.parseInt(parts[1]) : DFLT_PORT));
+        }
+        catch (NumberFormatException ignored) {
+            // Port is not a number.
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Validates and parses URL parameters. Every {@code name=value} pair is stored into {@code props}
+     * under the {@code PROP_PREFIX + name} key. Pairs preceding an invalid one are already stored
+     * when {@code false} is returned.
+     *
+     * @param val Parameters string.
+     * @param delim Delimiter.
+     * @param props Properties.
+     * @return Whether URL parameters string is valid.
+     */
+    private boolean parseParameters(String val, String delim, Properties props) {
+        String[] params = val.split(delim);
+
+        for (String param : params) {
+            String[] pair = param.split("=");
+
+            if (pair.length != 2 || pair[0].isEmpty() || pair[1].isEmpty())
+                return false;
+
+            props.setProperty(PROP_PREFIX + pair[0], pair[1]);
+        }
+
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 010bd21..7a01200 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -48,7 +48,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
@@ -330,11 +330,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public GridQueryProcessor query();
/**
- * Gets ODBC processor.
+ * Gets SQL listener processor.
*
- * @return ODBC processor.
+ * @return SQL listener processor.
*/
- public OdbcProcessor odbc();
+ public SqlListenerProcessor sqlListener();
/**
* @return Plugin processor.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index bbc9846..262c5eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -64,7 +64,7 @@ import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -160,7 +160,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringInclude
- private OdbcProcessor odbcProc;
+ private SqlListenerProcessor sqlListenerProc;
/** */
@GridToStringInclude
@@ -567,8 +567,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
pluginProc = (IgnitePluginProcessor)comp;
else if (comp instanceof GridQueryProcessor)
qryProc = (GridQueryProcessor)comp;
- else if (comp instanceof OdbcProcessor)
- odbcProc = (OdbcProcessor)comp;
+ else if (comp instanceof SqlListenerProcessor)
+ sqlListenerProc = (SqlListenerProcessor)comp;
else if (comp instanceof DataStructuresProcessor)
dataStructuresProc = (DataStructuresProcessor)comp;
else if (comp instanceof ClusterProcessor)
@@ -824,8 +824,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
- @Override public OdbcProcessor odbc() {
- return odbcProc;
+ @Override public SqlListenerProcessor sqlListener() {
+ return sqlListenerProc;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 40476a7..c36fd7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -125,7 +125,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
@@ -924,7 +924,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));startProcessor(new GridClusterStateProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
- startProcessor(new OdbcProcessor(ctx));
+ startProcessor(new SqlListenerProcessor(ctx));
startProcessor(new GridServiceProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index 7b5e9d3..7efe4b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -938,7 +938,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeByteFieldPrimitive(byte val) {
+ public void writeByteFieldPrimitive(byte val) {
out.unsafeEnsure(1 + 1);
out.unsafeWriteByte(GridBinaryMarshaller.BYTE);
@@ -965,7 +965,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeShortFieldPrimitive(short val) {
+ public void writeShortFieldPrimitive(short val) {
out.unsafeEnsure(1 + 2);
out.unsafeWriteByte(GridBinaryMarshaller.SHORT);
@@ -985,7 +985,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeIntFieldPrimitive(int val) {
+ public void writeIntFieldPrimitive(int val) {
out.unsafeEnsure(1 + 4);
out.unsafeWriteByte(GridBinaryMarshaller.INT);
@@ -1005,7 +1005,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeLongFieldPrimitive(long val) {
+ public void writeLongFieldPrimitive(long val) {
out.unsafeEnsure(1 + 8);
out.unsafeWriteByte(GridBinaryMarshaller.LONG);
@@ -1025,7 +1025,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeFloatFieldPrimitive(float val) {
+ public void writeFloatFieldPrimitive(float val) {
out.unsafeEnsure(1 + 4);
out.unsafeWriteByte(GridBinaryMarshaller.FLOAT);
@@ -1045,7 +1045,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeDoubleFieldPrimitive(double val) {
+ public void writeDoubleFieldPrimitive(double val) {
out.unsafeEnsure(1 + 8);
out.unsafeWriteByte(GridBinaryMarshaller.DOUBLE);
@@ -1065,7 +1065,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeCharFieldPrimitive(char val) {
+ public void writeCharFieldPrimitive(char val) {
out.unsafeEnsure(1 + 2);
out.unsafeWriteByte(GridBinaryMarshaller.CHAR);
@@ -1085,7 +1085,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
/**
* @param val Value.
*/
- void writeBooleanFieldPrimitive(boolean val) {
+ public void writeBooleanFieldPrimitive(boolean val) {
out.unsafeEnsure(1 + 1);
out.unsafeWriteByte(GridBinaryMarshaller.BOOLEAN);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java
new file mode 100644
index 0000000..25d62b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc.thin;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
+import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_DISTRIBUTED_JOINS;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_ENFORCE_JOIN_ORDER;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_HOST;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_PORT;
+
+/**
+ * JDBC connection implementation.
+ * <p>
+ * Only forward-only, read-only cursors are accepted. Transaction-related calls do not fail,
+ * they only log a warning. Statement and metadata creation currently return {@code null}.
+ *
+ * See documentation of {@link org.apache.ignite.IgniteJdbcThinDriver} for details.
+ */
+public class JdbcConnection implements Connection {
+    /** Logger. */
+    private static final Logger LOG = Logger.getLogger(JdbcConnection.class.getName());
+
+    /** Schema name. */
+    private String schemaName;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /** Current transaction isolation. */
+    private int txIsolation;
+
+    /** Auto commit flag. */
+    private boolean autoCommit;
+
+    /** Current result set holdability. */
+    private int holdability;
+
+    /** Network timeout in milliseconds. */
+    private int timeout;
+
+    /** Ignite endpoint. */
+    private JdbcTcpIo cliIo;
+
+    /**
+     * Creates new connection.
+     *
+     * @param url Connection URL.
+     * @param props Additional properties (host, port and join flags are read from here).
+     * @throws SQLException If the port or host is invalid, or the connection to the cluster failed.
+     */
+    public JdbcConnection(String url, Properties props) throws SQLException {
+        assert url != null;
+        assert props != null;
+
+        holdability = HOLD_CURSORS_OVER_COMMIT;
+        autoCommit = true;
+        txIsolation = Connection.TRANSACTION_NONE;
+
+        boolean distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS, "false"));
+        boolean enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER, "false"));
+
+        String host = props.getProperty(PROP_HOST);
+        String portStr = props.getProperty(PROP_PORT);
+
+        try {
+            // A missing port property also ends up in the catch block: parseInt(null) throws NumberFormatException.
+            int port = Integer.parseInt(portStr);
+
+            if (port <= 0 || port > 0xFFFF)
+                throw new SQLException("Invalid port: " + portStr);
+        }
+        catch (NumberFormatException e) {
+            throw new SQLException("Invalid port: " + portStr, e);
+        }
+
+        if (host == null || host.trim().isEmpty())
+            throw new SQLException("Host name is empty.");
+
+        String endpoint = host.trim() + ":" + portStr.trim();
+
+        try {
+            cliIo = new JdbcTcpIo(endpoint, distributedJoins, enforceJoinOrder);
+
+            cliIo.start();
+        }
+        catch (Exception e) {
+            // The IO object is absent if its construction failed; avoid masking the real cause with an NPE.
+            if (cliIo != null)
+                cliIo.close();
+
+            throw new SQLException("Failed to connect to Ignite cluster [host=" + host + ", port=" + portStr + ']', e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statement createStatement() throws SQLException {
+        return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statement createStatement(int resSetType, int resSetConcurrency) throws SQLException {
+        return createStatement(resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statement createStatement(int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        ensureNotClosed();
+
+        checkCursorOptions(resSetType, resSetConcurrency, resSetHoldability);
+
+        // No statement object is created yet: only the arguments are validated.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
+        return prepareStatement(sql, TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int resSetType,
+        int resSetConcurrency) throws SQLException {
+        return prepareStatement(sql, resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        ensureNotClosed();
+
+        checkCursorOptions(resSetType, resSetConcurrency, resSetHoldability);
+
+        // No statement object is created yet: only the arguments are validated.
+        return null;
+    }
+
+    /**
+     * Validates cursor options: unsupported type or concurrency is rejected,
+     * unsupported holdability only produces a warning.
+     *
+     * @param resSetType Cursor option.
+     * @param resSetConcurrency Cursor option.
+     * @param resSetHoldability Cursor option.
+     * @throws SQLException If options unsupported.
+     */
+    private void checkCursorOptions(int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        if (resSetType != TYPE_FORWARD_ONLY)
+            throw new SQLFeatureNotSupportedException("Invalid result set type (only forward is supported.)");
+
+        if (resSetConcurrency != CONCUR_READ_ONLY)
+            throw new SQLFeatureNotSupportedException("Invalid concurrency (updates are not supported).");
+
+        if (resSetHoldability != HOLD_CURSORS_OVER_COMMIT)
+            LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallableStatement prepareCall(String sql) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Callable functions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency)
+        throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Callable functions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String nativeSQL(String sql) throws SQLException {
+        ensureNotClosed();
+
+        return sql;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAutoCommit(boolean autoCommit) throws SQLException {
+        ensureNotClosed();
+
+        this.autoCommit = autoCommit;
+
+        if (!autoCommit)
+            LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean getAutoCommit() throws SQLException {
+        ensureNotClosed();
+
+        if (!autoCommit)
+            LOG.warning("Transactions are not supported.");
+
+        return autoCommit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rollback() throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws SQLException {
+        if (closed)
+            return;
+
+        closed = true;
+
+        cliIo.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClosed() throws SQLException {
+        return closed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DatabaseMetaData getMetaData() throws SQLException {
+        ensureNotClosed();
+
+        // No metadata object is provided yet.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setReadOnly(boolean readOnly) throws SQLException {
+        ensureNotClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isReadOnly() throws SQLException {
+        ensureNotClosed();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setCatalog(String catalog) throws SQLException {
+        ensureNotClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getCatalog() throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTransactionIsolation(int level) throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+
+        txIsolation = level;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTransactionIsolation() throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+
+        return txIsolation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SQLWarning getWarnings() throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clearWarnings() throws SQLException {
+        ensureNotClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Class<?>> getTypeMap() throws SQLException {
+        // Same closed check as setTypeMap().
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Types mapping is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Types mapping is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setHoldability(int holdability) throws SQLException {
+        ensureNotClosed();
+
+        if (holdability != HOLD_CURSORS_OVER_COMMIT)
+            LOG.warning("Transactions are not supported.");
+
+        this.holdability = holdability;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getHoldability() throws SQLException {
+        ensureNotClosed();
+
+        return holdability;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Savepoint setSavepoint() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Savepoint setSavepoint(String name) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rollback(Savepoint savepoint) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Callable functions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int[] colIndexes) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, String[] colNames) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Clob createClob() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Blob createBlob() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public NClob createNClob() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public SQLXML createSQLXML() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isValid(int timeout) throws SQLException {
+        if (timeout < 0)
+            throw new SQLException("Invalid timeout: " + timeout);
+
+        // No round trip to the server is made: validity is derived from the local closed flag only.
+        return !closed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setClientInfo(String name, String val) throws SQLClientInfoException {
+        // NOTE(review): JDBC declares SQLClientInfoException here; the unchecked exception is kept
+        // so that existing callers observe the same exception type - confirm whether to switch.
+        throw new UnsupportedOperationException("Client info is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setClientInfo(Properties props) throws SQLClientInfoException {
+        // NOTE(review): see the single-property overload; same exception type kept for compatibility.
+        throw new UnsupportedOperationException("Client info is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getClientInfo(String name) throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Properties getClientInfo() throws SQLException {
+        ensureNotClosed();
+
+        return new Properties();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Struct createStruct(String typeName, Object[] attrs) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (!isWrapperFor(iface)) {
+            // 'iface' may be null here, so its name must not be dereferenced blindly.
+            throw new SQLException("Connection is not a wrapper for " + (iface != null ? iface.getName() : null));
+        }
+
+        // Checked cast: isWrapperFor() guarantees that this object is an instance of 'iface'.
+        return iface.cast(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        // True for every type this object implements (Connection, Wrapper, AutoCloseable, JdbcConnection).
+        return iface != null && iface.isInstance(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSchema(String schema) throws SQLException {
+        schemaName = schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSchema() throws SQLException {
+        return schemaName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void abort(Executor executor) throws SQLException {
+        close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNetworkTimeout(Executor executor, int ms) throws SQLException {
+        // NOTE(review): JDBC documents SQLException for a negative value; the unchecked exception is
+        // kept so that existing callers observe the same exception type - confirm whether to switch.
+        if (ms < 0)
+            throw new IllegalArgumentException("Timeout is below zero: " + ms);
+
+        timeout = ms;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getNetworkTimeout() throws SQLException {
+        return timeout;
+    }
+
+    /**
+     * Ensures that connection is not closed.
+     *
+     * @throws SQLException If connection is closed.
+     */
+    private void ensureNotClosed() throws SQLException {
+        if (closed)
+            throw new SQLException("Connection is closed.");
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java
new file mode 100644
index 0000000..4946b41
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc.thin;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.logging.Logger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProtocolVersion;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
+import org.apache.ignite.internal.processors.odbc.SqlListenerNioListener;
+import org.apache.ignite.internal.util.ipc.IpcEndpoint;
+import org.apache.ignite.internal.util.ipc.IpcEndpointFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * JDBC IO layer implementation based on blocking IPC streams.
+ * <p>
+ * Every message on the wire is prefixed with its length encoded as a 4-byte little-endian integer
+ * (see {@link #send(byte[])} and {@link #read()}).
+ */
+public class JdbcTcpIo {
+    /** Current version. */
+    private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);
+
+    /** Initial output stream capacity. */
+    private static final int HANDSHAKE_MSG_SIZE = 10;
+
+    /** Logger. */
+    private static final Logger log = Logger.getLogger(JdbcTcpIo.class.getName());
+
+    /** Server endpoint address. */
+    private final String endpointAddr;
+
+    /** Endpoint. */
+    private IpcEndpoint endpoint;
+
+    /** Output stream. */
+    private BufferedOutputStream out;
+
+    /** Input stream. */
+    private BufferedInputStream in;
+
+    /** Distributed joins. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order. */
+    private final boolean enforceJoinOrder;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * @param endpointAddr Endpoint.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     */
+    JdbcTcpIo(String endpointAddr, boolean distributedJoins, boolean enforceJoinOrder) {
+        assert endpointAddr != null;
+
+        this.endpointAddr = endpointAddr;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder = enforceJoinOrder;
+    }
+
+    /**
+     * Connects to the endpoint, wraps its streams into buffered ones and performs the handshake.
+     *
+     * @throws IgniteCheckedException On error.
+     * @throws IOException On IO error in handshake.
+     */
+    public void start() throws IgniteCheckedException, IOException {
+        endpoint = IpcEndpointFactory.connectEndpoint(endpointAddr, null);
+
+        out = new BufferedOutputStream(endpoint.outputStream());
+        in = new BufferedInputStream(endpoint.inputStream());
+
+        handshake();
+    }
+
+    /**
+     * Sends the handshake request (command, protocol version, client type, join flags) and
+     * checks the response.
+     *
+     * @throws IOException On error.
+     * @throws IgniteCheckedException If the server rejected the handshake or the response is malformed.
+     */
+    public void handshake() throws IOException, IgniteCheckedException {
+        BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE),
+            null, null);
+
+        writer.writeByte((byte)SqlListenerRequest.HANDSHAKE);
+
+        writer.writeShort(CURRENT_VER.major());
+        writer.writeShort(CURRENT_VER.minor());
+        writer.writeShort(CURRENT_VER.maintenance());
+
+        writer.writeByte(SqlListenerNioListener.JDBC_CLIENT);
+
+        writer.writeBoolean(distributedJoins);
+        writer.writeBoolean(enforceJoinOrder);
+
+        send(writer.array());
+
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()),
+            null, null, false);
+
+        boolean accepted = reader.readBoolean();
+
+        if (accepted)
+            return;
+
+        // Rejected: the rest of the response carries the server protocol version and an error message.
+        short maj = reader.readShort();
+        short min = reader.readShort();
+        short maintenance = reader.readShort();
+
+        String err = reader.readString();
+
+        SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(maj, min, maintenance);
+
+        throw new IgniteCheckedException("Handshake failed [driverProtocolVer=" + CURRENT_VER +
+            ", remoteNodeProtocolVer=" + ver + ", err=" + err + ']');
+    }
+
+    /**
+     * Writes the length prefix (4 bytes, little-endian) followed by the payload and flushes the stream.
+     *
+     * @param req JDBC request.
+     * @throws IOException On error.
+     */
+    private void send(byte[] req) throws IOException {
+        int size = req.length;
+
+        out.write(size & 0xFF);
+        out.write((size >> 8) & 0xFF);
+        out.write((size >> 16) & 0xFF);
+        out.write((size >> 24) & 0xFF);
+
+        out.write(req);
+
+        out.flush();
+    }
+
+    /**
+     * Reads one length-prefixed message.
+     *
+     * @return Bytes of a response from server.
+     * @throws IOException On error.
+     * @throws IgniteCheckedException If the stream ended prematurely or the length prefix is negative.
+     */
+    private byte[] read() throws IOException, IgniteCheckedException {
+        byte[] sizeBytes = read(4);
+
+        // Little-endian decoding, the mirror of send(). Only bitwise OR is used so that the result
+        // does not depend on '+' binding tighter than '|'.
+        int msgSize = ((0xFF & sizeBytes[3]) << 24) | ((0xFF & sizeBytes[2]) << 16)
+            | ((0xFF & sizeBytes[1]) << 8) | (0xFF & sizeBytes[0]);
+
+        // A negative length would otherwise surface as an unchecked NegativeArraySizeException.
+        if (msgSize < 0)
+            throw new IgniteCheckedException("Failed to read incoming message (invalid message size): " + msgSize);
+
+        return read(msgSize);
+    }
+
+    /**
+     * Blocks until exactly {@code size} bytes are read.
+     *
+     * @param size Count of bytes to read from stream.
+     * @return Read bytes.
+     * @throws IOException On error.
+     * @throws IgniteCheckedException If the stream ended before {@code size} bytes were read.
+     */
+    private byte[] read(int size) throws IOException, IgniteCheckedException {
+        int off = 0;
+
+        byte[] data = new byte[size];
+
+        while (off != size) {
+            int res = in.read(data, off, size - off);
+
+            if (res == -1)
+                throw new IgniteCheckedException("Failed to read incoming message (not enough data).");
+
+            off += res;
+        }
+
+        return data;
+    }
+
+    /**
+     * Close the client IO. Subsequent calls are no-ops.
+     */
+    public void close() {
+        if (closed)
+            return;
+
+        // Mark as closed up front so that a failure below cannot lead to a second cleanup attempt.
+        closed = true;
+
+        // Clean up resources.
+        U.closeQuiet(out);
+        U.closeQuiet(in);
+
+        // The endpoint is absent if start() was never called or the connect attempt failed.
+        if (endpoint != null)
+            endpoint.close();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
deleted file mode 100644
index cdb3de3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser;
-import org.apache.ignite.internal.processors.odbc.odbc.OdbcRequestHandler;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * ODBC message listener.
- */
-public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
- /** Current version. */
- private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);
-
- /** Supported versions. */
- private static final Set<SqlListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
-
- /** Connection-related metadata key. */
- private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
-
- /** Request ID generator. */
- private static final AtomicLong REQ_ID_GEN = new AtomicLong();
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock;
-
- /** Kernal context. */
- private final GridKernalContext ctx;
-
- /** Maximum allowed cursors. */
- private final int maxCursors;
-
- /** Logger. */
- private final IgniteLogger log;
-
- static {
- SUPPORTED_VERS.add(CURRENT_VER);
- }
-
- /**
- * Constructor.
- *
- * @param ctx Context.
- * @param busyLock Shutdown busy lock.
- * @param maxCursors Maximum allowed cursors.
- */
- public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) {
- this.ctx = ctx;
- this.busyLock = busyLock;
- this.maxCursors = maxCursors;
-
- log = ctx.log(getClass());
- }
-
- /** {@inheritDoc} */
- @Override public void onConnected(GridNioSession ses) {
- if (log.isDebugEnabled())
- log.debug("SQL client connected: " + ses.remoteAddress());
- }
-
- /** {@inheritDoc} */
- @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- if (log.isDebugEnabled()) {
- if (e == null)
- log.debug("SQL client disconnected: " + ses.remoteAddress());
- else
- log.debug("SQL client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onMessage(GridNioSession ses, byte[] msg) {
- assert msg != null;
-
- SqlListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
-
- if (connCtx == null) {
- onHandshake(ses, msg);
-
- return;
- }
-
- SqlListenerMessageParser parser = connCtx.parser();
-
- SqlListenerRequest req;
-
- try {
- req = parser.decode(msg);
- }
- catch (Exception e) {
- log.error("Failed to parse SQL client request [err=" + e + ']');
-
- ses.close();
-
- return;
- }
-
- assert req != null;
-
- req.requestId(REQ_ID_GEN.incrementAndGet());
-
- try {
- long startTime = 0;
-
- if (log.isDebugEnabled()) {
- startTime = System.nanoTime();
-
- log.debug("SQL client request received [reqId=" + req.requestId() + ", addr=" + ses.remoteAddress() +
- ", req=" + req + ']');
- }
-
- SqlListenerRequestHandler handler = connCtx.handler();
-
- SqlListenerResponse resp = handler.handle(req);
-
- if (log.isDebugEnabled()) {
- long dur = (System.nanoTime() - startTime) / 1000;
-
- log.debug("SQL client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
- ", resp=" + resp.status() + ']');
- }
-
- byte[] outMsg = parser.encode(resp);
-
- ses.send(outMsg);
- }
- catch (Exception e) {
- log.error("Failed to process SQL client request [reqId=" + req.requestId() + ", err=" + e + ']');
-
- ses.send(parser.encode(new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString())));
- }
- }
-
- /**
- * Perform handshake.
- *
- * @param ses Session.
- * @param msg Message bytes.
- */
- private void onHandshake(GridNioSession ses, byte[] msg) {
- BinaryInputStream stream = new BinaryHeapInputStream(msg);
-
- BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true);
-
- byte cmd = reader.readByte();
-
- if (cmd != SqlListenerRequest.HANDSHAKE) {
- log.error("Unexpected SQL client request (will close session): " + ses.remoteAddress());
-
- ses.close();
-
- return;
- }
-
- short verMajor = reader.readShort();
- short verMinor = reader.readShort();
- short verMaintenance = reader.readShort();
-
- SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(verMajor, verMinor, verMaintenance);
-
- String errMsg = null;
-
- if (SUPPORTED_VERS.contains(ver)) {
- // Prepare context.
- SqlListenerConnectionContext connCtx = prepareContext(ver, reader);
-
- ses.addMeta(CONN_CTX_META_KEY, connCtx);
- }
- else {
- log.warning("Unsupported version: " + ver.toString());
-
- errMsg = "Unsupported version.";
- }
-
- // Send response.
- BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);
-
- if (errMsg == null) {
- writer.writeBoolean(true);
- }
- else {
- writer.writeBoolean(false);
- writer.writeShort(CURRENT_VER.major());
- writer.writeShort(CURRENT_VER.minor());
- writer.writeShort(CURRENT_VER.maintenance());
- writer.doWriteString(errMsg);
- }
-
- ses.send(writer.array());
- }
-
- /**
- * Prepare context.
- *
- * @param ver Version.
- * @param reader Reader.
- * @return Context.
- */
- private SqlListenerConnectionContext prepareContext(SqlListenerProtocolVersion ver, BinaryReaderExImpl reader) {
- // TODO: Switch between ODBC and JDBC.
- boolean distributedJoins = reader.readBoolean();
- boolean enforceJoinOrder = reader.readBoolean();
-
- OdbcRequestHandler handler =
- new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder);
-
- OdbcMessageParser parser = new OdbcMessageParser(ctx);
-
- return new SqlListenerConnectionContext(handler, parser);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
deleted file mode 100644
index 6b8b5a3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import java.net.InetAddress;
-import java.nio.ByteOrder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.OdbcConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.HostAndPortRange;
-import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
-import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.spi.IgnitePortProtocol;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
-
-/**
- * ODBC processor.
- */
-public class OdbcProcessor extends GridProcessorAdapter {
- /** Default number of selectors. */
- private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
-
- /** Default TCP_NODELAY flag. */
- private static final boolean DFLT_TCP_NODELAY = true;
-
- /** Default TCP direct buffer flag. */
- private static final boolean DFLT_TCP_DIRECT_BUF = false;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** ODBC TCP Server. */
- private GridNioServer<byte[]> srv;
-
- /** ODBC executor service. */
- private ExecutorService odbcExecSvc;
-
- /**
- * @param ctx Kernal context.
- */
- public OdbcProcessor(GridKernalContext ctx) {
- super(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
- IgniteConfiguration cfg = ctx.config();
-
- OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
-
- if (odbcCfg != null) {
- try {
- Marshaller marsh = cfg.getMarshaller();
-
- if (marsh != null && !(marsh instanceof BinaryMarshaller))
- throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " +
- "through IgniteConfiguration.setMarshaller())");
-
- HostAndPortRange hostPort;
-
- if (F.isEmpty(odbcCfg.getEndpointAddress())) {
- hostPort = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST,
- OdbcConfiguration.DFLT_TCP_PORT_FROM,
- OdbcConfiguration.DFLT_TCP_PORT_TO
- );
- }
- else {
- hostPort = HostAndPortRange.parse(odbcCfg.getEndpointAddress(),
- OdbcConfiguration.DFLT_TCP_PORT_FROM,
- OdbcConfiguration.DFLT_TCP_PORT_TO,
- "Failed to parse ODBC endpoint address"
- );
- }
-
- assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
-
- odbcExecSvc = new IgniteThreadPoolExecutor(
- "odbc",
- cfg.getIgniteInstanceName(),
- odbcCfg.getThreadPoolSize(),
- odbcCfg.getThreadPoolSize(),
- 0,
- new LinkedBlockingQueue<Runnable>());
-
- InetAddress host;
-
- try {
- host = InetAddress.getByName(hostPort.host());
- }
- catch (Exception e) {
- throw new IgniteCheckedException("Failed to resolve ODBC host: " + hostPort.host(), e);
- }
-
- Exception lastErr = null;
-
- for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
- try {
- GridNioFilter[] filters = new GridNioFilter[] {
- new GridNioAsyncNotifyFilter(ctx.igniteInstanceName(), odbcExecSvc, log) {
- @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
- proceedSessionOpened(ses);
- }
- },
- new GridNioCodecFilter(new SqlListenerBufferedParser(), log, false)
- };
-
- GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
- .address(host)
- .port(port)
- .listener(new OdbcNioListener(ctx, busyLock, odbcCfg.getMaxOpenCursors()))
- .logger(log)
- .selectorCount(DFLT_SELECTOR_CNT)
- .igniteInstanceName(ctx.igniteInstanceName())
- .serverName("odbc")
- .tcpNoDelay(DFLT_TCP_NODELAY)
- .directBuffer(DFLT_TCP_DIRECT_BUF)
- .byteOrder(ByteOrder.nativeOrder())
- .socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
- .socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
- .filters(filters)
- .directMode(false)
- .build();
-
- srv0.start();
-
- srv = srv0;
-
- ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
-
- log.info("ODBC processor has started on TCP port " + port);
-
- lastErr = null;
-
- break;
- }
- catch (Exception e) {
- lastErr = e;
- }
- }
-
- assert (srv != null && lastErr == null) || (srv == null && lastErr != null);
-
- if (lastErr != null)
- throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" +
- "address=" + hostPort + ", lastErr=" + lastErr + ']');
- }
- catch (Exception e) {
- throw new IgniteCheckedException("Failed to start ODBC processor.", e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (srv != null) {
- busyLock.block();
-
- srv.stop();
-
- ctx.ports().deregisterPorts(getClass());
-
- if (odbcExecSvc != null) {
- U.shutdownNow(getClass(), odbcExecSvc, log);
-
- odbcExecSvc = null;
- }
-
- if (log.isDebugEnabled())
- log.debug("ODBC processor stopped.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java
new file mode 100644
index 0000000..9d731ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+
+/**
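+ * Base SQL listener message parser: decodes binary requests into {@link SqlListenerRequest} objects and encodes {@link SqlListenerResponse} objects.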
+ */
+public abstract class SqlListenerAbstractMessageParser implements SqlListenerMessageParser {
+ /** Initial output stream capacity. */
+ protected static final int INIT_CAP = 1024;
+
+ /** Kernal context. */
+ protected GridKernalContext ctx;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Object reader. */
+ private SqlListenerAbstractObjectReader objReader;
+
+ /** Object writer. */
+ private SqlListenerAbstractObjectWriter objWriter;
+
+    /**
+     * Creates the parser and obtains a logger for the concrete parser class.
+     *
+     * @param ctx Kernal context.
+     * @param objReader Object reader used for query arguments.
+     * @param objWriter Object writer used for result values.
+     */
+    protected SqlListenerAbstractMessageParser(final GridKernalContext ctx, SqlListenerAbstractObjectReader objReader,
+        SqlListenerAbstractObjectWriter objWriter) {
+        this.objReader = objReader;
+        this.objWriter = objWriter;
+        this.ctx = ctx;
+
+        this.log = ctx.log(getClass());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The first byte of the message is the command code; the remaining layout depends on the command.
+     *
+     * @throws IgniteException If the command code is unknown or the declared number of query arguments
+     *      is negative.
+     */
+    @Override public SqlListenerRequest decode(byte[] msg) {
+        assert msg != null;
+
+        BinaryReaderExImpl reader = createReader(msg);
+
+        byte cmd = reader.readByte();
+
+        SqlListenerRequest res;
+
+        switch (cmd) {
+            case SqlListenerRequest.QRY_EXEC: {
+                String cache = reader.readString();
+                String sql = reader.readString();
+                int argsNum = reader.readInt();
+
+                // The count is read from the message: reject a negative value explicitly instead of
+                // letting the array allocation below fail with NegativeArraySizeException.
+                if (argsNum < 0)
+                    throw new IgniteException("Invalid number of query arguments: [argsNum=" + argsNum + ']');
+
+                Object[] params = new Object[argsNum];
+
+                for (int i = 0; i < argsNum; ++i)
+                    params[i] = objReader.readObject(reader);
+
+                res = new SqlListenerQueryExecuteRequest(cache, sql, params);
+
+                break;
+            }
+
+            case SqlListenerRequest.QRY_FETCH: {
+                long queryId = reader.readLong();
+                int pageSize = reader.readInt();
+
+                res = new SqlListenerQueryFetchRequest(queryId, pageSize);
+
+                break;
+            }
+
+            case SqlListenerRequest.QRY_CLOSE: {
+                long queryId = reader.readLong();
+
+                res = new SqlListenerQueryCloseRequest(queryId);
+
+                break;
+            }
+
+            case SqlListenerRequest.META_COLS: {
+                String cache = reader.readString();
+                String table = reader.readString();
+                String column = reader.readString();
+
+                res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
+
+                break;
+            }
+
+            case SqlListenerRequest.META_TBLS: {
+                String catalog = reader.readString();
+                String schema = reader.readString();
+                String table = reader.readString();
+                String tableType = reader.readString();
+
+                res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
+
+                break;
+            }
+
+            case SqlListenerRequest.META_PARAMS: {
+                String cacheName = reader.readString();
+                String sqlQuery = reader.readString();
+
+                res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery);
+
+                break;
+            }
+
+            default:
+                throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']');
+        }
+
+        return res;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The status byte is always written first. For a non-success status only the error message follows.
+ * Otherwise the payload layout depends on the runtime type of {@code msg.response()}; a {@code null}
+ * response produces a message that contains the status byte only.
+ */
+ @Override public byte[] encode(SqlListenerResponse msg) {
+ assert msg != null;
+
+ // Create a fresh binary writer with the default initial capacity.
+ BinaryWriterExImpl writer = createWriter(INIT_CAP);
+
+ // Writing status.
+ writer.writeByte((byte) msg.status());
+
+ if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) {
+ writer.writeString(msg.error());
+
+ return writer.array();
+ }
+
+ Object res0 = msg.response();
+
+ if (res0 == null)
+ return writer.array();
+ else if (res0 instanceof SqlListenerQueryExecuteResult) {
+ SqlListenerQueryExecuteResult res = (SqlListenerQueryExecuteResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Resulting query ID: " + res.getQueryId());
+
+ writer.writeLong(res.getQueryId());
+
+ Collection<SqlListenerColumnMeta> metas = res.getColumnsMetadata();
+
+ assert metas != null;
+
+ writer.writeInt(metas.size());
+
+ for (SqlListenerColumnMeta meta : metas)
+ meta.write(writer);
+ }
+ else if (res0 instanceof SqlListenerQueryFetchResult) {
+ SqlListenerQueryFetchResult res = (SqlListenerQueryFetchResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Resulting query ID: " + res.queryId());
+
+ writer.writeLong(res.queryId());
+
+ Collection<?> items0 = res.items();
+
+ assert items0 != null;
+
+ writer.writeBoolean(res.last());
+
+ writer.writeInt(items0.size());
+
+ for (Object row0 : items0) {
+ if (row0 != null) { // NOTE(review): null rows are skipped although items0.size() was already written as the row count - confirm this cannot desynchronize the reader.
+ Collection<?> row = (Collection<?>)row0;
+
+ writer.writeInt(row.size());
+
+ for (Object obj : row)
+ objWriter.writeObject(writer, obj);
+ }
+ }
+ }
+ else if (res0 instanceof SqlListenerQueryCloseResult) {
+ SqlListenerQueryCloseResult res = (SqlListenerQueryCloseResult) res0;
+
+ if (log.isDebugEnabled())
+ log.debug("Resulting query ID: " + res.getQueryId());
+
+ writer.writeLong(res.getQueryId());
+ }
+ else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
+ OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
+
+ Collection<SqlListenerColumnMeta> columnsMeta = res.meta();
+
+ assert columnsMeta != null;
+
+ writer.writeInt(columnsMeta.size());
+
+ for (SqlListenerColumnMeta columnMeta : columnsMeta)
+ columnMeta.write(writer);
+ }
+ else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
+ OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
+
+ Collection<OdbcTableMeta> tablesMeta = res.meta();
+
+ assert tablesMeta != null;
+
+ writer.writeInt(tablesMeta.size());
+
+ for (OdbcTableMeta tableMeta : tablesMeta)
+ tableMeta.writeBinary(writer);
+ }
+ else if (res0 instanceof OdbcQueryGetParamsMetaResult) {
+ OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0;
+
+ byte[] typeIds = res.typeIds();
+
+ objWriter.writeObject(writer, typeIds);
+ }
+ else
+ assert false : "Should not reach here."; // NOTE(review): only fails when assertions are enabled; otherwise an unknown result type yields a status-only message.
+
+ return writer.array();
+ }
+
+ /**
+ * Creates the binary reader that {@link #decode(byte[])} uses to parse an incoming message.
+ *
+ * @param msg Input message.
+ * @return Reader.
+ */
+ protected abstract BinaryReaderExImpl createReader(byte[] msg);
+
+ /**
+ * Creates the binary writer that {@link #encode(SqlListenerResponse)} uses to build an outgoing message;
+ * {@code encode} calls it with {@link #INIT_CAP}.
+ *
+ * @param cap Initial capacity.
+ * @return Binary writer instance.
+ */
+ protected abstract BinaryWriterExImpl createWriter(int cap);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
new file mode 100644
index 0000000..18162e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Object reader that decodes primitive and built-in binary types itself and delegates any other type to {@link #readCustomObject(BinaryReaderExImpl)}.
+ */
+@SuppressWarnings("unchecked")
+public abstract class SqlListenerAbstractObjectReader {
+    /**
+     * Reads a single value. The first byte is the type tag; primitives, strings, decimals, UUIDs,
+     * date/time values and arrays of those types are decoded here. For any other tag the stream is
+     * rewound by one byte, so that the tag can be read again, and the value is read by
+     * {@link #readCustomObject(BinaryReaderExImpl)}.
+     *
+     * @param reader Reader.
+     * @return Read object, or {@code null} for the {@code NULL} tag.
+     * @throws BinaryObjectException On error.
+     */
+    @Nullable public Object readObject(BinaryReaderExImpl reader) throws BinaryObjectException {
+        byte type = reader.readByte();
+
+        switch (type) {
+            case GridBinaryMarshaller.NULL:
+                return null;
+
+            case GridBinaryMarshaller.BOOLEAN:
+                return reader.readBoolean();
+
+            case GridBinaryMarshaller.BYTE:
+                return reader.readByte();
+
+            case GridBinaryMarshaller.CHAR:
+                return reader.readChar();
+
+            case GridBinaryMarshaller.SHORT:
+                return reader.readShort();
+
+            case GridBinaryMarshaller.INT:
+                return reader.readInt();
+
+            case GridBinaryMarshaller.LONG:
+                return reader.readLong();
+
+            case GridBinaryMarshaller.FLOAT:
+                return reader.readFloat();
+
+            case GridBinaryMarshaller.DOUBLE:
+                return reader.readDouble();
+
+            case GridBinaryMarshaller.STRING:
+                return BinaryUtils.doReadString(reader.in());
+
+            case GridBinaryMarshaller.DECIMAL:
+                return BinaryUtils.doReadDecimal(reader.in());
+
+            case GridBinaryMarshaller.UUID:
+                return BinaryUtils.doReadUuid(reader.in());
+
+            case GridBinaryMarshaller.TIME:
+                return BinaryUtils.doReadTime(reader.in());
+
+            case GridBinaryMarshaller.TIMESTAMP:
+                return BinaryUtils.doReadTimestamp(reader.in());
+
+            case GridBinaryMarshaller.DATE:
+                return BinaryUtils.doReadDate(reader.in());
+
+            case GridBinaryMarshaller.BOOLEAN_ARR:
+                return BinaryUtils.doReadBooleanArray(reader.in());
+
+            case GridBinaryMarshaller.BYTE_ARR:
+                return BinaryUtils.doReadByteArray(reader.in());
+
+            case GridBinaryMarshaller.CHAR_ARR:
+                return BinaryUtils.doReadCharArray(reader.in());
+
+            case GridBinaryMarshaller.SHORT_ARR:
+                return BinaryUtils.doReadShortArray(reader.in());
+
+            case GridBinaryMarshaller.INT_ARR:
+                return BinaryUtils.doReadIntArray(reader.in());
+
+            // long[] was the only primitive array type without a case, so it fell through to
+            // readCustomObject(); handle it like the other primitive arrays.
+            case GridBinaryMarshaller.LONG_ARR:
+                return BinaryUtils.doReadLongArray(reader.in());
+
+            case GridBinaryMarshaller.FLOAT_ARR:
+                return BinaryUtils.doReadFloatArray(reader.in());
+
+            case GridBinaryMarshaller.DOUBLE_ARR:
+                return BinaryUtils.doReadDoubleArray(reader.in());
+
+            case GridBinaryMarshaller.STRING_ARR:
+                return BinaryUtils.doReadStringArray(reader.in());
+
+            case GridBinaryMarshaller.DECIMAL_ARR:
+                return BinaryUtils.doReadDecimalArray(reader.in());
+
+            case GridBinaryMarshaller.UUID_ARR:
+                return BinaryUtils.doReadUuidArray(reader.in());
+
+            case GridBinaryMarshaller.TIME_ARR:
+                return BinaryUtils.doReadTimeArray(reader.in());
+
+            case GridBinaryMarshaller.TIMESTAMP_ARR:
+                return BinaryUtils.doReadTimestampArray(reader.in());
+
+            case GridBinaryMarshaller.DATE_ARR:
+                return BinaryUtils.doReadDateArray(reader.in());
+
+            default:
+                // Step back over the type tag so the custom reader sees the complete value.
+                reader.in().position(reader.in().position() - 1);
+
+                return readCustomObject(reader);
+        }
+    }
+
+ /**
+ * Reads an object whose type tag is not handled by {@link #readObject(BinaryReaderExImpl)}. When called
+ * from there, the stream has been rewound so that it is positioned at the type tag byte.
+ *
+ * @param reader Reader.
+ * @return Unmarshalled object.
+ * @throws BinaryObjectException On error.
+ */
+ protected abstract Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException;
+}