You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2021/10/06 17:47:24 UTC

[asterixdb-clients] branch master updated: [NO ISSUE][JDBC] Support foreign key metadata, other improvements

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new e51b9e4  [NO ISSUE][JDBC] Support foreign key metadata, other improvements
e51b9e4 is described below

commit e51b9e457635f2f57f37307481ba014ec1f392a1
Author: Dmitry Lychagin <dm...@couchbase.com>
AuthorDate: Wed Oct 6 10:36:53 2021 -0700

    [NO ISSUE][JDBC] Support foreign key metadata, other improvements
    
    Change-Id: I01dcaf1e9ade568363df51f58f412956c9e0da45
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb-clients/+/13584
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
    Reviewed-by: Ian Maxon <im...@uci.edu>
    Tested-by: Dmitry Lychagin <dm...@couchbase.com>
---
 .../apache/asterix/jdbc/core/ADBConnection.java    |  88 +++++-
 .../asterix/jdbc/core/ADBDatabaseMetaData.java     |  38 ++-
 .../apache/asterix/jdbc/core/ADBDriverBase.java    |  28 +-
 .../apache/asterix/jdbc/core/ADBDriverContext.java |   2 +-
 .../asterix/jdbc/core/ADBDriverProperty.java       |  28 +-
 .../apache/asterix/jdbc/core/ADBErrorReporter.java |  11 +-
 .../apache/asterix/jdbc/core/ADBMetaStatement.java | 343 ++++++++++++++++++---
 .../asterix/jdbc/core/ADBPreparedStatement.java    |  17 +-
 .../org/apache/asterix/jdbc/core/ADBProtocol.java  | 110 +++++--
 .../org/apache/asterix/jdbc/core/ADBRowStore.java  |  87 +++---
 .../org/apache/asterix/jdbc/core/ADBStatement.java | 226 ++++++++++----
 11 files changed, 759 insertions(+), 219 deletions(-)

diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
index 46e38bc..3a0ae5e 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBConnection.java
@@ -49,35 +49,64 @@ import java.util.logging.Logger;
 public class ADBConnection extends ADBWrapperSupport implements Connection {
 
     final ADBProtocol protocol;
-
     final String url;
-
     final String databaseVersion;
-
+    final ADBDriverProperty.CatalogDataverseMode catalogDataverseMode;
+    final boolean catalogIncludesSchemaless;
+    final boolean sqlCompatMode;
     private final AtomicBoolean closed;
-
     private final ConcurrentLinkedQueue<ADBStatement> statements;
-
     private volatile SQLWarning warning;
-
     private volatile ADBMetaStatement metaStatement;
-
-    volatile String catalog;
-
-    volatile String schema;
+    private volatile String catalog;
+    private volatile String schema;
 
     // Lifecycle
 
-    protected ADBConnection(ADBProtocol protocol, String url, String databaseVersion, String catalog, String schema,
-            SQLWarning connectWarning) {
+    protected ADBConnection(ADBProtocol protocol, String url, String databaseVersion, String dataverseCanonicalName,
+            Map<ADBDriverProperty, Object> properties, SQLWarning connectWarning) throws SQLException {
         this.url = Objects.requireNonNull(url);
         this.protocol = Objects.requireNonNull(protocol);
         this.databaseVersion = databaseVersion;
         this.statements = new ConcurrentLinkedQueue<>();
         this.warning = connectWarning;
-        this.catalog = catalog;
-        this.schema = schema;
         this.closed = new AtomicBoolean(false);
+        this.sqlCompatMode = (Boolean) ADBDriverProperty.Common.SQL_COMPAT_MODE.fetchPropertyValue(properties);
+        this.catalogDataverseMode = getCatalogDataverseMode(protocol, properties);
+        this.catalogIncludesSchemaless =
+                (Boolean) ADBDriverProperty.Common.CATALOG_INCLUDES_SCHEMALESS.fetchPropertyValue(properties);
+        initCatalogSchema(protocol, dataverseCanonicalName);
+    }
+
+    private void initCatalogSchema(ADBProtocol protocol, String dataverseCanonicalName) throws SQLException {
+        switch (catalogDataverseMode) {
+            case CATALOG:
+                catalog = dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()
+                        ? ADBProtocol.DEFAULT_DATAVERSE : dataverseCanonicalName;
+                // schema = null
+                break;
+            case CATALOG_SCHEMA:
+                if (dataverseCanonicalName == null || dataverseCanonicalName.isEmpty()) {
+                    catalog = ADBProtocol.DEFAULT_DATAVERSE;
+                    // schema = null
+                } else {
+                    String[] parts = dataverseCanonicalName.split("/");
+                    switch (parts.length) {
+                        case 1:
+                            catalog = parts[0];
+                            break;
+                        case 2:
+                            catalog = parts[0];
+                            schema = parts[1];
+                            break;
+                        default:
+                            throw protocol.getErrorReporter().errorInConnection(dataverseCanonicalName); //TODO:FIXME
+                    }
+                }
+                break;
+            default:
+                throw new IllegalStateException();
+        }
     }
 
     @Override
@@ -239,7 +268,7 @@ public class ADBConnection extends ADBWrapperSupport implements Connection {
     }
 
     private ADBStatement createStatementImpl() {
-        ADBStatement stmt = new ADBStatement(this, catalog, schema);
+        ADBStatement stmt = new ADBStatement(this);
         registerStatement(stmt);
         return stmt;
     }
@@ -280,7 +309,7 @@ public class ADBConnection extends ADBWrapperSupport implements Connection {
     }
 
     private ADBPreparedStatement prepareStatementImpl(String sql) throws SQLException {
-        ADBPreparedStatement stmt = new ADBPreparedStatement(this, sql, catalog, schema);
+        ADBPreparedStatement stmt = new ADBPreparedStatement(this, sql);
         registerStatement(stmt);
         return stmt;
     }
@@ -328,9 +357,36 @@ public class ADBConnection extends ADBWrapperSupport implements Connection {
     @Override
     public void setSchema(String schema) throws SQLException {
         checkClosed();
+        if (catalogDataverseMode == ADBDriverProperty.CatalogDataverseMode.CATALOG
+                && (schema != null && !schema.isEmpty())) {
+            throw getErrorReporter().errorInConnection(schema); //TODO:FIXME:REVIEW make no-op?
+        }
         this.schema = schema;
     }
 
+    String getDataverseCanonicalName() {
+        switch (catalogDataverseMode) {
+            case CATALOG:
+                return catalog;
+            case CATALOG_SCHEMA:
+                String c = catalog;
+                String s = schema;
+                return s == null ? c : c + "/" + s;
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    private static ADBDriverProperty.CatalogDataverseMode getCatalogDataverseMode(ADBProtocol protocol,
+            Map<ADBDriverProperty, Object> properties) throws SQLException {
+        int mode = ((Number) ADBDriverProperty.Common.CATALOG_DATAVERSE_MODE.fetchPropertyValue(properties)).intValue();
+        try {
+            return ADBDriverProperty.CatalogDataverseMode.valueOf(mode);
+        } catch (IllegalArgumentException e) {
+            throw protocol.getErrorReporter().errorInConnection(String.valueOf(mode)); //TODO:FIXME
+        }
+    }
+
     // Statement lifecycle
 
     private void registerStatement(ADBStatement stmt) {
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
index a090498..1d463dc 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDatabaseMetaData.java
@@ -122,7 +122,7 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
 
     @Override
     public ADBResultSet getSchemas() throws SQLException {
-        return getSchemas(metaStatement.connection.catalog, null);
+        return metaStatement.executeGetSchemasQuery();
     }
 
     @Override
@@ -135,6 +135,15 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
         return METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8;
     }
 
+    //TODO:document
+    private boolean supportsCatalogsInStatements() {
+        return false;
+    }
+
+    private boolean supportsSchemasInStatements() {
+        return false;
+    }
+
     // Tables
 
     @Override
@@ -214,18 +223,19 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
 
     @Override
     public ADBResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {
-        return metaStatement.executeEmptyResultQuery();
+        return metaStatement.executeGetImportedKeysQuery(catalog, schema, table);
     }
 
     @Override
     public ADBResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException {
-        return metaStatement.executeEmptyResultQuery();
+        return metaStatement.executeGetExportedKeysQuery(catalog, schema, table);
     }
 
     @Override
     public ADBResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable,
             String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException {
-        return metaStatement.executeEmptyResultQuery();
+        return metaStatement.executeCrossReferenceQuery(parentCatalog, parentSchema, parentTable, foreignCatalog,
+                foreignSchema, foreignTable);
     }
 
     // Indexes
@@ -696,12 +706,12 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
 
     @Override
     public boolean supportsCatalogsInDataManipulation() {
-        return true;
+        return supportsCatalogsInStatements();
     }
 
     @Override
     public boolean supportsSchemasInDataManipulation() {
-        return true;
+        return supportsSchemasInStatements();
     }
 
     @Override
@@ -725,12 +735,12 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
 
     @Override
     public boolean supportsCatalogsInTableDefinitions() {
-        return true;
+        return supportsCatalogsInStatements();
     }
 
     @Override
     public boolean supportsSchemasInTableDefinitions() {
-        return true;
+        return supportsSchemasInStatements();
     }
 
     @Override
@@ -752,24 +762,24 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
 
     @Override
     public boolean supportsCatalogsInIndexDefinitions() {
-        return true;
+        return supportsCatalogsInStatements();
     }
 
     @Override
     public boolean supportsSchemasInIndexDefinitions() {
-        return true;
+        return supportsSchemasInStatements();
     }
 
     // DDL: GRANT / REVOKE (not supported)
 
     @Override
     public boolean supportsCatalogsInPrivilegeDefinitions() {
-        return false;
+        return supportsCatalogsInStatements();
     }
 
     @Override
     public boolean supportsSchemasInPrivilegeDefinitions() {
-        return false;
+        return supportsSchemasInStatements();
     }
 
     @Override
@@ -786,12 +796,12 @@ final class ADBDatabaseMetaData extends ADBWrapperSupport implements DatabaseMet
 
     @Override
     public boolean supportsCatalogsInProcedureCalls() {
-        return false;
+        return supportsCatalogsInStatements();
     }
 
     @Override
     public boolean supportsSchemasInProcedureCalls() {
-        return false;
+        return supportsSchemasInStatements();
     }
 
     // Transactions
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
index 5d89612..315f270 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverBase.java
@@ -154,21 +154,6 @@ public abstract class ADBDriverBase {
             port = defaultApiPort;
         }
 
-        String catalog = ADBProtocol.DEFAULT_DATAVERSE;
-        String schema = null;
-        String path = subUri.getPath();
-        if (path != null && path.length() > 1 && path.startsWith("/")) {
-            String[] dataverse = path.substring(1).split("/");
-            switch (dataverse.length) {
-                case 2:
-                    schema = dataverse[1];
-                    // fall thru to 1
-                case 1:
-                    catalog = dataverse[0];
-                    break;
-            }
-        }
-
         List<NameValuePair> urlParams = URLEncodedUtils.parse(subUri, StandardCharsets.UTF_8);
 
         ADBDriverContext driverContext = getOrCreateDriverContext();
@@ -177,10 +162,14 @@ public abstract class ADBDriverBase {
         parseConnectionProperties(urlParams, info, driverContext, properties, warning);
         warning = warning.getNextWarning() != null ? warning.getNextWarning() : null;
 
+        String path = subUri.getPath();
+        String dataverseCanonicalName =
+                path != null && path.length() > 1 && path.startsWith("/") ? path.substring(1) : null;
+
         ADBProtocol protocol = createProtocol(host, port, properties, driverContext);
         try {
             String databaseVersion = protocol.connect();
-            return createConnection(protocol, url, databaseVersion, catalog, schema, warning);
+            return createConnection(protocol, url, databaseVersion, dataverseCanonicalName, properties, warning);
         } catch (SQLException e) {
             try {
                 protocol.close();
@@ -248,8 +237,9 @@ public abstract class ADBDriverBase {
         return new ADBProtocol(host, port, properties, driverContext);
     }
 
-    protected ADBConnection createConnection(ADBProtocol protocol, String url, String databaseVersion, String catalog,
-            String schema, SQLWarning connectWarning) {
-        return new ADBConnection(protocol, url, databaseVersion, catalog, schema, connectWarning);
+    protected ADBConnection createConnection(ADBProtocol protocol, String url, String databaseVersion,
+            String dataverseCanonicalName, Map<ADBDriverProperty, Object> properties, SQLWarning connectWarning)
+            throws SQLException {
+        return new ADBConnection(protocol, url, databaseVersion, dataverseCanonicalName, properties, connectWarning);
     }
 }
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
index 89f01e9..4b57f7d 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverContext.java
@@ -31,7 +31,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 
-final class ADBDriverContext {
+public class ADBDriverContext {
 
     final Class<? extends ADBDriverBase> driverClass;
 
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
index 1d1e67e..37cf57e 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBDriverProperty.java
@@ -19,10 +19,11 @@
 
 package org.apache.asterix.jdbc.core;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
 
-interface ADBDriverProperty {
+public interface ADBDriverProperty {
 
     String getPropertyName();
 
@@ -36,7 +37,10 @@ interface ADBDriverProperty {
         PASSWORD("password", Function.identity(), null),
         CONNECT_TIMEOUT("connectTimeout", Integer::parseInt, null),
         SOCKET_TIMEOUT("socketTimeout", Integer::parseInt, null),
-        MAX_WARNINGS("maxWarnings", Integer::parseInt, 10);
+        MAX_WARNINGS("maxWarnings", Integer::parseInt, 10),
+        CATALOG_DATAVERSE_MODE("catalogDataverseMode", Integer::parseInt, 1), // 1 -> CATALOG, 2 -> CATALOG_SCHEMA
+        CATALOG_INCLUDES_SCHEMALESS("catalogIncludesSchemaless", Boolean::parseBoolean, false),
+        SQL_COMPAT_MODE("sqlCompatMode", Boolean::parseBoolean, true); // Whether user statements are executed in 'SQL-compat' mode
 
         private final String propertyName;
 
@@ -67,5 +71,25 @@ interface ADBDriverProperty {
         public String toString() {
             return getPropertyName();
         }
+
+        public Object fetchPropertyValue(Map<ADBDriverProperty, Object> properties) {
+            return properties.getOrDefault(this, defaultValue);
+        }
+    }
+
+    enum CatalogDataverseMode {
+        CATALOG,
+        CATALOG_SCHEMA;
+
+        static CatalogDataverseMode valueOf(int n) {
+            switch (n) {
+                case 1:
+                    return CATALOG;
+                case 2:
+                    return CATALOG_SCHEMA;
+                default:
+                    throw new IllegalArgumentException(String.valueOf(n));
+            }
+        }
     }
 }
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
index f31e18a..4f409b7 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBErrorReporter.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.jdbc.core;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
@@ -110,7 +111,7 @@ final class ADBErrorReporter {
 
     protected SQLInvalidAuthorizationSpecException errorAuth() {
         return new SQLInvalidAuthorizationSpecException("Authentication/authorization error",
-                SQLState.CONNECTION_REJECTED.code);
+                SQLState.INVALID_AUTH_SPEC.code);
     }
 
     protected SQLException errorColumnNotFound(String columnNameOrNumber) {
@@ -163,6 +164,10 @@ final class ADBErrorReporter {
         return new SQLException(String.format("Cannot create request. %s", getMessage(e)), e);
     }
 
+    protected SQLException errorInRequestURIGeneration(URISyntaxException e) {
+        return new SQLException(String.format("Cannot create request URI. %s", getMessage(e)), e);
+    }
+
     protected SQLException errorInResultHandling(IOException e) {
         return new SQLException(String.format("Cannot reading result. %s", getMessage(e)), e);
     }
@@ -193,8 +198,8 @@ final class ADBErrorReporter {
     }
 
     public enum SQLState {
-        CONNECTION_REJECTED("08004"),
-        CONNECTION_FAILURE("08006"),
+        CONNECTION_FAILURE("08001"), // TODO:08006??
+        INVALID_AUTH_SPEC("28000"),
         INVALID_DATE_TYPE("HY004"),
         INVALID_CURSOR_POSITION("HY108");
 
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
index 7fa8127..bea8c26 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBMetaStatement.java
@@ -37,41 +37,95 @@ public class ADBMetaStatement extends ADBStatement {
     public static final String TABLE = "TABLE";
     public static final String VIEW = "VIEW";
 
+    private static final String PK_NAME_SUFFIX = "_pk";
+    private static final String FK_NAME_SUFFIX = "_fk";
+
     protected ADBMetaStatement(ADBConnection connection) {
-        super(connection, null, null);
+        super(connection);
+    }
+
+    protected void populateQueryProlog(StringBuilder sql, String comment) {
+        if (comment != null) {
+            sql.append("/* ").append(comment).append(" */\n");
+        }
+        //sql.append("set `compiler.min.memory.allocation` 'false';\n");
     }
 
     ADBResultSet executeGetCatalogsQuery() throws SQLException {
         checkClosed();
 
         StringBuilder sql = new StringBuilder(256);
+        populateQueryProlog(sql, "JDBC-GetCatalogs");
 
         sql.append("select TABLE_CAT ");
         sql.append("from Metadata.`Dataverse` ");
-        sql.append("let name = decode_dataverse_name(DataverseName), ");
-        sql.append("TABLE_CAT = name[0] ");
-        sql.append("where array_length(name) between 1 and 2 ");
-        sql.append("group by TABLE_CAT ");
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                sql.append("let TABLE_CAT = DataverseName ");
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("let name = decode_dataverse_name(DataverseName), ");
+                sql.append("TABLE_CAT = name[0] ");
+                sql.append("where (array_length(name) between 1 and 2) ");
+                sql.append("group by TABLE_CAT ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+
         sql.append("order by TABLE_CAT");
 
         return executeQueryImpl(sql.toString(), null);
     }
 
+    ADBResultSet executeGetSchemasQuery() throws SQLException {
+        String catalog;
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                catalog = connection.getDataverseCanonicalName();
+                break;
+            case CATALOG_SCHEMA:
+                catalog = connection.getCatalog();
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+        return executeGetSchemasQuery(catalog, null, "0");
+    }
+
     ADBResultSet executeGetSchemasQuery(String catalog, String schemaPattern) throws SQLException {
+        return executeGetSchemasQuery(catalog, schemaPattern, "1");
+    }
+
+    ADBResultSet executeGetSchemasQuery(String catalog, String schemaPattern, String tag) throws SQLException {
         checkClosed();
 
         StringBuilder sql = new StringBuilder(512);
+        populateQueryProlog(sql, "JDBC-GetSchemas-" + tag);
+
         sql.append("select TABLE_SCHEM, TABLE_CATALOG ");
         sql.append("from Metadata.`Dataverse` ");
-        sql.append("let name = decode_dataverse_name(DataverseName), ");
-        sql.append("TABLE_CATALOG = name[0], ");
-        sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
-        sql.append("where array_length(name) between 1 and 2 ");
+        sql.append("let ");
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                sql.append("TABLE_CATALOG = DataverseName, ");
+                sql.append("TABLE_SCHEM = null ");
+                sql.append("where true ");
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("name = decode_dataverse_name(DataverseName), ");
+                sql.append("TABLE_CATALOG = name[0], ");
+                sql.append("TABLE_SCHEM = case array_length(name) when 1 then null else name[1] end ");
+                sql.append("where (array_length(name) between 1 and 2) ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
         if (catalog != null) {
-            sql.append("and TABLE_CATALOG = $1 ");
+            sql.append("and (TABLE_CATALOG = $1) ");
         }
         if (schemaPattern != null) {
-            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+            sql.append("and (if_null(TABLE_SCHEM, '') like $2) ");
         }
         sql.append("order by TABLE_CATALOG, TABLE_SCHEM");
 
@@ -88,38 +142,65 @@ public class ADBMetaStatement extends ADBStatement {
         String viewTermNonTabular = getViewTerm(false);
 
         StringBuilder sql = new StringBuilder(1024);
+        populateQueryProlog(sql, "JDBC-GetTables");
+
         sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, null REMARKS, null TYPE_CAT, ");
         sql.append("null TYPE_SCHEM, null TYPE_NAME, null SELF_REFERENCING_COL_NAME, null REF_GENERATION ");
         sql.append("from Metadata.`Dataset` ds join Metadata.`Datatype` dt ");
         sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
-        sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
+        sql.append("let ");
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                sql.append("TABLE_CAT = ds.DataverseName, ");
+                sql.append("TABLE_SCHEM = null, ");
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                sql.append("TABLE_CAT = dvname[0], ");
+                sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+        sql.append("TABLE_NAME = ds.DatasetName, ");
         sql.append("isDataset = (ds.DatasetType = 'INTERNAL' or ds.DatasetType = 'EXTERNAL'), ");
         sql.append("isView = ds.DatasetType = 'VIEW', ");
         sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
-        sql.append("TABLE_CAT = dvname[0], ");
-        sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
-        sql.append("TABLE_NAME = ds.DatasetName, ");
         sql.append("TABLE_TYPE = case ");
         sql.append("when isDataset then (case when hasFields then '").append(datasetTermTabular).append("' else '")
                 .append(datasetTermNonTabular).append("' end) ");
         sql.append("when isView then (case when hasFields then '").append(viewTermTabular).append("' else '")
                 .append(viewTermNonTabular).append("' end) ");
         sql.append("else null end ");
-        sql.append("where array_length(dvname) between 1 and 2 ");
+
+        sql.append("where ");
+        sql.append("(TABLE_TYPE ").append(types != null ? "in $1" : "is not null").append(") ");
         if (catalog != null) {
-            sql.append("and TABLE_CAT = $1 ");
+            sql.append("and (TABLE_CAT = $2) ");
         }
         if (schemaPattern != null) {
-            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+            sql.append("and (if_null(TABLE_SCHEM, '') like $3) ");
         }
         if (tableNamePattern != null) {
-            sql.append("and TABLE_NAME like $3 ");
+            sql.append("and (TABLE_NAME like $4) ");
+        }
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("and (array_length(dvname) between 1 and 2) ");
+                break;
+            default:
+                throw new IllegalStateException();
         }
-        sql.append("and TABLE_TYPE ").append(types != null ? "in $4" : "is not null").append(" ");
+        if (!connection.catalogIncludesSchemaless) {
+            sql.append("and hasFields ");
+        }
+
         sql.append("order by TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME");
 
-        return executeQueryImpl(sql.toString(),
-                Arrays.asList(catalog, schemaPattern, tableNamePattern, types != null ? Arrays.asList(types) : null));
+        List<String> typesList = types != null ? Arrays.asList(types) : null;
+        return executeQueryImpl(sql.toString(), Arrays.asList(typesList, catalog, schemaPattern, tableNamePattern));
     }
 
     ADBResultSet executeGetColumnsQuery(String catalog, String schemaPattern, String tableNamePattern,
@@ -127,6 +208,8 @@ public class ADBMetaStatement extends ADBStatement {
         checkClosed();
 
         StringBuilder sql = new StringBuilder(2048);
+        populateQueryProlog(sql, "JDBC-GetColumns");
+
         sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE, TYPE_NAME, COLUMN_SIZE, ");
         sql.append("1 BUFFER_LENGTH, null DECIMAL_DIGITS, 2 NUM_PREC_RADIX, NULLABLE, ");
         sql.append("null REMARKS, null COLUMN_DEF, DATA_TYPE SQL_DATA_TYPE,");
@@ -141,9 +224,20 @@ public class ADBMetaStatement extends ADBStatement {
         sql.append("left join Metadata.`Datatype` dt2 ");
         sql.append(
                 "on field.FieldType = dt2.DatatypeName and ds.DataverseName = dt2.DataverseName and dt2.Derived is known ");
-        sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
-        sql.append("TABLE_CAT = dvname[0], ");
-        sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+        sql.append("let ");
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                sql.append("TABLE_CAT = ds.DataverseName, ");
+                sql.append("TABLE_SCHEM = null, ");
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                sql.append("TABLE_CAT = dvname[0], ");
+                sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
         sql.append("TABLE_NAME = ds.DatasetName, ");
         sql.append("COLUMN_NAME = field.FieldName, ");
         sql.append("TYPE_NAME = case ");
@@ -169,21 +263,30 @@ public class ADBMetaStatement extends ADBStatement {
         sql.append("COLUMN_SIZE = case field.FieldType when 'string' then 32767 else 8 end, "); // TODO:based on type
         sql.append("ORDINAL_POSITION = fieldpos, ");
         sql.append("NULLABLE = case when field.IsNullable or field.IsMissable then 1 else 0 end ");
-        sql.append("where array_length(dvname) between 1 and 2 ");
 
-        sql.append("and array_length(dt.Derived.Record.Fields) > 0 ");
+        sql.append("where (array_length(dt.Derived.Record.Fields) > 0) ");
         if (catalog != null) {
-            sql.append("and TABLE_CAT = $1 ");
+            sql.append("and (TABLE_CAT = $1) ");
         }
         if (schemaPattern != null) {
-            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+            sql.append("and (if_null(TABLE_SCHEM, '') like $2) ");
         }
         if (tableNamePattern != null) {
-            sql.append("and TABLE_NAME like $3 ");
+            sql.append("and (TABLE_NAME like $3) ");
         }
         if (columnNamePattern != null) {
-            sql.append("and COLUMN_NAME like $4 ");
+            sql.append("and (COLUMN_NAME like $4) ");
+        }
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("and (array_length(dvname) between 1 and 2) ");
+                break;
+            default:
+                throw new IllegalStateException();
         }
+
         sql.append("order by TABLE_CAT, TABLE_SCHEM, TABLE_NAME, ORDINAL_POSITION");
 
         return executeQueryImpl(sql.toString(),
@@ -194,39 +297,181 @@ public class ADBMetaStatement extends ADBStatement {
         checkClosed();
 
         StringBuilder sql = new StringBuilder(1024);
-        sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, null PK_NAME ");
-        sql.append("from Metadata.`Dataset` ds unnest ds.InternalDetails.PrimaryKey pki at pkipos ");
-        sql.append("let dvname = decode_dataverse_name(ds.DataverseName), ");
-        sql.append("TABLE_CAT = dvname[0], ");
-        sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+        populateQueryProlog(sql, "JDBC-GetPrimaryKeys");
+
+        sql.append("select TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, PK_NAME ");
+        sql.append("from Metadata.`Dataset` ds ");
+        sql.append("join Metadata.`Datatype` dt ");
+        sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        sql.append("unnest coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails).PrimaryKey pki at pkipos ");
+        sql.append("let ");
+        sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                sql.append("TABLE_CAT = ds.DataverseName, ");
+                sql.append("TABLE_SCHEM = null, ");
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                sql.append("TABLE_CAT = dvname[0], ");
+                sql.append("TABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
         sql.append("TABLE_NAME = ds.DatasetName, ");
         sql.append("COLUMN_NAME = pki[0], ");
-        sql.append("KEY_SEQ = pkipos ");
-        sql.append("where array_length(dvname) between 1 and 2 ");
-        sql.append("and (every pk in ds.InternalDetails.PrimaryKey satisfies array_length(pk) = 1 end) ");
-        sql.append("and (every si in ds.InternalDetails.KeySourceIndicator satisfies si = 0 end ) ");
+        sql.append("KEY_SEQ = pkipos, ");
+        sql.append("PK_NAME = TABLE_NAME || '").append(PK_NAME_SUFFIX).append("', ");
+        sql.append("dsDetails = coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails) ");
+        sql.append("where (every pk in dsDetails.PrimaryKey satisfies array_length(pk) = 1 end) ");
+        sql.append("and (every si in dsDetails.KeySourceIndicator satisfies si = 0 end ) ");
         if (catalog != null) {
-            sql.append("and TABLE_CAT = $1 ");
+            sql.append("and (TABLE_CAT = $1) ");
         }
         if (schema != null) {
-            sql.append("and if_null(TABLE_SCHEM, '') like $2 ");
+            sql.append("and (if_null(TABLE_SCHEM, '') like $2) ");
         }
         if (table != null) {
-            sql.append("and TABLE_NAME like $3 ");
+            sql.append("and (TABLE_NAME like $3) ");
+        }
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("and (array_length(dvname) between 1 and 2) ");
+                break;
+            default:
+                throw new IllegalStateException();
         }
+        if (!connection.catalogIncludesSchemaless) {
+            sql.append("and hasFields ");
+        }
+
         sql.append("order by COLUMN_NAME");
 
         return executeQueryImpl(sql.toString(), Arrays.asList(catalog, schema, table));
     }
 
+    /**
+     * Runs the foreign key metadata query for keys imported by a table.
+     * The supplied catalog/schema/table are passed as the foreign-key-side filters, the primary-key-side
+     * filters are left unset ({@code null}), and the result is ordered by the PKTABLE_* columns
+     * ({@code orderByFk = false}).
+     */
+    ADBResultSet executeGetImportedKeysQuery(String catalog, String schema, String table) throws SQLException {
+        return executeGetImportedExportedKeysQuery("JDBC-GetImportedKeys", null, null, null, catalog, schema, table,
+                false);
+    }
+
+    /**
+     * Runs the foreign key metadata query for keys exported by a table.
+     * The supplied catalog/schema/table are passed as the primary-key-side filters, the foreign-key-side
+     * filters are left unset ({@code null}), and the result is ordered by the FKTABLE_* columns
+     * ({@code orderByFk = true}).
+     */
+    ADBResultSet executeGetExportedKeysQuery(String catalog, String schema, String table) throws SQLException {
+        return executeGetImportedExportedKeysQuery("JDBC-GetExportedKeys", catalog, schema, table, null, null, null,
+                true);
+    }
+
+    /**
+     * Runs the foreign key metadata query restricted on both sides: the parent arguments are passed as the
+     * primary-key-side filters and the foreign arguments as the foreign-key-side filters.
+     * The result is ordered by the FKTABLE_* columns ({@code orderByFk = true}).
+     */
+    ADBResultSet executeCrossReferenceQuery(String parentCatalog, String parentSchema, String parentTable,
+            String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException {
+        return executeGetImportedExportedKeysQuery("JDBC-CrossReference", parentCatalog, parentSchema, parentTable,
+                foreignCatalog, foreignSchema, foreignTable, true);
+    }
+
+    /**
+     * Builds and executes the SQL++ metadata query shared by the imported-keys, exported-keys and
+     * cross-reference lookups.
+     * <p>
+     * The query unnests the {@code ForeignKeys} entries of each dataset, joins each entry with the dataset it
+     * references and emits one row per foreign key column. Only foreign keys and referenced primary keys whose
+     * parts are single-element field paths with a key source indicator of 0 are returned. The primary key
+     * column is matched to the foreign key column by position. UPDATE_RULE and DELETE_RULE are always
+     * {@link DatabaseMetaData#importedKeyNoAction} and DEFERRABILITY is always
+     * {@link DatabaseMetaData#importedKeyInitiallyDeferred}.
+     * <p>
+     * Every non-null filter argument adds a predicate. Catalogs are compared with {@code =}, schemas and
+     * tables with {@code like}; a null schema column is compared as an empty string. The six filter values are
+     * always bound as positional parameters $1..$6 in the order pkCatalog, pkSchema, pkTable, fkCatalog,
+     * fkSchema, fkTable.
+     *
+     * @param comment   label passed to {@code populateQueryProlog} for this query
+     * @param pkCatalog filter for PKTABLE_CAT, or {@code null} for no filter
+     * @param pkSchema  pattern for PKTABLE_SCHEM, or {@code null} for no filter
+     * @param pkTable   pattern for PKTABLE_NAME, or {@code null} for no filter
+     * @param fkCatalog filter for FKTABLE_CAT, or {@code null} for no filter
+     * @param fkSchema  pattern for FKTABLE_SCHEM, or {@code null} for no filter
+     * @param fkTable   pattern for FKTABLE_NAME, or {@code null} for no filter
+     * @param orderByFk if {@code true} order by the FKTABLE_* columns, otherwise by the PKTABLE_* columns;
+     *                  KEY_SEQ is always the last sort key
+     * @return the result set produced by {@code executeQueryImpl}
+     * @throws SQLException if this statement is closed or the query fails
+     * @throws IllegalStateException if the connection's catalog/dataverse mode is not a handled value
+     */
+    protected ADBResultSet executeGetImportedExportedKeysQuery(String comment, String pkCatalog, String pkSchema,
+            String pkTable, String fkCatalog, String fkSchema, String fkTable, boolean orderByFk) throws SQLException {
+        // Same guard as the other metadata query methods of this class
+        checkClosed();
+
+        StringBuilder sql = new StringBuilder(2048);
+        populateQueryProlog(sql, comment);
+
+        sql.append("select PKTABLE_CAT, PKTABLE_SCHEM, PKTABLE_NAME, PKCOLUMN_NAME, ");
+        sql.append("FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME, FKCOLUMN_NAME, KEY_SEQ, ");
+        sql.append(DatabaseMetaData.importedKeyNoAction).append(" UPDATE_RULE, ");
+        sql.append(DatabaseMetaData.importedKeyNoAction).append(" DELETE_RULE, ");
+        sql.append("FK_NAME, PK_NAME, ");
+        sql.append(DatabaseMetaData.importedKeyInitiallyDeferred).append(" DEFERRABILITY ");
+        sql.append("from Metadata.`Dataset` ds ");
+        sql.append("join Metadata.`Datatype` dt ");
+        sql.append("on ds.DatatypeDataverseName = dt.DataverseName and ds.DatatypeName = dt.DatatypeName ");
+        sql.append("unnest coalesce(ds.InternalDetails, ds.ExternalDetails, ds.ViewDetails).ForeignKeys fk at fkpos ");
+        sql.append("join Metadata.`Dataset` ds2 ");
+        sql.append("on fk.RefDataverseName = ds2.DataverseName and fk.RefDatasetName = ds2.DatasetName ");
+        sql.append("unnest fk.ForeignKey fki at fkipos ");
+        sql.append("let ");
+        sql.append("hasFields = array_length(dt.Derived.Record.Fields) > 0, ");
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                sql.append("FKTABLE_CAT = ds.DataverseName, ");
+                sql.append("FKTABLE_SCHEM = null, ");
+                sql.append("PKTABLE_CAT = ds2.DataverseName, ");
+                sql.append("PKTABLE_SCHEM = null, ");
+                break;
+            case CATALOG_SCHEMA:
+                sql.append("dvname = decode_dataverse_name(ds.DataverseName), ");
+                sql.append("FKTABLE_CAT = dvname[0], ");
+                sql.append("FKTABLE_SCHEM = case array_length(dvname) when 1 then null else dvname[1] end, ");
+                sql.append("dvname2 = decode_dataverse_name(ds2.DataverseName), ");
+                sql.append("PKTABLE_CAT = dvname2[0], ");
+                sql.append("PKTABLE_SCHEM = case array_length(dvname2) when 1 then null else dvname2[1] end, ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+        sql.append("ds2Details = coalesce(ds2.InternalDetails, ds2.ExternalDetails, ds2.ViewDetails), ");
+        sql.append("FKTABLE_NAME = ds.DatasetName, ");
+        sql.append("PKTABLE_NAME = ds2.DatasetName, ");
+        sql.append("FKCOLUMN_NAME = fki[0], ");
+        // fkipos is used directly as KEY_SEQ but needs -1 to index PrimaryKey, i.e. positions start at 1
+        sql.append("PKCOLUMN_NAME = ds2Details.PrimaryKey[fkipos-1][0], ");
+        sql.append("KEY_SEQ = fkipos, ");
+        sql.append("PK_NAME = PKTABLE_NAME || '").append(PK_NAME_SUFFIX).append("', ");
+        sql.append("FK_NAME = FKTABLE_NAME || '").append(FK_NAME_SUFFIX).append("_' || string(fkpos) ");
+        sql.append("where (every fki2 in fk.ForeignKey satisfies array_length(fki2) = 1 end) ");
+        sql.append("and (every fksi in fk.KeySourceIndicator satisfies fksi = 0 end ) ");
+        sql.append("and (every pki in ds2Details.PrimaryKey satisfies array_length(pki) = 1 end) ");
+        sql.append("and (every pksi in ds2Details.KeySourceIndicator satisfies pksi = 0 end) ");
+
+        // Parameter numbers are fixed ($1..$6) because all six values are always passed to executeQueryImpl
+        if (pkCatalog != null) {
+            sql.append("and (PKTABLE_CAT = $1) ");
+        }
+        if (pkSchema != null) {
+            sql.append("and (if_null(PKTABLE_SCHEM, '') like $2) ");
+        }
+        if (pkTable != null) {
+            sql.append("and (PKTABLE_NAME like $3) ");
+        }
+
+        if (fkCatalog != null) {
+            sql.append("and (FKTABLE_CAT = $4) ");
+        }
+        if (fkSchema != null) {
+            sql.append("and (if_null(FKTABLE_SCHEM, '') like $5) ");
+        }
+        if (fkTable != null) {
+            sql.append("and (FKTABLE_NAME like $6) ");
+        }
+
+        switch (connection.catalogDataverseMode) {
+            case CATALOG:
+                break;
+            case CATALOG_SCHEMA:
+                // Only dataverse names with one or two parts can be mapped onto catalog + schema
+                sql.append("and (array_length(dvname) between 1 and 2) ");
+                sql.append("and (array_length(dvname2) between 1 and 2) ");
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+        if (!connection.catalogIncludesSchemaless) {
+            sql.append("and hasFields ");
+        }
+
+        sql.append("order by ").append(
+                orderByFk ? "FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME" : "PKTABLE_CAT, PKTABLE_SCHEM, PKTABLE_NAME")
+                .append(", KEY_SEQ");
+
+        return executeQueryImpl(sql.toString(),
+                Arrays.asList(pkCatalog, pkSchema, pkTable, fkCatalog, fkSchema, fkTable));
+    }
+
     ADBResultSet executeGetTableTypesQuery() throws SQLException {
         checkClosed();
 
         LinkedHashSet<String> tableTypes = new LinkedHashSet<>();
         tableTypes.add(getDatasetTerm(true));
-        tableTypes.add(getDatasetTerm(false));
         tableTypes.add(getViewTerm(true));
-        tableTypes.add(getViewTerm(false));
+        if (connection.catalogIncludesSchemaless) {
+            tableTypes.add(getDatasetTerm(false));
+            tableTypes.add(getViewTerm(false));
+        }
 
         List<ADBColumn> columns = Collections.singletonList(new ADBColumn("TABLE_TYPE", ADBDatatype.STRING, false));
 
@@ -340,6 +585,14 @@ public class ADBMetaStatement extends ADBStatement {
         return null;
     }
 
+    /**
+     * Returns the submit options produced by the superclass with {@code sqlCompatMode} forced to
+     * {@code false}.
+     */
+    @Override
+    protected ADBProtocol.SubmitStatementOptions createSubmitStatementOptions() {
+        ADBProtocol.SubmitStatementOptions options = super.createSubmitStatementOptions();
+        // Metadata queries are always executed in SQL++ mode
+        options.sqlCompatMode = false;
+        return options;
+    }
+
     /**
      * Returns the table type term for datasets: {@code TABLE} for tabular ones, otherwise
      * {@code SCHEMALESS} followed by a space and {@code TABLE}.
      */
     protected String getDatasetTerm(boolean tabular) {
         if (tabular) {
             return TABLE;
         }
         return SCHEMALESS + " " + TABLE;
     }
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
index 98129fd..aa88f1c 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBPreparedStatement.java
@@ -49,11 +49,12 @@ final class ADBPreparedStatement extends ADBStatement implements PreparedStateme
 
     final List<ADBColumn> resultColumns;
 
-    ADBPreparedStatement(ADBConnection connection, String sql, String catalog, String schema) throws SQLException {
-        super(connection, catalog, schema);
-        // TODO:timeout
-        ADBProtocol.QueryServiceResponse response =
-                connection.protocol.submitStatement(sql, null, false, true, 0, catalog, schema);
+    ADBPreparedStatement(ADBConnection connection, String sql) throws SQLException {
+        super(connection);
+        ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+        stmtOptions.compileOnly = true;
+        stmtOptions.timeoutSeconds = 0; /* TODO:timeout */
+        ADBProtocol.QueryServiceResponse response = connection.protocol.submitStatement(sql, null, null, stmtOptions);
         int parameterCount = connection.protocol.getStatementParameterCount(response);
         boolean isQuery = connection.protocol.isStatementCategory(response,
                 ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
@@ -294,7 +295,7 @@ final class ADBPreparedStatement extends ADBStatement implements PreparedStateme
     @Override
     public void setDate(int parameterIndex, java.sql.Date v, Calendar cal) throws SQLException {
         checkClosed();
-        setDate(parameterIndex, v);
+        setArg(parameterIndex, cal != null ? new SqlCalendarDate(v, cal.getTimeZone()) : v);
     }
 
     @Override
@@ -306,7 +307,7 @@ final class ADBPreparedStatement extends ADBStatement implements PreparedStateme
     @Override
     public void setTime(int parameterIndex, java.sql.Time v, Calendar cal) throws SQLException {
         checkClosed();
-        setTime(parameterIndex, v);
+        setArg(parameterIndex, cal != null ? new SqlCalendarTime(v, cal.getTimeZone()) : v);
     }
 
     @Override
@@ -318,7 +319,7 @@ final class ADBPreparedStatement extends ADBStatement implements PreparedStateme
     @Override
     public void setTimestamp(int parameterIndex, java.sql.Timestamp v, Calendar cal) throws SQLException {
         checkClosed();
-        setTimestamp(parameterIndex, v);
+        setArg(parameterIndex, cal != null ? new SqlCalendarTimestamp(v, cal.getTimeZone()) : v);
     }
 
     // Generic (setObject)
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
index e8a36f5..fed6d6a 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBProtocol.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -50,10 +51,12 @@ import org.apache.http.client.AuthCache;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpOptions;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.config.SocketConfig;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.entity.ContentProducer;
@@ -84,8 +87,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 public class ADBProtocol {
 
-    private static final String QUERY_ENDPOINT_PATH = "/query/service";
+    private static final String QUERY_SERVICE_ENDPOINT_PATH = "/query/service";
     private static final String QUERY_RESULT_ENDPOINT_PATH = "/query/service/result";
+    private static final String ACTIVE_REQUESTS_ENDPOINT_PATH = "/admin/requests/running";
 
     private static final String STATEMENT = "statement";
     private static final String ARGS = "args";
@@ -98,6 +102,8 @@ public class ADBProtocol {
     private static final String CLIENT_TYPE = "client-type";
     private static final String PLAN_FORMAT = "plan-format";
     private static final String MAX_WARNINGS = "max-warnings";
+    private static final String SQL_COMPAT = "sql-compat";
+    private static final String CLIENT_CONTEXT_ID = "client_context_id";
 
     private static final String MODE_DEFERRED = "deferred";
     private static final String CLIENT_TYPE_JDBC = "jdbc";
@@ -116,20 +122,23 @@ public class ADBProtocol {
     final CloseableHttpClient httpClient;
     final URI queryEndpoint;
     final URI queryResultEndpoint;
+    final URI activeRequestsEndpoint;
     final String user;
     final int maxWarnings;
 
     protected ADBProtocol(String host, int port, Map<ADBDriverProperty, Object> params, ADBDriverContext driverContext)
             throws SQLException {
-        URI queryEndpoint = createEndpointUri(host, port, QUERY_ENDPOINT_PATH, driverContext.errorReporter);
+        URI queryEndpoint = createEndpointUri(host, port, getQueryServiceEndpointPath(), driverContext.errorReporter);
         URI queryResultEndpoint =
-                createEndpointUri(host, port, QUERY_RESULT_ENDPOINT_PATH, driverContext.errorReporter);
+                createEndpointUri(host, port, getQueryResultEndpointPath(), driverContext.errorReporter);
+        URI activeRequestsEndpoint =
+                createEndpointUri(host, port, getActiveRequestsEndpointPath(), driverContext.errorReporter);
         PoolingHttpClientConnectionManager httpConnectionManager = new PoolingHttpClientConnectionManager();
         int maxConnections = Math.max(16, Runtime.getRuntime().availableProcessors());
         httpConnectionManager.setDefaultMaxPerRoute(maxConnections);
         httpConnectionManager.setMaxTotal(maxConnections);
         SocketConfig.Builder socketConfigBuilder = null;
-        Number socketTimeoutMillis = (Number) params.get(ADBDriverProperty.Common.SOCKET_TIMEOUT);
+        Number socketTimeoutMillis = (Number) ADBDriverProperty.Common.SOCKET_TIMEOUT.fetchPropertyValue(params);
         if (socketTimeoutMillis != null) {
             socketConfigBuilder = SocketConfig.custom();
             socketConfigBuilder.setSoTimeout(socketTimeoutMillis.intValue());
@@ -138,7 +147,7 @@ public class ADBProtocol {
             httpConnectionManager.setDefaultSocketConfig(socketConfigBuilder.build());
         }
         RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
-        Number connectTimeoutMillis = (Number) params.get(ADBDriverProperty.Common.CONNECT_TIMEOUT);
+        Number connectTimeoutMillis = (Number) ADBDriverProperty.Common.CONNECT_TIMEOUT.fetchPropertyValue(params);
         if (connectTimeoutMillis != null) {
             requestConfigBuilder.setConnectionRequestTimeout(connectTimeoutMillis.intValue());
             requestConfigBuilder.setConnectTimeout(connectTimeoutMillis.intValue());
@@ -152,18 +161,17 @@ public class ADBProtocol {
         httpClientBuilder.setConnectionManager(httpConnectionManager);
         httpClientBuilder.setConnectionManagerShared(true);
         httpClientBuilder.setDefaultRequestConfig(requestConfig);
-        String user = (String) params.get(ADBDriverProperty.Common.USER);
+        String user = (String) ADBDriverProperty.Common.USER.fetchPropertyValue(params);
         if (user != null) {
-            String password = (String) params.get(ADBDriverProperty.Common.PASSWORD);
+            String password = (String) ADBDriverProperty.Common.PASSWORD.fetchPropertyValue(params);
             httpClientBuilder.setDefaultCredentialsProvider(createCredentialsProvider(user, password));
         }
-
-        Number maxWarnings = ((Number) params.getOrDefault(ADBDriverProperty.Common.MAX_WARNINGS,
-                ADBDriverProperty.Common.MAX_WARNINGS.getDefaultValue()));
+        Number maxWarnings = (Number) ADBDriverProperty.Common.MAX_WARNINGS.fetchPropertyValue(params);
 
         this.user = user;
         this.queryEndpoint = queryEndpoint;
         this.queryResultEndpoint = queryResultEndpoint;
+        this.activeRequestsEndpoint = activeRequestsEndpoint;
         this.httpConnectionManager = httpConnectionManager;
         this.httpClient = httpClientBuilder.build();
         this.httpClientContext = createHttpClientContext(queryEndpoint);
@@ -239,8 +247,8 @@ public class ADBProtocol {
         }
     }
 
-    QueryServiceResponse submitStatement(String sql, List<?> args, boolean forceReadOnly, boolean compileOnly,
-            int timeoutSeconds, String catalog, String schema) throws SQLException {
+    QueryServiceResponse submitStatement(String sql, List<?> args, UUID executionId, SubmitStatementOptions options)
+            throws SQLException {
         HttpPost httpPost = new HttpPost(queryEndpoint);
         httpPost.setHeader(HttpHeaders.ACCEPT, ContentType.APPLICATION_JSON
                 .withParameters(new BasicNameValuePair(FORMAT_LOSSLESS_ADM, Boolean.TRUE.toString())).toString());
@@ -255,17 +263,23 @@ public class ADBProtocol {
             jsonGen.writeBooleanField(SIGNATURE, true);
             jsonGen.writeStringField(PLAN_FORMAT, PLAN_FORMAT_STRING);
             jsonGen.writeNumberField(MAX_WARNINGS, maxWarnings);
-            if (compileOnly) {
+            if (options.compileOnly) {
                 jsonGen.writeBooleanField(COMPILE_ONLY, true);
             }
-            if (forceReadOnly) {
+            if (options.forceReadOnly) {
                 jsonGen.writeBooleanField(READ_ONLY, true);
             }
-            if (timeoutSeconds > 0) {
-                jsonGen.writeStringField(TIMEOUT, timeoutSeconds + "s");
+            if (options.sqlCompatMode) {
+                jsonGen.writeBooleanField(SQL_COMPAT, true);
+            }
+            if (options.timeoutSeconds > 0) {
+                jsonGen.writeStringField(TIMEOUT, options.timeoutSeconds + "s");
             }
-            if (catalog != null) {
-                jsonGen.writeStringField(DATAVERSE, schema != null ? catalog + "/" + schema : catalog);
+            if (options.dataverseName != null) {
+                jsonGen.writeStringField(DATAVERSE, options.dataverseName);
+            }
+            if (executionId != null) {
+                jsonGen.writeStringField(CLIENT_CONTEXT_ID, executionId.toString());
             }
             if (args != null && !args.isEmpty()) {
                 jsonGen.writeFieldName(ARGS);
@@ -279,11 +293,9 @@ public class ADBProtocol {
             throw getErrorReporter().errorInRequestGeneration(e);
         }
 
-        System.err.printf("<ADB_DRIVER_SQL>%n%s%n</ADB_DRIVER_SQL>%n", sql);
-
         if (getLogger().isLoggable(Level.FINE)) {
-            getLogger().log(Level.FINE, String.format("%s { %s } with args { %s }", compileOnly ? "compile" : "execute",
-                    sql, args != null ? args : ""));
+            getLogger().log(Level.FINE, String.format("%s { %s } with args { %s }",
+                    options.compileOnly ? "compile" : "execute", sql, args != null ? args : ""));
         }
 
         httpPost.setEntity(new EntityTemplateImpl(baos, ContentType.APPLICATION_JSON));
@@ -296,6 +308,21 @@ public class ADBProtocol {
         }
     }
 
+    /**
+     * Mutable per-request options read by {@code submitStatement} when it writes the request body.
+     * The constructor is private, so instances are obtained through {@code createSubmitStatementOptions()}.
+     */
+    public static class SubmitStatementOptions {
+        // Written as the DATAVERSE request field when non-null
+        public String dataverseName;
+        // Written as the TIMEOUT request field with an "s" suffix when greater than zero
+        public int timeoutSeconds;
+        // When true the READ_ONLY request field is written as true
+        public boolean forceReadOnly;
+        // When true the COMPILE_ONLY request field is written as true
+        public boolean compileOnly;
+        // When true the SQL_COMPAT ("sql-compat") request field is written as true
+        public boolean sqlCompatMode;
+
+        private SubmitStatementOptions() {
+        }
+    }
+
+    /**
+     * Creates a new options object with all fields at their Java default values
+     * ({@code null}, {@code 0}, {@code false}).
+     */
+    SubmitStatementOptions createSubmitStatementOptions() {
+        return new SubmitStatementOptions();
+    }
+
     private QueryServiceResponse handlePostQueryResponse(CloseableHttpResponse httpResponse)
             throws SQLException, IOException {
         int httpStatus = httpResponse.getStatusLine().getStatusCode();
@@ -419,6 +446,33 @@ public class ADBProtocol {
         }
     }
 
+    /**
+     * Sends an HTTP DELETE to the active requests endpoint, passing the execution id as the
+     * {@code client_context_id} query parameter.
+     * <p>
+     * Status 200 and 404 are both accepted silently, 401 and 403 are reported as an authentication error,
+     * and any other status is reported as a protocol error carrying the status line.
+     *
+     * @param executionId id that was sent as the client context id when the statement was submitted
+     * @throws SQLException if the request URI cannot be built, the server answers with an unexpected
+     *                      status, or an I/O error occurs
+     */
+    void cancelStatementExecution(UUID executionId) throws SQLException {
+        URI cancelUri;
+        try {
+            cancelUri = new URIBuilder(activeRequestsEndpoint)
+                    .setParameter(CLIENT_CONTEXT_ID, String.valueOf(executionId)).build();
+        } catch (URISyntaxException e) {
+            throw getErrorReporter().errorInRequestURIGeneration(e);
+        }
+        HttpDelete cancelRequest = new HttpDelete(cancelUri);
+
+        try (CloseableHttpResponse cancelResponse = httpClient.execute(cancelRequest, httpClientContext)) {
+            int statusCode = cancelResponse.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_UNAUTHORIZED || statusCode == HttpStatus.SC_FORBIDDEN) {
+                throw getErrorReporter().errorAuth();
+            }
+            // OK and NOT_FOUND both complete normally
+            if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_NOT_FOUND) {
+                throw getErrorReporter().errorInProtocol(cancelResponse.getStatusLine().toString());
+            }
+        } catch (IOException e) {
+            throw getErrorReporter().errorInConnection(e);
+        }
+    }
+
     private HttpClientContext createHttpClientContext(URI uri) {
         HttpClientContext hcCtx = HttpClientContext.create();
         AuthCache ac = new BasicAuthCache();
@@ -542,6 +596,18 @@ public class ADBProtocol {
         return om;
     }
 
+    /**
+     * Returns the URI path of the query service endpoint.
+     * NOTE(review): this is invoked from the ADBProtocol constructor, so an override must not rely on
+     * subclass fields being initialized.
+     */
+    protected String getQueryServiceEndpointPath() {
+        return QUERY_SERVICE_ENDPOINT_PATH;
+    }
+
+    /**
+     * Returns the URI path of the query result endpoint.
+     * NOTE(review): this is invoked from the ADBProtocol constructor, so an override must not rely on
+     * subclass fields being initialized.
+     */
+    protected String getQueryResultEndpointPath() {
+        return QUERY_RESULT_ENDPOINT_PATH;
+    }
+
+    /**
+     * Returns the URI path of the active (running) requests endpoint.
+     * NOTE(review): this is invoked from the ADBProtocol constructor, so an override must not rely on
+     * subclass fields being initialized.
+     */
+    protected String getActiveRequestsEndpointPath() {
+        return ACTIVE_REQUESTS_ENDPOINT_PATH;
+    }
+
     private static void closeQuietly(Exception mainExc, java.io.Closeable... closeableList) {
         for (Closeable closeable : closeableList) {
             if (closeable != null) {
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
index da9c5bd..800aeee 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBRowStore.java
@@ -35,8 +35,10 @@ import java.sql.Timestamp;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.Period;
+import java.time.ZoneId;
 import java.time.format.DateTimeParseException;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -45,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -76,12 +79,16 @@ final class ADBRowStore {
 
     static final List<Class<?>> GET_OBJECT_NON_ATOMIC = Arrays.asList(Collection.class, List.class, Map.class);
 
+    private static final ZoneId TZ_UTC = ZoneId.of("UTC");
+
     private final ADBResultSet resultSet;
 
     private final ADBDatatype[] columnTypes;
     private final Object[] objectStore;
     private final long[] registerStore; // 2 registers per column
 
+    private final TimeZone tzSystem = TimeZone.getDefault();
+
     private int parsedLength;
     private long currentDateChronon;
     private JsonGenerator jsonGen;
@@ -591,16 +598,15 @@ final class ADBRowStore {
     }
 
     Date getDate(int columnIndex, Calendar cal) throws SQLException {
-        // TODO:cal is not used
         ADBDatatype valueType = getColumnType(columnIndex);
         switch (valueType) {
             case MISSING:
             case NULL:
                 return null;
             case DATE:
-                return toDateFromDateChronon(getColumnRegister(columnIndex, 0));
+                return toDateFromDateChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
             case DATETIME:
-                return toDateFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+                return toDateFromDatetimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
             case STRING:
                 try {
                     LocalDate d = LocalDate.parse(getStringFromObjectStore(columnIndex)); // TODO:review
@@ -639,20 +645,20 @@ final class ADBRowStore {
     }
 
     Time getTime(int columnIndex, Calendar cal) throws SQLException {
-        // TODO:cal not used
         ADBDatatype valueType = getColumnType(columnIndex);
         switch (valueType) {
             case MISSING:
             case NULL:
                 return null;
             case TIME:
-                return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+                return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
             case DATETIME:
-                return toTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+                return toTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
             case STRING:
                 try {
                     LocalTime t = LocalTime.parse(getStringFromObjectStore(columnIndex)); // TODO:review
-                    return toTimeFromTimeChronon(TimeUnit.NANOSECONDS.toMillis(t.toNanoOfDay()));
+                    return toTimeFromTimeChronon(TimeUnit.NANOSECONDS.toMillis(t.toNanoOfDay()),
+                            getTimeZone(cal, tzSystem));
                 } catch (DateTimeParseException e) {
                     throw getErrorReporter().errorInvalidValueOfType(valueType);
                 }
@@ -687,16 +693,15 @@ final class ADBRowStore {
     }
 
     Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
-        //TODO:FIXME:CAL NOT USED
         ADBDatatype valueType = getColumnType(columnIndex);
         switch (valueType) {
             case MISSING:
             case NULL:
                 return null;
             case DATE:
-                return toTimestampFromDateChronon(getColumnRegister(columnIndex, 0));
+                return toTimestampFromDateChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
             case DATETIME:
-                return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+                return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0), getTimeZone(cal, tzSystem));
             case STRING:
                 try {
                     Instant i = Instant.parse(getStringFromObjectStore(columnIndex));
@@ -711,19 +716,19 @@ final class ADBRowStore {
         }
     }
 
-    Instant getInstant(int columnIndex) throws SQLException {
+    LocalDateTime getLocalDateTime(int columnIndex) throws SQLException {
         ADBDatatype valueType = getColumnType(columnIndex);
         switch (valueType) {
             case MISSING:
             case NULL:
                 return null;
             case DATE:
-                return toInstantFromDateChronon(getColumnRegister(columnIndex, 0));
+                return toLocalDateTimeFromDateChronon(getColumnRegister(columnIndex, 0));
             case DATETIME:
-                return toInstantFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+                return toLocalDateTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0));
             case STRING:
                 try {
-                    return Instant.parse(getStringFromObjectStore(columnIndex)); // TODO:review
+                    return LocalDateTime.parse(getStringFromObjectStore(columnIndex)); // TODO:review
                 } catch (DateTimeParseException e) {
                     throw getErrorReporter().errorInvalidValueOfType(valueType);
                 }
@@ -822,7 +827,7 @@ final class ADBRowStore {
             case TIME:
                 return toLocalTimeFromTimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
             case DATETIME:
-                return toInstantFromDatetimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
+                return toLocalDateTimeFromDatetimeChronon(getColumnRegister(columnIndex, 0)).toString(); // TODO:review
             case YEARMONTHDURATION:
                 return getPeriodFromObjectStore(columnIndex).toString(); // TODO:review
             case DAYTIMEDURATION:
@@ -883,11 +888,11 @@ final class ADBRowStore {
             case DOUBLE:
                 return getNumberFromObjectStore(columnIndex);
             case DATE:
-                return toDateFromDateChronon(getColumnRegister(columnIndex, 0));
+                return toDateFromDateChronon(getColumnRegister(columnIndex, 0), tzSystem);
             case TIME:
-                return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0));
+                return toTimeFromTimeChronon(getColumnRegister(columnIndex, 0), tzSystem);
             case DATETIME:
-                return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0));
+                return toTimestampFromDatetimeChronon(getColumnRegister(columnIndex, 0), tzSystem);
             case YEARMONTHDURATION:
                 return getPeriodFromObjectStore(columnIndex);
             case DAYTIMEDURATION:
@@ -952,7 +957,7 @@ final class ADBRowStore {
         map.put(Time.class, ADBRowStore::getTime);
         map.put(LocalTime.class, ADBRowStore::getLocalTime);
         map.put(Timestamp.class, ADBRowStore::getTimestamp);
-        map.put(Instant.class, ADBRowStore::getInstant);
+        map.put(LocalDateTime.class, ADBRowStore::getLocalDateTime);
         map.put(Period.class, ADBRowStore::getPeriod);
         map.put(Duration.class, ADBRowStore::getDuration);
         map.put(UUID.class, ADBRowStore::getUUID);
@@ -960,12 +965,12 @@ final class ADBRowStore {
         return map;
     }
 
-    private Date toDateFromDateChronon(long dateChrononInDays) {
-        return new Date(TimeUnit.DAYS.toMillis(dateChrononInDays));
+    private Date toDateFromDateChronon(long dateChrononInDays, TimeZone tz) {
+        return new Date(getDatetimeChrononAdjusted(TimeUnit.DAYS.toMillis(dateChrononInDays), tz));
     }
 
-    private Date toDateFromDatetimeChronon(long datetimeChrononInMillis) {
-        return new Date(datetimeChrononInMillis);
+    private Date toDateFromDatetimeChronon(long datetimeChrononInMillis, TimeZone tz) {
+        return new Date(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
     }
 
     private LocalDate toLocalDateFromDateChronon(long dateChrononInDays) {
@@ -976,13 +981,13 @@ final class ADBRowStore {
         return LocalDate.ofEpochDay(TimeUnit.MILLISECONDS.toDays(datetimeChrononInMillis));
     }
 
-    private Time toTimeFromTimeChronon(long timeChrononInMillis) {
+    private Time toTimeFromTimeChronon(long timeChrononInMillis, TimeZone tz) {
         long datetimeChrononInMillis = getCurrentDateChrononInMillis() + timeChrononInMillis;
-        return new Time(datetimeChrononInMillis);
+        return new Time(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
     }
 
-    private Time toTimeFromDatetimeChronon(long datetimeChrononInMillis) {
-        return new Time(datetimeChrononInMillis);
+    private Time toTimeFromDatetimeChronon(long datetimeChrononInMillis, TimeZone tz) {
+        return new Time(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
     }
 
     private LocalTime toLocalTimeFromTimeChronon(long timeChrononInMillis) {
@@ -993,30 +998,38 @@ final class ADBRowStore {
         return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(datetimeChrononInMillis));
     }
 
-    private Timestamp toTimestampFromDatetimeChronon(long datetimeChrononInMillis) {
-        return new Timestamp(datetimeChrononInMillis);
+    private Timestamp toTimestampFromDatetimeChronon(long datetimeChrononInMillis, TimeZone tz) {
+        return new Timestamp(getDatetimeChrononAdjusted(datetimeChrononInMillis, tz));
+    }
+
+    private Timestamp toTimestampFromDateChronon(long dateChrononInDays, TimeZone tz) {
+        return new Timestamp(getDatetimeChrononAdjusted(TimeUnit.DAYS.toMillis(dateChrononInDays), tz));
     }
 
-    private Timestamp toTimestampFromDateChronon(long dateChrononInDays) {
-        return new Timestamp(TimeUnit.DAYS.toMillis(dateChrononInDays));
+    private LocalDateTime toLocalDateTimeFromDatetimeChronon(long datetimeChrononInMillis) {
+        return LocalDateTime.ofInstant(Instant.ofEpochMilli(datetimeChrononInMillis), TZ_UTC);
     }
 
-    private Instant toInstantFromDatetimeChronon(long datetimeChrononInMillis) {
-        return Instant.ofEpochMilli(datetimeChrononInMillis);
+    private LocalDateTime toLocalDateTimeFromDateChronon(long dateChrononInDays) {
+        return LocalDate.ofEpochDay(dateChrononInDays).atStartOfDay();
     }
 
-    private Instant toInstantFromDateChronon(long dateChrononInDays) {
-        return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(dateChrononInDays));
+    private long getDatetimeChrononAdjusted(long datetimeChrononInMillis, TimeZone tz) {
+        int tzOffset = tz.getOffset(datetimeChrononInMillis - tz.getOffset(datetimeChrononInMillis));
+        return datetimeChrononInMillis - tzOffset; // offset sampled at the estimated UTC instant (DST-safe)
     }
 
     private long getCurrentDateChrononInMillis() {
         if (currentDateChronon == 0) {
-            long chrononOfDay = TimeUnit.DAYS.toMillis(1);
-            currentDateChronon = System.currentTimeMillis() / chrononOfDay * chrononOfDay;
+            currentDateChronon = TimeUnit.DAYS.toMillis(TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()));
         }
         return currentDateChronon;
     }
 
+    private TimeZone getTimeZone(Calendar cal, TimeZone tzDefault) {
+        return cal != null ? cal.getTimeZone() : tzDefault;
+    }
+
     private String printAsJson(Object value) throws SQLException {
         if (jsonGenBuffer == null) {
             jsonGenBuffer = new StringWriter();
diff --git a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
index 94ac5a3..61072e8 100644
--- a/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
+++ b/asterixdb-jdbc/asterix-jdbc-core/src/main/java/org/apache/asterix/jdbc/core/ADBStatement.java
@@ -31,10 +31,11 @@ import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Duration;
-import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.Period;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -63,19 +65,21 @@ import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
 
 class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
 
+    static final List<Class<?>> SET_OBJECT_ATOMIC_EXTRA =
+            Arrays.asList(SqlCalendarDate.class, SqlCalendarTime.class, SqlCalendarTimestamp.class);
+
     static final List<Class<?>> SET_OBJECT_NON_ATOMIC = Arrays.asList(Object[].class, Collection.class, Map.class);
 
     static final Map<Class<?>, AbstractValueSerializer> SERIALIZER_MAP = createSerializerMap();
 
     protected final ADBConnection connection;
-    protected final String catalog;
-    protected final String schema;
 
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected volatile boolean closeOnCompletion;
 
     protected int queryTimeoutSeconds;
     protected long maxRows;
+    private volatile UUID executionId;
 
     // common result fields
     protected int updateCount = -1;
@@ -91,12 +95,11 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
 
     // Lifecycle
 
-    ADBStatement(ADBConnection connection, String catalog, String schema) {
+    ADBStatement(ADBConnection connection) {
         this.connection = Objects.requireNonNull(connection);
-        this.catalog = catalog;
-        this.schema = schema;
         this.resultSetsWithResources = new ConcurrentLinkedQueue<>();
         this.resultSetsWithoutResources = new ConcurrentLinkedQueue<>();
+        resetExecutionId();
     }
 
     @Override
@@ -153,16 +156,22 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
 
     protected ADBResultSet executeQueryImpl(String sql, List<?> args) throws SQLException {
         // note: we're not assigning executeResponse field at this method
-        ADBProtocol.QueryServiceResponse response =
-                connection.protocol.submitStatement(sql, args, true, false, queryTimeoutSeconds, catalog, schema);
-        boolean isQuery = connection.protocol.isStatementCategory(response,
-                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
-        if (!isQuery) {
-            throw getErrorReporter().errorInvalidStatementCategory();
+        try {
+            ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+            stmtOptions.forceReadOnly = true;
+            ADBProtocol.QueryServiceResponse response =
+                    connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
+            boolean isQuery = connection.protocol.isStatementCategory(response,
+                    ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+            if (!isQuery) {
+                throw getErrorReporter().errorInvalidStatementCategory();
+            }
+            warnings = connection.protocol.getWarningIfExists(response);
+            updateCount = -1;
+            return fetchResultSet(response);
+        } finally {
+            resetExecutionId();
         }
-        warnings = connection.protocol.getWarningIfExists(response);
-        updateCount = -1;
-        return fetchResultSet(response);
     }
 
     @Override
@@ -208,17 +217,22 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
     }
 
     protected int executeUpdateImpl(String sql, List<Object> args) throws SQLException {
-        ADBProtocol.QueryServiceResponse response =
-                connection.protocol.submitStatement(sql, args, false, false, queryTimeoutSeconds, catalog, schema);
-        boolean isQuery = connection.protocol.isStatementCategory(response,
-                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
-        // TODO: remove result set on the server (both query and update returning cases)
-        if (isQuery) {
-            throw getErrorReporter().errorInvalidStatementCategory();
-        }
-        warnings = connection.protocol.getWarningIfExists(response);
-        updateCount = connection.protocol.getUpdateCount(response);
-        return updateCount;
+        try {
+            ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+            ADBProtocol.QueryServiceResponse response =
+                    connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
+            boolean isQuery = connection.protocol.isStatementCategory(response,
+                    ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+            // TODO: remove result set on the server (both query and update returning cases)
+            if (isQuery) {
+                throw getErrorReporter().errorInvalidStatementCategory();
+            }
+            warnings = connection.protocol.getWarningIfExists(response);
+            updateCount = connection.protocol.getUpdateCount(response);
+            return updateCount;
+        } finally {
+            resetExecutionId();
+        }
     }
 
     @Override
@@ -243,25 +257,31 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
     }
 
     protected boolean executeImpl(String sql, List<Object> args) throws SQLException {
-        ADBProtocol.QueryServiceResponse response =
-                connection.protocol.submitStatement(sql, args, false, false, queryTimeoutSeconds, catalog, schema);
-        warnings = connection.protocol.getWarningIfExists(response);
-        boolean isQuery = connection.protocol.isStatementCategory(response,
-                ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
-        if (isQuery) {
-            updateCount = -1;
-            executeResponse = response;
-            return true;
-        } else {
-            updateCount = connection.protocol.getUpdateCount(response);
-            executeResponse = null;
-            return false;
+        try {
+            ADBProtocol.SubmitStatementOptions stmtOptions = createSubmitStatementOptions();
+            ADBProtocol.QueryServiceResponse response =
+                    connection.protocol.submitStatement(sql, args, executionId, stmtOptions);
+            warnings = connection.protocol.getWarningIfExists(response);
+            boolean isQuery = connection.protocol.isStatementCategory(response,
+                    ADBProtocol.QueryServiceResponse.StatementCategory.QUERY);
+            if (isQuery) {
+                updateCount = -1;
+                executeResponse = response;
+                return true;
+            } else {
+                updateCount = connection.protocol.getUpdateCount(response);
+                executeResponse = null;
+                return false;
+            }
+        } finally {
+            resetExecutionId();
         }
     }
 
     @Override
     public void cancel() throws SQLException {
-        throw getErrorReporter().errorMethodNotSupported(Statement.class, "cancel");
+        checkClosed();
+        connection.protocol.cancelStatementExecution(executionId);
     }
 
     @Override
@@ -284,6 +304,18 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
         checkClosed();
     }
 
+    private void resetExecutionId() {
+        executionId = UUID.randomUUID();
+    }
+
+    protected ADBProtocol.SubmitStatementOptions createSubmitStatementOptions() {
+        ADBProtocol.SubmitStatementOptions stmtOptions = connection.protocol.createSubmitStatementOptions();
+        stmtOptions.dataverseName = connection.getDataverseCanonicalName();
+        stmtOptions.sqlCompatMode = connection.sqlCompatMode;
+        stmtOptions.timeoutSeconds = queryTimeoutSeconds;
+        return stmtOptions;
+    }
+
     // Batch execution
 
     @Override
@@ -635,7 +667,7 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
     }
 
     static boolean isSetObjectCompatible(Class<?> cls) {
-        if (ADBRowStore.OBJECT_ACCESSORS_ATOMIC.containsKey(cls)) {
+        if (ADBRowStore.OBJECT_ACCESSORS_ATOMIC.containsKey(cls) || SET_OBJECT_ATOMIC_EXTRA.contains(cls)) {
             return true;
         }
         for (Class<?> aClass : SET_OBJECT_NON_ATOMIC) {
@@ -657,11 +689,14 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
         registerSerializer(serializerMap, createDoubleSerializer());
         registerSerializer(serializerMap, createStringSerializer());
         registerSerializer(serializerMap, createSqlDateSerializer());
+        registerSerializer(serializerMap, createSqlDateWithCalendarSerializer());
         registerSerializer(serializerMap, createLocalDateSerializer());
         registerSerializer(serializerMap, createSqlTimeSerializer());
+        registerSerializer(serializerMap, createSqlCalendarTimeSerializer());
         registerSerializer(serializerMap, createLocalTimeSerializer());
         registerSerializer(serializerMap, createSqlTimestampSerializer());
-        registerSerializer(serializerMap, createInstantSerializer());
+        registerSerializer(serializerMap, createSqlCalendarTimestampSerializer());
+        registerSerializer(serializerMap, createLocalDateTimeSerializer());
         registerSerializer(serializerMap, createPeriodSerializer());
         registerSerializer(serializerMap, createDurationSerializer());
         return serializerMap;
@@ -720,7 +755,22 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
             @Override
             protected void serializeNonTaggedValue(Object value, StringBuilder out) {
                 long millis = ((Date) value).getTime();
-                out.append(millis);
+                long millisAdjusted = getDatetimeChrononAdjusted(millis, TimeZone.getDefault());
+                long days = Math.floorDiv(millisAdjusted, TimeUnit.DAYS.toMillis(1)); // floor keeps pre-1970 days right
+                out.append(days);
             }
+        };
+    }
+
+    private static ATaggedValueSerializer createSqlDateWithCalendarSerializer() {
+        return new ATaggedValueSerializer(SqlCalendarDate.class, ADBDatatype.DATE) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                SqlCalendarDate dateWithCalendar = (SqlCalendarDate) value;
+                long millis = dateWithCalendar.date.getTime();
+                long millisAdjusted = getDatetimeChrononAdjusted(millis, dateWithCalendar.timeZone);
+                long days = Math.floorDiv(millisAdjusted, TimeUnit.DAYS.toMillis(1)); // floor keeps pre-1970 days right
+                out.append(days);
             }
         };
     }
@@ -729,8 +779,8 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
         return new ATaggedValueSerializer(java.time.LocalDate.class, ADBDatatype.DATE) {
             @Override
             protected void serializeNonTaggedValue(Object value, StringBuilder out) {
-                long millis = TimeUnit.DAYS.toMillis(((LocalDate) value).toEpochDay());
-                out.append(millis);
+                long days = ((LocalDate) value).toEpochDay();
+                out.append(days);
             }
         };
     }
@@ -740,7 +790,22 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
             @Override
             protected void serializeNonTaggedValue(Object value, StringBuilder out) {
                 long millis = ((Time) value).getTime();
-                out.append(millis);
+                long millisAdjusted = getDatetimeChrononAdjusted(millis, TimeZone.getDefault());
+                long timeMillis = Math.floorMod(millisAdjusted, TimeUnit.DAYS.toMillis(1)); // always in [0, one day)
+                out.append(timeMillis);
             }
+        };
+    }
+
+    private static ATaggedValueSerializer createSqlCalendarTimeSerializer() {
+        return new ATaggedValueSerializer(SqlCalendarTime.class, ADBDatatype.TIME) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                SqlCalendarTime timeWithCalendar = (SqlCalendarTime) value;
+                long millis = timeWithCalendar.time.getTime();
+                long millisAdjusted = getDatetimeChrononAdjusted(millis, timeWithCalendar.timeZone);
+                long timeMillis = Math.floorMod(millisAdjusted, TimeUnit.DAYS.toMillis(1)); // always in [0, one day)
+                out.append(timeMillis);
             }
         };
     }
@@ -749,8 +814,9 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
         return new ATaggedValueSerializer(java.time.LocalTime.class, ADBDatatype.TIME) {
             @Override
             protected void serializeNonTaggedValue(Object value, StringBuilder out) {
-                long millis = TimeUnit.NANOSECONDS.toMillis(((LocalTime) value).toNanoOfDay());
-                out.append(millis);
+                long nanos = ((LocalTime) value).toNanoOfDay();
+                long timeMillis = TimeUnit.NANOSECONDS.toMillis(nanos);
+                out.append(timeMillis);
             }
         };
     }
@@ -760,16 +826,29 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
             @Override
             protected void serializeNonTaggedValue(Object value, StringBuilder out) {
                 long millis = ((Timestamp) value).getTime();
-                out.append(millis);
+                long millisAdjusted = getDatetimeChrononAdjusted(millis, TimeZone.getDefault());
+                out.append(millisAdjusted);
             }
         };
     }
 
-    private static ATaggedValueSerializer createInstantSerializer() {
-        return new ATaggedValueSerializer(java.time.Instant.class, ADBDatatype.DATETIME) {
+    private static ATaggedValueSerializer createSqlCalendarTimestampSerializer() {
+        return new ATaggedValueSerializer(SqlCalendarTimestamp.class, ADBDatatype.DATETIME) {
             @Override
             protected void serializeNonTaggedValue(Object value, StringBuilder out) {
-                long millis = ((Instant) value).toEpochMilli();
+                SqlCalendarTimestamp timestampWithCalendar = (SqlCalendarTimestamp) value;
+                long millis = timestampWithCalendar.timestamp.getTime();
+                long millisAdjusted = getDatetimeChrononAdjusted(millis, timestampWithCalendar.timeZone);
+                out.append(millisAdjusted);
+            }
+        };
+    }
+
+    private static ATaggedValueSerializer createLocalDateTimeSerializer() {
+        return new ATaggedValueSerializer(java.time.LocalDateTime.class, ADBDatatype.DATETIME) {
+            @Override
+            protected void serializeNonTaggedValue(Object value, StringBuilder out) {
+                long millis = ((LocalDateTime) value).atZone(TZ_UTC).toInstant().toEpochMilli();
                 out.append(millis);
             }
         };
@@ -812,6 +891,8 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
 
     private static abstract class ATaggedValueSerializer extends AbstractValueSerializer {
 
+        protected static final ZoneId TZ_UTC = ZoneId.of("UTC"); // UTC zone for LocalDateTime serialization
+
         protected final ADBDatatype adbType;
 
         protected ATaggedValueSerializer(Class<?> javaType, ADBDatatype adbType) {
@@ -842,5 +923,46 @@ class ADBStatement extends ADBWrapperSupport implements java.sql.Statement {
         private static char hex(int i) {
             return (char) (i + (i < 10 ? '0' : ('A' - 10)));
         }
+
+        protected long getDatetimeChrononAdjusted(long datetimeChrononInMillis, TimeZone tz) {
+            int tzOffset = tz.getOffset(datetimeChrononInMillis);
+            return datetimeChrononInMillis + tzOffset;
+        }
+    }
+
+    static abstract class AbstractSqlCalendarDateTime {
+        final TimeZone timeZone;
+
+        AbstractSqlCalendarDateTime(TimeZone timeZone) {
+            this.timeZone = timeZone;
+        }
+    }
+
+    static final class SqlCalendarDate extends AbstractSqlCalendarDateTime {
+        final Date date;
+
+        SqlCalendarDate(Date date, TimeZone timeZone) {
+            super(timeZone);
+            this.date = date;
+        }
+    }
+
+    static final class SqlCalendarTime extends AbstractSqlCalendarDateTime {
+        final Time time;
+
+        SqlCalendarTime(Time time, TimeZone timeZone) {
+            super(timeZone);
+            this.time = time;
+        }
+    }
+
+    static final class SqlCalendarTimestamp extends AbstractSqlCalendarDateTime {
+        final Timestamp timestamp;
+
+        SqlCalendarTimestamp(Timestamp timestamp, TimeZone timeZone) {
+            super(timeZone);
+            this.timestamp = timestamp;
+
+        }
     }
 }