You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/05/24 12:38:33 UTC

[2/4] ignite git commit: IGNITE-5284: Splitted IgniteH2Indexing into several classes. This closes #1999.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c75e4de/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index d3ee6ff..8e6eeba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -17,35 +17,22 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.math.BigDecimal;
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.sql.Types;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -60,7 +47,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.QueryIndexType;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -97,9 +83,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
-import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
-import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -115,16 +99,11 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2SystemIndexFactory;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
-import org.apache.ignite.internal.processors.query.h2.opt.GridLuceneIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
@@ -135,16 +114,12 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.GridStringBuilder;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
@@ -159,49 +134,18 @@ import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
-import org.h2.api.TableEngine;
-import org.h2.command.CommandInterface;
 import org.h2.command.Prepared;
-import org.h2.command.ddl.CreateTableData;
 import org.h2.command.dml.Insert;
 import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
 import org.h2.index.Cursor;
 import org.h2.index.Index;
-import org.h2.jdbc.JdbcConnection;
 import org.h2.jdbc.JdbcPreparedStatement;
 import org.h2.jdbc.JdbcStatement;
-import org.h2.message.DbException;
-import org.h2.mvstore.cache.CacheLongKeyLIRS;
-import org.h2.result.SearchRow;
-import org.h2.result.SimpleRow;
-import org.h2.result.SortOrder;
 import org.h2.server.web.WebServer;
-import org.h2.table.Column;
 import org.h2.table.IndexColumn;
-import org.h2.table.TableBase;
 import org.h2.tools.Server;
 import org.h2.util.JdbcUtils;
-import org.h2.value.DataType;
-import org.h2.value.Value;
-import org.h2.value.ValueArray;
-import org.h2.value.ValueBoolean;
-import org.h2.value.ValueByte;
-import org.h2.value.ValueBytes;
-import org.h2.value.ValueDate;
-import org.h2.value.ValueDecimal;
-import org.h2.value.ValueDouble;
-import org.h2.value.ValueFloat;
-import org.h2.value.ValueGeometry;
-import org.h2.value.ValueInt;
-import org.h2.value.ValueJavaObject;
-import org.h2.value.ValueLong;
-import org.h2.value.ValueNull;
-import org.h2.value.ValueShort;
-import org.h2.value.ValueString;
-import org.h2.value.ValueTime;
-import org.h2.value.ValueTimestamp;
-import org.h2.value.ValueUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -219,10 +163,6 @@ import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_N
 import static org.apache.ignite.internal.processors.query.QueryUtils.VER_FIELD_NAME;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.DEFAULT_COLUMNS_COUNT;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VER_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 
@@ -244,11 +184,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS);
         H2ExtrasInnerIO.register();
         H2ExtrasLeafIO.register();
-    }
 
-    /** Spatial index class name. */
-    private static final String SPATIAL_IDX_CLS =
-        "org.apache.ignite.internal.processors.query.h2.opt.GridH2SpatialIndex";
+        // Initialize system properties for H2.
+        System.setProperty("h2.objectCache", "false");
+        System.setProperty("h2.serializeJavaObject", "false");
+        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
+    }
 
     /** Default DB options. */
     private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
@@ -262,7 +203,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** Dummy metadata for update result. */
     public static final List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
-        singletonList(new SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
+        singletonList(new H2SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
 
     /** */
     private static final int PREPARED_STMT_CACHE_SIZE = 256;
@@ -270,15 +211,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private static final int TWO_STEP_QRY_CACHE_SIZE = 1024;
 
-    /** */
-    private static final Field COMMAND_FIELD;
-
-    /** */
-    private static final char ESC_CH = '\"';
-
-    /** */
-    private static final String ESC_STR = ESC_CH + "" + ESC_CH;
-
     /** The period of clean up the {@link #stmtCache}. */
     private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
 
@@ -289,25 +221,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
 
-    /*
-     * Command in H2 prepared statement.
-     */
-    static {
-        // Initialize system properties for H2.
-        System.setProperty("h2.objectCache", "false");
-        System.setProperty("h2.serializeJavaObject", "false");
-        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
-
-        try {
-            COMMAND_FIELD = JdbcPreparedStatement.class.getDeclaredField("command");
-
-            COMMAND_FIELD.setAccessible(true);
-        }
-        catch (NoSuchFieldException e) {
-            throw new IllegalStateException("Check H2 version in classpath.", e);
-        }
-    }
-
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -319,7 +232,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private Marshaller marshaller;
 
     /** Collection of schemaNames and registered tables. */
-    private final ConcurrentMap<String, Schema> schemas = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<String, H2Schema> schemas = new ConcurrentHashMap8<>();
 
     /** */
     private String dbUrl = "jdbc:h2:mem:";
@@ -346,9 +259,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>();
 
     /** */
-    private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
-        @Nullable @Override public ConnectionWrapper get() {
-            ConnectionWrapper c = super.get();
+    private final ThreadLocal<H2ConnectionWrapper> connCache = new ThreadLocal<H2ConnectionWrapper>() {
+        @Nullable @Override public H2ConnectionWrapper get() {
+            H2ConnectionWrapper c = super.get();
 
             boolean reconnect = true;
 
@@ -371,7 +284,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return c;
         }
 
-        @Nullable @Override protected ConnectionWrapper initialValue() {
+        @Nullable @Override protected H2ConnectionWrapper initialValue() {
             Connection c;
 
             try {
@@ -383,7 +296,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             conns.add(c);
 
-            return new ConnectionWrapper(c);
+            return new H2ConnectionWrapper(c);
         }
     };
 
@@ -400,10 +313,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final ConcurrentMap<QueryTable, GridH2Table> dataTables = new ConcurrentHashMap8<>();
 
     /** Statement cache. */
-    private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Thread, H2StatementCache> stmtCache = new ConcurrentHashMap<>();
 
     /** */
-    private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, TwoStepCachedQuery> twoStepCache =
+    private final GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> twoStepCache =
         new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
 
     /** */
@@ -430,8 +343,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Connection.
      */
     public Connection connectionForCache(String cacheName) {
+        return connectionForSchema(schema(cacheName));
+    }
+
+    /**
+     * @param schema Schema.
+     * @return Connection.
+     */
+    public Connection connectionForSchema(String schema) {
         try {
-            return connectionForThread(schema(cacheName));
+            return connectionForThread(schema);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -449,10 +370,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (useStmtCache) {
             Thread curThread = Thread.currentThread();
 
-            StatementCache cache = stmtCache.get(curThread);
+            H2StatementCache cache = stmtCache.get(curThread);
 
             if (cache == null) {
-                StatementCache cache0 = new StatementCache(PREPARED_STMT_CACHE_SIZE);
+                H2StatementCache cache0 = new H2StatementCache(PREPARED_STMT_CACHE_SIZE);
 
                 cache = stmtCache.putIfAbsent(curThread, cache0);
 
@@ -518,7 +439,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException In case of error.
      */
     private Connection connectionForThread(@Nullable String schema) throws IgniteCheckedException {
-        ConnectionWrapper c = connCache.get();
+        H2ConnectionWrapper c = connCache.get();
 
         if (c == null)
             throw new IgniteCheckedException("Failed to get DB connection for thread (check log for details).");
@@ -645,7 +566,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         GridCacheVersion ver,
         long expirationTime,
         long link) throws IgniteCheckedException {
-        TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
 
         if (tbl == null)
             return; // Type was rejected.
@@ -653,39 +574,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (expirationTime == 0)
             expirationTime = Long.MAX_VALUE;
 
-        tbl.tbl.update(k, partId, v, ver, expirationTime, false, link);
+        tbl.table().update(k, partId, v, ver, expirationTime, false, link);
 
-        if (tbl.luceneIdx != null)
-            tbl.luceneIdx.store(k, v, ver, expirationTime);
-    }
-
-    /**
-     * @param o Object.
-     * @return {@code true} If it is a binary object.
-     */
-    private boolean isBinary(CacheObject o) {
-        if (ctx == null)
-            return false;
-
-        return ctx.cacheObjects().isBinaryObject(o);
+        if (tbl.luceneIndex() != null)
+            tbl.luceneIndex().store(k, v, ver, expirationTime);
     }
 
     /**
      * @param cacheName Cache name.
      * @return Cache object context.
      */
-    private CacheObjectContext objectContext(String cacheName) {
-        if (ctx == null)
-            return null;
+    public CacheObjectContext objectContext(String cacheName) {
+        GridCacheContext cctx = cacheContext(cacheName);
 
-        return ctx.cache().internalCache(cacheName).context().cacheObjectContext();
+        return cctx != null ? cctx.cacheObjectContext() : null;
     }
 
     /**
      * @param cacheName Cache name.
      * @return Cache object context.
      */
-    private GridCacheContext cacheContext(String cacheName) {
+    public GridCacheContext cacheContext(String cacheName) {
         if (ctx == null)
             return null;
 
@@ -702,14 +611,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']');
 
-        TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
 
         if (tbl == null)
             return;
 
-        if (tbl.tbl.update(key, partId, val, ver, 0, true, 0)) {
-            if (tbl.luceneIdx != null)
-                tbl.luceneIdx.remove(key);
+        if (tbl.table().update(key, partId, val, ver, 0, true, 0)) {
+            if (tbl.luceneIndex() != null)
+                tbl.luceneIndex().remove(key);
         }
     }
 
@@ -719,7 +628,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param tbl Table to unregister.
      * @throws IgniteCheckedException If failed to unregister.
      */
-    private void removeTable(TableDescriptor tbl) throws IgniteCheckedException {
+    private void removeTable(H2TableDescriptor tbl) throws IgniteCheckedException {
         assert tbl != null;
 
         if (log.isDebugEnabled())
@@ -755,7 +664,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         tbl.onDrop();
 
-        tbl.schema.tbls.remove(tbl.typeName());
+        tbl.schema().tables().remove(tbl.typeName());
     }
 
     /**
@@ -766,14 +675,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param h2Idx User index.
      * @throws IgniteCheckedException If failed.
      */
-    private void addInitialUserIndex(String cacheName, TableDescriptor desc, GridH2IndexBase h2Idx)
+    private void addInitialUserIndex(String cacheName, H2TableDescriptor desc, GridH2IndexBase h2Idx)
         throws IgniteCheckedException {
-        GridH2Table h2Tbl = desc.tbl;
+        GridH2Table h2Tbl = desc.table();
 
         h2Tbl.proposeUserIndex(h2Idx);
 
         try {
-            String sql = indexCreateSql(desc.fullTableName(), h2Idx, false, desc.schema.escapeAll());
+            String sql = H2Utils.indexCreateSql(desc.fullTableName(), h2Idx, false, desc.schema().escapeAll());
 
             executeSql(cacheName, sql);
         }
@@ -792,15 +701,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         // Locate table.
         String schemaName = schema(cacheName);
 
-        Schema schema = schemas.get(schemaName);
+        H2Schema schema = schemas.get(schemaName);
 
-        TableDescriptor desc = (schema != null ? schema.tbls.get(tblName) : null);
+        H2TableDescriptor desc = (schema != null ? schema.tables().get(tblName) : null);
 
         if (desc == null)
             throw new IgniteCheckedException("Table not found in internal H2 database [schemaName=" + schemaName +
                 ", tblName=" + tblName + ']');
 
-        GridH2Table h2Tbl = desc.tbl;
+        GridH2Table h2Tbl = desc.table();
 
         // Create index.
         final GridH2IndexBase h2Idx = desc.createUserIndex(idxDesc);
@@ -829,7 +738,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             // At this point index is in consistent state, promote it through H2 SQL statement, so that cached
             // prepared statements are re-built.
-            String sql = indexCreateSql(desc.fullTableName(), h2Idx, ifNotExists, schema.escapeAll());
+            String sql = H2Utils.indexCreateSql(desc.fullTableName(), h2Idx, ifNotExists, schema.escapeAll());
 
             executeSql(cacheName, sql);
         }
@@ -847,9 +756,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         throws IgniteCheckedException{
         String schemaName = schema(cacheName);
 
-        Schema schema = schemas.get(schemaName);
+        H2Schema schema = schemas.get(schemaName);
 
-        String sql = indexDropSql(schemaName, idxName, ifExists, schema.escapeAll());
+        String sql = H2Utils.indexDropSql(schemaName, idxName, ifExists, schema.escapeAll());
 
         executeSql(cacheName, sql);
     }
@@ -875,54 +784,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Generate {@code CREATE INDEX} SQL statement for given params.
-     * @param fullTblName Fully qualified table name.
-     * @param h2Idx H2 index.
-     * @param ifNotExists Quietly skip index creation if it exists.
-     * @return Statement string.
-     */
-    private static String indexCreateSql(String fullTblName, GridH2IndexBase h2Idx, boolean ifNotExists,
-        boolean escapeAll) {
-        boolean spatial = F.eq(SPATIAL_IDX_CLS, h2Idx.getClass().getName());
-
-        GridStringBuilder sb = new SB("CREATE ")
-            .a(spatial ? "SPATIAL " : "")
-            .a("INDEX ")
-            .a(ifNotExists ? "IF NOT EXISTS " : "")
-            .a(escapeName(h2Idx.getName(), escapeAll))
-            .a(" ON ")
-            .a(fullTblName)
-            .a(" (");
-
-        boolean first = true;
-
-        for (IndexColumn col : h2Idx.getIndexColumns()) {
-            if (first)
-                first = false;
-            else
-                sb.a(", ");
-
-            sb.a("\"" + col.columnName + "\"").a(" ").a(col.sortType == SortOrder.ASCENDING ? "ASC" : "DESC");
-        }
-
-        sb.a(')');
-
-        return sb.toString();
-    }
-
-    /**
-     * Generate {@code CREATE INDEX} SQL statement for given params.
-     * @param schemaName <b>Quoted</b> schema name.
-     * @param idxName Index name.
-     * @param ifExists Quietly skip index drop if it exists.
-     * @param escapeAll Escape flag.
-     * @return Statement string.
-     */
-    private static String indexDropSql(String schemaName, String idxName, boolean ifExists, boolean escapeAll) {
-        return "DROP INDEX " + (ifExists ? "IF EXISTS " : "") + schemaName + '.' + escapeName(idxName, escapeAll);
-    }
-
-    /**
      * Create sorted index.
      *
      * @param schema Schema.
@@ -932,7 +793,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param cols Columns.
      * @return Index.
      */
-    private GridH2IndexBase createSortedIndex(Schema schema, String name, GridH2Table tbl, boolean pk,
+    public GridH2IndexBase createSortedIndex(H2Schema schema, String name, GridH2Table tbl, boolean pk,
         List<IndexColumn> cols, int inlineSize) {
         try {
             GridCacheContext cctx = schema.cacheContext();
@@ -949,50 +810,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /**
-     * Create spatial index.
-     *
-     * @param tbl Table.
-     * @param idxName Index name.
-     * @param cols Columns.
-     */
-    private GridH2IndexBase createSpatialIndex(GridH2Table tbl, String idxName, IndexColumn[] cols
-    ) {
-        try {
-            Class<?> cls = Class.forName(SPATIAL_IDX_CLS);
-
-            Constructor<?> ctor = cls.getConstructor(
-                GridH2Table.class,
-                String.class,
-                Integer.TYPE,
-                IndexColumn[].class);
-
-            if (!ctor.isAccessible())
-                ctor.setAccessible(true);
-
-            final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
-
-            return (GridH2IndexBase)ctor.newInstance(tbl, idxName, segments, cols);
-        }
-        catch (Exception e) {
-            throw new IgniteException("Failed to instantiate: " + SPATIAL_IDX_CLS, e);
-        }
-    }
-
     @SuppressWarnings("unchecked")
     @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(
         String cacheName, String qry, String typeName,
         IndexingQueryFilter filters) throws IgniteCheckedException {
-        TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
 
-        if (tbl != null && tbl.luceneIdx != null) {
+        if (tbl != null && tbl.luceneIndex() != null) {
             GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, cacheName,
                 U.currentTimeMillis(), null, true);
 
             try {
                 runs.put(run.id(), run);
 
-                return tbl.luceneIdx.query(qry, filters);
+                return tbl.luceneIndex().query(qry, filters);
             }
             finally {
                 runs.remove(run.id());
@@ -1005,7 +836,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public void unregisterType(String cacheName, String typeName)
         throws IgniteCheckedException {
-        TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
 
         if (tbl != null)
             removeTable(tbl);
@@ -1028,9 +859,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     public GridQueryFieldsResult queryLocalSqlFields(final String cacheName, final String qry,
         @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
         final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException {
-        final Connection conn = connectionForCache(cacheName);
+        final String schema = schema(cacheName);
+
+        final Connection conn = connectionForSchema(schema);
 
-        setupConnection(conn, false, enforceJoinOrder);
+        H2Utils.setupConnection(conn, false, enforceJoinOrder);
 
         final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
 
@@ -1045,7 +878,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fldsQry.setEnforceJoinOrder(enforceJoinOrder);
             fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 
-            return dmlProc.updateSqlFieldsLocal(cacheName, stmt, fldsQry, filter, cancel);
+            return dmlProc.updateSqlFieldsLocal(schema, stmt, fldsQry, filter, cancel);
         }
         else if (DdlStatementsProcessor.isDdlStatement(p))
             throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
@@ -1054,7 +887,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         List<GridQueryFieldMetadata> meta;
 
         try {
-            meta = meta(stmt.getMetaData());
+            meta = H2Utils.meta(stmt.getMetaData());
         }
         catch (SQLException e) {
             throw new IgniteCheckedException("Cannot prepare query metadata", e);
@@ -1075,9 +908,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 runs.putIfAbsent(run.id(), run);
 
                 try {
-                    ResultSet rs = executeSqlQueryWithTimer(cacheName, stmt, conn, qry, params, timeout, cancel);
+                    ResultSet rs = executeSqlQueryWithTimer(schema, stmt, conn, qry, params, timeout, cancel);
 
-                    return new FieldsIterator(rs);
+                    return new H2FieldsIterator(rs);
                 }
                 finally {
                     GridH2QueryContext.clearThreadLocal();
@@ -1106,58 +939,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param rsMeta Metadata.
-     * @return List of fields metadata.
-     * @throws SQLException If failed.
-     */
-    private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
-        List<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
-
-        for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
-            String schemaName = rsMeta.getSchemaName(i);
-            String typeName = rsMeta.getTableName(i);
-            String name = rsMeta.getColumnLabel(i);
-            String type = rsMeta.getColumnClassName(i);
-
-            if (type == null) // Expression always returns NULL.
-                type = Void.class.getName();
-
-            meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
-        }
-
-        return meta;
-    }
-
-    /**
-     * @param stmt Prepared statement.
-     * @return Command type.
-     */
-    private static int commandType(PreparedStatement stmt) {
-        try {
-            return ((CommandInterface)COMMAND_FIELD.get(stmt)).getCommandType();
-        }
-        catch (IllegalAccessException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    /**
-     * Stores rule for constructing schemaName according to cache configuration.
-     *
-     * @param ccfg Cache configuration.
-     * @return Proper schema name according to ANSI-99 standard.
-     */
-    private static String schemaNameFromCacheConf(CacheConfiguration<?, ?> ccfg) {
-        if (ccfg.getSqlSchema() == null)
-            return escapeName(ccfg.getName(), true);
-
-        if (ccfg.getSqlSchema().charAt(0) == ESC_CH)
-            return ccfg.getSqlSchema();
-
-        return ccfg.isSqlEscapeAll() ? escapeName(ccfg.getSqlSchema(), true) : ccfg.getSqlSchema().toUpperCase();
-    }
-
-    /**
      * Prepares sql statement.
      *
      * @param conn Connection.
@@ -1210,7 +991,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         if (timeoutMillis > 0)
-            session(conn).setQueryTimeout(timeoutMillis);
+            H2Utils.session(conn).setQueryTimeout(timeoutMillis);
 
         try {
             return stmt.executeQuery();
@@ -1224,14 +1005,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
         finally {
             if (timeoutMillis > 0)
-                session(conn).setQueryTimeout(0);
+                H2Utils.session(conn).setQueryTimeout(0);
         }
     }
 
     /**
      * Executes sql query and prints warning if query is too slow..
      *
-     * @param cacheName Cache name.
+     * @param schema Schema.
      * @param conn Connection,.
      * @param sql Sql query.
      * @param params Parameters.
@@ -1240,21 +1021,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
-    public ResultSet executeSqlQueryWithTimer(String cacheName,
+    public ResultSet executeSqlQueryWithTimer(String schema,
         Connection conn,
         String sql,
         @Nullable Collection<Object> params,
         boolean useStmtCache,
         int timeoutMillis,
         @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
-        return executeSqlQueryWithTimer(cacheName, preparedStatementWithParams(conn, sql, params, useStmtCache),
+        return executeSqlQueryWithTimer(schema, preparedStatementWithParams(conn, sql, params, useStmtCache),
             conn, sql, params, timeoutMillis, cancel);
     }
 
     /**
      * Executes sql query and prints warning if query is too slow.
      *
-     * @param cacheName Cache name.
+     * @param schema Schema.
      * @param stmt Prepared statement for query.
      * @param conn Connection.
      * @param sql Sql query.
@@ -1263,7 +1044,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
-    private ResultSet executeSqlQueryWithTimer(String cacheName, PreparedStatement stmt,
+    private ResultSet executeSqlQueryWithTimer(String schema, PreparedStatement stmt,
         Connection conn,
         String sql,
         @Nullable Collection<Object> params,
@@ -1276,7 +1057,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             long time = U.currentTimeMillis() - start;
 
-            long longQryExecTimeout = schemas.get(schema(cacheName)).ccfg.getLongQueryWarningTimeout();
+            long longQryExecTimeout = schemas.get(schema).cacheContext().config().getLongQueryWarningTimeout();
 
             if (time > longQryExecTimeout) {
                 String msg = "Query execution is too long (" + time + " ms): " + sql;
@@ -1319,18 +1100,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /**
-     * @param conn Connection to use.
-     * @param distributedJoins If distributed joins are enabled.
-     * @param enforceJoinOrder Enforce join order of tables.
-     */
-    public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) {
-        Session s = session(conn);
-
-        s.setForceJoinOrder(enforceJoinOrder);
-        s.setJoinBatchEnabled(distributedJoins);
-    }
-
     /** {@inheritDoc} */
     @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
         final SqlFieldsQuery qry, final boolean keepBinary, final IndexingQueryFilter filter,
@@ -1369,6 +1138,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
         final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
         if (cctx.config().getQueryParallelism() > 1) {
@@ -1390,7 +1160,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(cacheName, sqlQry, alias,
                 F.asList(params), type, filter, cancel);
 
-            return new QueryCursorImpl<Cache.Entry<K, V>>(new Iterable<Cache.Entry<K, V>>() {
+            return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
                 @Override public Iterator<Cache.Entry<K, V>> iterator() {
                     return new ClIter<Cache.Entry<K, V>>() {
                         @Override public void close() throws Exception {
@@ -1430,10 +1200,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Queried rows.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("unchecked")
     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String cacheName,
         final String qry, String alias, @Nullable final Collection<Object> params, String type,
         final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
-        final TableDescriptor tbl = tableDescriptor(type, cacheName);
+        final H2TableDescriptor tbl = tableDescriptor(type, cacheName);
 
         if (tbl == null)
             throw new IgniteSQLException("Failed to find SQL table for type: " + type,
@@ -1443,7 +1214,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         Connection conn = connectionForThread(tbl.schemaName());
 
-        setupConnection(conn, false, false);
+        H2Utils.setupConnection(conn, false, false);
 
         GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
             .distributedJoinMode(OFF));
@@ -1454,9 +1225,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         runs.put(run.id(), run);
 
         try {
-            ResultSet rs = executeSqlQueryWithTimer(cacheName, conn, sql, params, true, 0, cancel);
+            ResultSet rs = executeSqlQueryWithTimer(schema(cacheName), conn, sql, params, true, 0, cancel);
 
-            return new KeyValIterator(rs);
+            return new H2KeyValueIterator(rs);
         }
         finally {
             GridH2QueryContext.clearThreadLocal();
@@ -1485,7 +1256,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     ) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
-                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params, parts);
+                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
+                    parts);
             }
         };
     }
@@ -1497,7 +1269,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         String type = qry.getType();
         String cacheName = cctx.name();
 
-        TableDescriptor tblDesc = tableDescriptor(type, cacheName);
+        H2TableDescriptor tblDesc = tableDescriptor(type, cacheName);
 
         if (tblDesc == null)
             throw new IgniteSQLException("Failed to find SQL table for type: " + type,
@@ -1555,21 +1327,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         };
     }
 
-    /**
-     * @param c Connection.
-     * @return Session.
-     */
-    public static Session session(Connection c) {
-        return (Session)((JdbcConnection)c).getSession();
-    }
-
     /** {@inheritDoc} */
-    @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
-        boolean keepBinary, GridQueryCancel cancel) {
+    @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx,
+        SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel) {
         final String cacheName = cctx.name();
         final String sqlQry = qry.getSql();
 
-        Connection c = connectionForCache(cacheName);
+        String schema = schema(cctx.name());
+
+        Connection c = connectionForSchema(schema);
 
         final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
         final boolean distributedJoins = qry.isDistributedJoins();
@@ -1580,9 +1346,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         GridCacheTwoStepQuery twoStepQry = null;
         List<GridQueryFieldMetadata> meta;
 
-        final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(cacheName, sqlQry, grpByCollocated,
+        final H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(cacheName, sqlQry, grpByCollocated,
             distributedJoins, enforceJoinOrder, qry.isLocal());
-        TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
+        H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
         if (cachedQry != null) {
             twoStepQry = cachedQry.twoStepQry.copy();
@@ -1592,7 +1358,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             final UUID locNodeId = ctx.localNodeId();
 
             // Here we will just parse the statement, no need to optimize it at all.
-            setupConnection(c, /*distributedJoins*/false, /*enforceJoinOrder*/true);
+            H2Utils.setupConnection(c, /*distributedJoins*/false, /*enforceJoinOrder*/true);
 
             GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
                 .distributedJoinMode(distributedJoinMode));
@@ -1606,7 +1372,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 try {
                     while (true) {
                         try {
-                            // Do not cache this statement because the whole two step query object will be cached later on.
+                            // Do not cache this statement because the whole query object will be cached later on.
                             stmt = prepareStatement(c, sqlQry, false);
 
                             break;
@@ -1655,7 +1421,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 if (twoStepQry == null) {
                     if (DmlStatementsProcessor.isDmlStatement(prepared)) {
                         try {
-                            return dmlProc.updateSqlFieldsDistributed(cctx.name(), stmt, qry, cancel);
+                            return dmlProc.updateSqlFieldsDistributed(schema, stmt, qry, cancel);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry +
@@ -1702,7 +1468,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 twoStepQry.cacheIds(cacheIds);
                 twoStepQry.local(qry.isLocal());
 
-                meta = meta(stmt.getMetaData());
+                meta = H2Utils.meta(stmt.getMetaData());
             }
             catch (IgniteCheckedException e) {
                 throw new CacheException("Failed to bind parameters: [qry=" + sqlQry + ", params=" +
@@ -1731,7 +1497,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         cursor.fieldsMeta(meta);
 
         if (cachedQry == null && !twoStepQry.explain()) {
-            cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy());
+            cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy());
             twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
         }
 
@@ -1752,16 +1518,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * @throws IllegalStateException if segmented indices used with non-segmented indices.
      */
-    private void checkCacheIndexSegmentation(List<Integer> caches) {
-        if (caches.isEmpty())
+    private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
+        if (cacheIds.isEmpty())
             return; // Nothing to check
 
         GridCacheSharedContext sharedCtx = ctx.cache().context();
 
         int expectedParallelism = 0;
 
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = sharedCtx.cacheContext(caches.get(i));
+        for (Integer cacheId : cacheIds) {
+            GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
 
             assert cctx != null;
 
@@ -1770,8 +1536,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             if (expectedParallelism == 0)
                 expectedParallelism = cctx.config().getQueryParallelism();
-            else if (cctx.config().getQueryParallelism() != expectedParallelism)
-                throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
+            else if (cctx.config().getQueryParallelism() != expectedParallelism) {
+                throw new IllegalStateException("Using indexes with different parallelism levels in same query is " +
+                    "forbidden.");
+            }
         }
     }
 
@@ -1784,7 +1552,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Prepared statement.
      * @throws IgniteCheckedException In case of error.
      */
-    private String generateQuery(String qry, String tableAlias, TableDescriptor tbl) throws IgniteCheckedException {
+    private String generateQuery(String qry, String tableAlias, H2TableDescriptor tbl) throws IgniteCheckedException {
         assert tbl != null;
 
         final String qry0 = qry;
@@ -1847,9 +1615,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         String schemaName = schema(cacheName);
 
-        Schema schema = schemas.get(schemaName);
+        H2Schema schema = schemas.get(schemaName);
 
-        TableDescriptor tbl = new TableDescriptor(schema, type);
+        H2TableDescriptor tbl = new H2TableDescriptor(this, schema, type);
 
         try {
             Connection conn = connectionForThread(schemaName);
@@ -1895,68 +1663,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         if (type.keyFieldName() != null && !type.fields().containsKey(type.keyFieldName())) {
-            throw new IgniteCheckedException(
-                    MessageFormat.format("Name ''{0}'' must be amongst fields since it is configured as ''keyFieldName'' [type=" +
-                            type.name() + "]", type.keyFieldName()));
+            throw new IgniteCheckedException(MessageFormat.format("Name ''{0}'' must be amongst fields since it " +
+                "is configured as ''keyFieldName'' [type=" + type.name() + "]", type.keyFieldName()));
         }
 
         if (type.valueFieldName() != null && !type.fields().containsKey(type.valueFieldName())) {
-            throw new IgniteCheckedException(
-                    MessageFormat.format("Name ''{0}'' must be amongst fields since it is configured as ''valueFieldName'' [type=" +
-                            type.name() + "]", type.valueFieldName()));
-        }
-    }
-
-    /**
-     * Returns empty string, if {@code nullableString} is empty.
-     *
-     * @param nullableString String for convertion. Could be null.
-     * @return Non null string. Could be empty.
-     */
-    private static String emptyIfNull(String nullableString) {
-        return nullableString == null ? "" : nullableString;
-    }
-
-    /**
-     * Escapes name to be valid SQL identifier. Currently just replaces '.' and '$' sign with '_'.
-     *
-     * @param name Name.
-     * @param escapeAll Escape flag.
-     * @return Escaped name.
-     */
-    public static String escapeName(String name, boolean escapeAll) {
-        if (name == null) // It is possible only for a cache name.
-            return ESC_STR;
-
-        if (escapeAll)
-            return ESC_CH + name + ESC_CH;
-
-        SB sb = null;
-
-        for (int i = 0; i < name.length(); i++) {
-            char ch = name.charAt(i);
-
-            if (!Character.isLetter(ch) && !Character.isDigit(ch) && ch != '_' &&
-                !(ch == '"' && (i == 0 || i == name.length() - 1)) && ch != '-') {
-                // Class name can also contain '$' or '.' - these should be escaped.
-                assert ch == '$' || ch == '.';
-
-                if (sb == null)
-                    sb = new SB();
-
-                sb.a(name.substring(sb.length(), i));
-
-                // Replace illegal chars with '_'.
-                sb.a('_');
-            }
+            throw new IgniteCheckedException(MessageFormat.format("Name ''{0}'' must be amongst fields since it " +
+                "is configured as ''valueFieldName'' [type=" + type.name() + "]", type.valueFieldName()));
         }
-
-        if (sb == null)
-            return name;
-
-        sb.a(name.substring(sb.length(), name.length()));
-
-        return sb.toString();
     }
 
     /**
@@ -1969,7 +1683,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws SQLException If failed to create db table.
      * @throws IgniteCheckedException If failed.
      */
-    private void createTable(String cacheName, Schema schema, TableDescriptor tbl, Connection conn)
+    private void createTable(String cacheName, H2Schema schema, H2TableDescriptor tbl, Connection conn)
         throws SQLException, IgniteCheckedException {
         assert schema != null;
         assert tbl != null;
@@ -1990,14 +1704,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         sql.a(',').a(VER_FIELD_NAME).a(" OTHER INVISIBLE");
 
         for (Map.Entry<String, Class<?>> e : tbl.type().fields().entrySet())
-            sql.a(',').a(escapeName(e.getKey(), escapeAll)).a(' ').a(dbTypeFromClass(e.getValue()));
+            sql.a(',').a(H2Utils.escapeName(e.getKey(), escapeAll)).a(' ').a(dbTypeFromClass(e.getValue()));
 
         sql.a(')');
 
         if (log.isDebugEnabled())
             log.debug("Creating DB table with SQL: " + sql);
 
-        GridH2RowDescriptor rowDesc = new RowDescriptor(tbl.type(), schema);
+        GridH2RowDescriptor rowDesc = new H2RowDescriptor(this, tbl.type(), schema);
 
         H2RowFactory rowFactory = tbl.rowFactory(rowDesc);
 
@@ -2032,6 +1746,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param h2Tbl Remove data table.
+     */
+    public void removeDataTable(GridH2Table h2Tbl) {
+        dataTables.remove(h2Tbl.identifier(), h2Tbl);
+    }
+
+    /**
      * Find table for index.
      *
      * @param schemaName Schema name.
@@ -2058,7 +1779,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return DB type name.
      */
     private String dbTypeFromClass(Class<?> cls) {
-        return DBTypeEnum.fromClass(cls).dBTypeAsString();
+        return H2DatabaseType.fromClass(cls).dBTypeAsString();
     }
 
     /**
@@ -2068,28 +1789,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param cacheName Cache name.
      * @return Table descriptor.
      */
-    @Nullable private TableDescriptor tableDescriptor(String type, String cacheName) {
-        Schema s = schemas.get(schema(cacheName));
-
-        if (s == null)
-            return null;
-
-        return s.tbls.get(type);
-    }
+    @Nullable private H2TableDescriptor tableDescriptor(String type, String cacheName) {
+        String schemaName = schema(cacheName);
 
-    /**
-     * Gets collection of table for given schema name.
-     *
-     * @param schema Schema name.
-     * @return Collection of table descriptors.
-     */
-    private Collection<TableDescriptor> tables(String schema) {
-        Schema s = schemas.get(schema);
+        H2Schema schema = schemas.get(schemaName);
 
-        if (s == null)
-            return Collections.emptySet();
+        if (schema == null)
+            return null;
 
-        return s.tbls.values();
+        return schema.tables().get(type);
     }
 
     /**
@@ -2098,8 +1806,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param cacheName Cache name. {@code null} would be converted to an empty string.
      * @return Schema name. Should not be null since we should not fail for an invalid cache name.
      */
-    private String schema(String cacheName) {
-        return emptyIfNull(cacheName2schema.get(emptyIfNull(cacheName)));
+    public String schema(String cacheName) {
+        String res = cacheName2schema.get(cacheName);
+
+        if (res == null)
+            res = "";
+
+        return res;
     }
 
     /**
@@ -2108,8 +1821,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private void cleanupStatementCache() {
         long cur = U.currentTimeMillis();
 
-        for (Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
-            Map.Entry<Thread, StatementCache> entry = it.next();
+        for (Iterator<Map.Entry<Thread, H2StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<Thread, H2StatementCache> entry = it.next();
 
             Thread t = entry.getKey();
 
@@ -2123,16 +1836,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @Override public String cacheName(String schemaName) {
         assert schemaName != null;
 
-        Schema schema = schemas.get(schemaName);
+        H2Schema schema = schemas.get(schemaName);
 
         // For the compatibility with conversion from """" to "" inside h2 lib
         if (schema == null) {
-            assert schemaName.isEmpty() || schemaName.charAt(0) != ESC_CH;
+            assert schemaName.isEmpty() || schemaName.charAt(0) != H2Utils.ESC_CH;
 
-            schema = schemas.get(escapeName(schemaName, true));
+            schema = schemas.get(H2Utils.escapeName(schemaName, true));
         }
 
-        return schema.cacheName;
+        return schema.cacheName();
     }
 
     /**
@@ -2142,22 +1855,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param type Type descriptor.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     @Override public void rebuildIndexesFromHash(String cacheName,
         GridQueryTypeDescriptor type) throws IgniteCheckedException {
-        TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
 
         if (tbl == null)
             return;
 
-        assert tbl.tbl != null;
+        assert tbl.table() != null;
 
-        assert tbl.tbl.rebuildFromHashInProgress();
+        assert tbl.table().rebuildFromHashInProgress();
 
-        H2PkHashIndex hashIdx = tbl.pkHashIdx;
+        H2PkHashIndex hashIdx = tbl.primaryKeyHashIndex();
 
         Cursor cursor = hashIdx.find((Session)null, null, null);
 
-        int cacheId = CU.cacheId(tbl.schema.ccfg.getName());
+        int cacheId = CU.cacheId(tbl.schema().cacheName());
 
         GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
 
@@ -2173,12 +1887,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     synchronized (entry) {
                         // TODO : How to correctly get current value and link here?
 
-                        GridH2Row row = tbl.tbl.rowDescriptor().createRow(entry.key(), entry.partition(),
+                        GridH2Row row = tbl.table().rowDescriptor().createRow(entry.key(), entry.partition(),
                             dataRow.value(), entry.version(), entry.expireTime());
 
                         row.link(dataRow.link());
 
-                        List<Index> indexes = tbl.tbl.getAllIndexes();
+                        List<Index> indexes = tbl.table().getAllIndexes();
 
                         for (int i = 2; i < indexes.size(); i++) {
                             Index idx = indexes.get(i);
@@ -2197,19 +1911,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         }
 
-        tbl.tbl.markRebuildFromHashInProgress(false);
+        tbl.table().markRebuildFromHashInProgress(false);
     }
 
     /** {@inheritDoc} */
     @Override public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type) {
-        TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
 
         if (tbl == null)
             return;
 
-        assert tbl.tbl != null;
+        assert tbl.table() != null;
 
-        tbl.tbl.markRebuildFromHashInProgress(true);
+        tbl.table().markRebuildFromHashInProgress(true);
     }
 
     /**
@@ -2221,18 +1935,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed or {@code -1} if the type is unknown.
      */
     long size(String cacheName, String typeName) throws IgniteCheckedException {
-        TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
 
         if (tbl == null)
             return -1;
 
         Connection conn = connectionForCache(cacheName);
 
-        setupConnection(conn, false, false);
+        H2Utils.setupConnection(conn, false, false);
 
         try {
-            ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(), false),
-                0, null);
+            ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(),
+                false), 0, null);
 
             if (!rs.next())
                 throw new IllegalStateException();
@@ -2409,6 +2123,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         // Local node goes the last to allow parallel execution.
         if (locNode != null) {
+            assert locNodeHnd != null;
+
             if (specialize != null)
                 msg = specialize.apply(locNode, msg);
 
@@ -2503,7 +2219,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
 //        unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
         if (ctx != null && !ctx.cache().context().database().persistenceEnabled()) {
-            for (Schema schema : schemas.values())
+            for (H2Schema schema : schemas.values())
                 schema.onDrop();
         }
 
@@ -2534,12 +2250,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public void registerCache(String cacheName, GridCacheContext<?, ?> cctx, CacheConfiguration<?, ?> ccfg)
         throws IgniteCheckedException {
-        String schema = schemaNameFromCacheConf(ccfg);
+        String schema = H2Utils.schemaNameFromCacheConfiguration(ccfg);
 
-        if (schemas.putIfAbsent(schema, new Schema(cacheName, schema, cctx, ccfg)) != null)
+        if (schemas.putIfAbsent(schema, new H2Schema(cacheName, schema, cctx, ccfg)) != null)
             throw new IgniteCheckedException("Cache already registered: " + U.maskName(cacheName));
 
-        cacheName2schema.put(emptyIfNull(cacheName), schema);
+        cacheName2schema.put(cacheName, schema);
 
         createSchema(schema);
 
@@ -2549,10 +2265,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public void unregisterCache(String cacheName) {
         String schema = schema(cacheName);
-        Schema rmv = schemas.remove(schema);
+        H2Schema rmv = schemas.remove(schema);
 
         if (rmv != null) {
-            cacheName2schema.remove(emptyIfNull(rmv.cacheName));
+            cacheName2schema.remove(rmv.cacheName());
             mapQryExec.onCacheStop(cacheName);
             dmlProc.onCacheStop(cacheName);
 
@@ -2562,28 +2278,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 dropSchema(schema);
             }
             catch (IgniteCheckedException e) {
-                U.error(log, "Failed to drop schema on cache stop (will ignore): " + U.maskName(cacheName), e);
+                U.error(log, "Failed to drop schema on cache stop (will ignore): " + cacheName, e);
             }
 
-            for (TableDescriptor tblDesc : rmv.tbls.values())
-                for (Index idx : tblDesc.tbl.getIndexes())
+            for (H2TableDescriptor tblDesc : rmv.tables().values())
+                for (Index idx : tblDesc.table().getIndexes())
                     idx.close(null);
 
-            for (Iterator<Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
-                it.hasNext(); ) {
-                Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery> e = it.next();
+            for (Iterator<Map.Entry<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery>> it =
+                twoStepCache.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> e = it.next();
 
-                if (F.eq(e.getKey().cacheName, cacheName))
+                if (F.eq(e.getKey().cacheName(), cacheName))
                     it.remove();
             }
         }
     }
 
     /** {@inheritDoc} */
-    @Override public IndexingQueryFilter backupFilter(
-        @Nullable final AffinityTopologyVersion topVer,
-        @Nullable final int[] parts
-    ) {
+    @Override public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer,
+        @Nullable final int[] parts) {
         final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE;
 
         return new IndexingQueryFilter() {
@@ -2665,172 +2379,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         rdcQryExec.onDisconnected(reconnectFut);
     }
 
-    /**
-     * Key for cached two-step query.
-     */
-    private static final class TwoStepCachedQueryKey {
-        /** */
-        private final String cacheName;
-
-        /** */
-        private final String sql;
-
-        /** */
-        private final boolean grpByCollocated;
-
-        /** */
-        private final boolean distributedJoins;
-
-        /** */
-        private final boolean enforceJoinOrder;
-
-        /** */
-        private final boolean isLocal;
-
-        /**
-         * @param cacheName Cache name.
-         * @param sql Sql.
-         * @param grpByCollocated Collocated GROUP BY.
-         * @param distributedJoins Distributed joins enabled.
-         * @param enforceJoinOrder Enforce join order of tables.
-         * @param isLocal Query is local flag.
-         */
-        private TwoStepCachedQueryKey(String cacheName,
-            String sql,
-            boolean grpByCollocated,
-            boolean distributedJoins,
-            boolean enforceJoinOrder,
-            boolean isLocal) {
-            this.cacheName = cacheName;
-            this.sql = sql;
-            this.grpByCollocated = grpByCollocated;
-            this.distributedJoins = distributedJoins;
-            this.enforceJoinOrder = enforceJoinOrder;
-            this.isLocal = isLocal;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o;
-
-            if (grpByCollocated != that.grpByCollocated)
-                return false;
-
-            if (distributedJoins != that.distributedJoins)
-                return false;
-
-            if (enforceJoinOrder != that.enforceJoinOrder)
-                return false;
-
-            if (cacheName != null ? !cacheName.equals(that.cacheName) : that.cacheName != null)
-                return false;
-
-            return isLocal == that.isLocal && sql.equals(that.sql);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = cacheName != null ? cacheName.hashCode() : 0;
-            res = 31 * res + sql.hashCode();
-            res = 31 * res + (grpByCollocated ? 1 : 0);
-            res = res + (distributedJoins ? 2 : 0);
-            res = res + (enforceJoinOrder ? 4 : 0);
-            res = res + (isLocal ? 8 : 0);
-
-            return res;
-        }
-    }
-
-    /**
-     * Cached two-step query.
-     */
-    private static final class TwoStepCachedQuery {
-        /** */
-        final List<GridQueryFieldMetadata> meta;
-
-        /** */
-        final GridCacheTwoStepQuery twoStepQry;
-
-        /**
-         * @param meta Fields metadata.
-         * @param twoStepQry Query.
-         */
-        public TwoStepCachedQuery(List<GridQueryFieldMetadata> meta, GridCacheTwoStepQuery twoStepQry) {
-            this.meta = meta;
-            this.twoStepQry = twoStepQry;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(TwoStepCachedQuery.class, this);
-        }
-    }
-
-    /**
-     * @param c1 First column.
-     * @param c2 Second column.
-     * @return {@code true} If they are the same.
-     */
-    private static boolean equal(IndexColumn c1, IndexColumn c2) {
-        return c1.column.getColumnId() == c2.column.getColumnId();
-    }
-
-    /**
-     * @param cols Columns list.
-     * @param col Column to find.
-     * @return {@code true} If found.
-     */
-    private static boolean containsColumn(List<IndexColumn> cols, IndexColumn col) {
-        for (int i = cols.size() - 1; i >= 0; i--) {
-            if (equal(cols.get(i), col))
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Check whether columns list contains key or key alias column.
-     *
-     * @param desc Row descriptor.
-     * @param cols Columns list.
-     * @return Result.
-     */
-    private static boolean containsKeyColumn(GridH2RowDescriptor desc, List<IndexColumn> cols) {
-        for (int i = cols.size() - 1; i >= 0; i--) {
-            if (desc.isKeyColumn(cols.get(i).column.getColumnId()))
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param desc Row descriptor.
-     * @param cols Columns list.
-     * @param keyCol Primary key column.
-     * @param affCol Affinity key column.
-     * @return The same list back.
-     */
-    private static List<IndexColumn> treeIndexColumns(GridH2RowDescriptor desc, List<IndexColumn> cols, IndexColumn keyCol, IndexColumn affCol) {
-        assert keyCol != null;
-
-        if (!containsKeyColumn(desc, cols))
-            cols.add(keyCol);
-
-        if (affCol != null && !containsColumn(cols, affCol))
-            cols.add(affCol);
-
-        return cols;
-    }
-
-
     /** {@inheritDoc} */
     @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
         Collection<GridRunningQueryInfo> res = new ArrayList<>();
@@ -2862,1153 +2410,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Wrapper to store connection and flag is schema set or not.
+     * Closeable iterator.
      */
-    private static class ConnectionWrapper {
-        /** */
-        private Connection conn;
-
-        /** */
-        private volatile String schema;
-
-        /**
-         * @param conn Connection to use.
-         */
-        ConnectionWrapper(Connection conn) {
-            this.conn = conn;
-        }
-
-        /**
-         * @return Schema name if schema is set, null otherwise.
-         */
-        public String schema() {
-            return schema;
-        }
-
-        /**
-         * @param schema Schema name set on this connection.
-         */
-        public void schema(@Nullable String schema) {
-            this.schema = schema;
-        }
-
-        /**
-         * @return Connection.
-         */
-        public Connection connection() {
-            return conn;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ConnectionWrapper.class, this);
-        }
+    private interface ClIter<X> extends AutoCloseable, Iterator<X> {
+        // No-op.
     }
 
-    /**
-     * Enum that helps to map java types to database types.
-     */
-    private enum DBTypeEnum {
-        /** */
-        INT("INT"),
-
-        /** */
-        BOOL("BOOL"),
-
-        /** */
-        TINYINT("TINYINT"),
-
-        /** */
-        SMALLINT("SMALLINT"),
-
-        /** */
-        BIGINT("BIGINT"),
-
-        /** */
-        DECIMAL("DECIMAL"),
-
-        /** */
-        DOUBLE("DOUBLE"),
-
-        /** */
-        REAL("REAL"),
-
-        /** */
-        TIME("TIME"),
-
-        /** */
-        TIMESTAMP("TIMESTAMP"),
-
-        /** */
-        DATE("DATE"),
-
-        /** */
-        VARCHAR("VARCHAR"),
-
-        /** */
-        CHAR("CHAR"),
-
-        /** */
-        BINARY("BINARY"),
-
-        /** */
-        UUID("UUID"),
-
-        /** */
-        ARRAY("ARRAY"),
-
-        /** */
-        GEOMETRY("GEOMETRY"),
-
-        /** */
-        OTHER("OTHER");
-
-        /** Map of Class to enum. */
-        private static final Map<Class<?>, DBTypeEnum> map = new HashMap<>();
-
-        /**
-         * Initialize map of DB types.
-         */
-        static {
-            map.put(int.class, INT);
-            map.put(Integer.class, INT);
-            map.put(boolean.class, BOOL);
-            map.put(Boolean.class, BOOL);
-            map.put(byte.class, TINYINT);
-            map.put(Byte.class, TINYINT);
-            map.put(short.class, SMALLINT);
-            map.put(Short.class, SMALLINT);
-            map.put(long.class, BIGINT);
-            map.put(Long.class, BIGINT);
-            map.put(BigDecimal.class, DECIMAL);
-            map.put(double.class, DOUBLE);
-            map.put(Double.class, DOUBLE);
-            map.put(float.class, REAL);
-            map.put(Float.class, REAL);
-            map.put(Time.class, TIME);
-            map.put(Timestamp.class, TIMESTAMP);
-            map.put(java.util.Date.class, TIMESTAMP);
-            map.put(java.sql.Date.class, DATE);
-            map.put(String.class, VARCHAR);
-            map.put(UUID.class, UUID);
-            map.put(byte[].class, BINARY);
-        }
-
-        /** */
-        private final String dbType;
-
-        /**
-         * Constructs new instance.
-         *
-         * @param dbType DB type name.
-         */
-        DBTypeEnum(String dbType) {
-            this.dbType = dbType;
-        }
-
-        /**
-         * Resolves enum by class.
-         *
-         * @param cls Class.
-         * @return Enum value.
-         */
-        public static DBTypeEnum fromClass(Class<?> cls) {
-            DBTypeEnum res = map.get(cls);
-
-            if (res != null)
-                return res;
-
-            if (DataType.isGeometryClass(cls))
-                return GEOMETRY;
-
-            return cls.isArray() && !cls.getComponentType().isPrimitive() ? ARRAY : OTHER;
-        }
-
-        /**
-         * Gets DB type name.
-         *
-         * @return DB type name.
-         */
-        public String dBTypeAsString() {
-            return dbType;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DBTypeEnum.class, this);
-        }
-    }
-
-    /**
-     * Information about table in database.
-     */
-    private class TableDescriptor implements GridH2SystemIndexFactory {
-        /** */
-        private final String fullTblName;
-
-        /** */
-        private final GridQueryTypeDescriptor type;
-
-        /** */
-        private final Schema schema;
-
-        /** */
-        private GridH2Table tbl;
-
-        /** */
-        private GridLuceneIndex luceneIdx;
-
-        /** */
-        private H2PkHashIndex pkHashIdx;
-
-        /**
-         * @param schema Schema.
-         * @param type Type descriptor.
-         */
-        TableDescriptor(Schema schema, GridQueryTypeDescriptor type) {
-            this.type = type;
-            this.schema = schema;
-
-            String tblName = escapeName(type.tableName(), schema.escapeAll());
-
-            fullTblName = schema.schemaName + "." + tblName;
-        }
-
-        /**
-         * @return Schema name.
-         */
-        public String schemaName() {
-            return schema.schemaName;
-        }
-
-        /**
-         * @return Database full table name.
-         */
-        String fullTableName() {
-            return fullTblName;
-        }
-
-        /**
-         * @return type name.
-         */
-        String typeName() {
-            return type.name();
-        }
-
-        /**
-         * @return Type.
-         */
-        GridQueryTypeDescriptor type() {
-            return type;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(TableDescriptor.class, this);
-        }
-
-        /**
-         * Create H2 row factory.
-         *
-         * @param rowDesc Row descriptor.
-         * @return H2 row factory.
-         */
-        H2RowFactory rowFactory(GridH2RowDescriptor rowDesc) {
-            GridCacheContext cctx = schema.cacheContext();
-
-            if (cctx.affinityNode() && cctx.offheapIndex())
-                return new H2RowFactory(rowDesc, cctx);
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ArrayList<Index> createSystemIndexes(GridH2Table tbl) {
-            ArrayList<Index> idxs = new ArrayList<>();
-
-            IndexColumn keyCol = tbl.indexColumn(KEY_COL, SortOrder.ASCENDING);
-            IndexColumn affCol = tbl.getAffinityKeyColumn();
-
-            if (affCol != null && equal(affCol, keyCol))
-                affCol = null;
-
-            GridH2RowDescriptor desc = tbl.rowDescriptor();
-
-            Index hashIdx = createHashIndex(
-                schema,
-                tbl,
-                "_key_PK_hash",
-                treeIndexColumns(desc, new ArrayList<IndexColumn>(2), keyCol, affCol)
-            );
-
-            if (hashIdx != null)
-                idxs.add(hashIdx);
-
-            // Add primary key index.
-            Index pkIdx = createSortedIndex(
-                schema,
-                "_key_PK",
-                tbl,
-                true,
-                treeIndexColumns(desc, new ArrayList<IndexColumn>(2), keyCol, affCol),
-                -1
-            );
-
-            idxs.add(pkIdx);
-
-            if (type().valueClass() == String.class) {
-                try {
-                    luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.cacheName, type);
-                }
-                catch (IgniteCheckedException e1) {
-                    throw new IgniteException(e1);
-                }
-            }
-
-            boolean affIdxFound = false;
-
-            GridQueryIndexDescriptor textIdx = type.textIndex();
-
-            if (textIdx != null) {
-                try {
-                    luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.cacheName, type);
-                }
-                catch (IgniteCheckedException e1) {
-                    throw new IgniteException(e1);
-                }
-            }
-
-            // Locate index where affinity column is first (if any).
-            if (affCol != null) {
-                for (GridQueryIndexDescriptor idxDesc : type.indexes().values()) {
-                    if (idxDesc.type() != QueryIndexType.SORTED)
-                        continue;
-
-                    String firstField = idxDesc.fields().iterator().next();
-
-                    String firstFieldName =
-                        schema.escapeAll() ? firstField : escapeName(firstField, false).toUpperCase();
-
-                    Column col = tbl.getColumn(firstFieldName);
-
-                    IndexColumn idxCol = tbl.indexColumn(col.getColumnId(),
-                        idxDesc.descending(firstField) ? SortOrder.DESCENDING : SortOrder.ASCENDING);
-
-                    affIdxFound |= equal(idxCol, affCol);
-                }
-            }
-
-            // Add explicit affinity key index if nothing alike was found.
-            if (affCol != null && !affIdxFound) {
-                idxs.add(createSortedIndex(schema, "AFFINITY_KEY", tbl, false,
-                    treeIndexColumns(desc, new ArrayList<IndexColumn>(2), affCol, keyCol), -1));
-            }
-
-            return idxs;
-        }
-
-        /**
-         * Get collection of user indexes.
-         *
-         * @return User indexes.
-         */
-        public Collection<GridH2IndexBase> createUserIndexes() {
-            assert tbl != null;
-
-            ArrayList<GridH2IndexBase> res = new ArrayList<>();
-
-            for (GridQueryIndexDescriptor idxDesc : type.indexes().values()) {
-                GridH2IndexBase idx = createUserIndex(idxDesc);
-
-                res.add(idx);
-            }
-
-            return res;
-        }
-
-        /**
-         * Create user index.
-         *
-         * @param idxDesc Index descriptor.
-         * @return Index.
-         */
-        private GridH2IndexBase createUserIndex(GridQueryIndexDescriptor idxDesc) {
-            String name = schema.escapeAll() ? idxDesc.name() : escapeName(idxDesc.name(), false).toUpperCase();
-
-            IndexColumn keyCol = tbl.indexColumn(KEY_COL, SortOrder.ASCENDING);
-            IndexColumn affCol = tbl.getAffinityKeyColumn();
-
-            List<IndexColumn> cols = new ArrayList<>(idxDesc.fields().size() + 2);
-
-            boolean escapeAll = schema.escapeAll();
-
-            for (String field : idxDesc.fields()) {
-                String fieldName = escapeAll ? field : escapeName(field, false).toUpperCase();
-
-                Column col = tbl.getColumn(fieldName);
-
-                cols.add(tbl.indexColumn(col.getColumnId(),
-                    idxDesc.descending(field) ? SortOrder.DESCENDING : SortOrder.ASCENDING));
-            }
-
-            GridH2RowDescriptor desc = tbl.rowDescriptor();
-            if (idxDesc.type() == QueryIndexType.SORTED) {
-                cols = treeIndexColumns(desc, cols, keyCol, affCol);
-                return createSortedIndex(schema, name, tbl, false, cols, idxDesc.inlineSize());
-            }
-            else if (idxDesc.type() == QueryIndexType.GEOSPATIAL) {
-                return createSpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()]));
-            }
-
-            throw new IllegalStateException("Index type: " + idxDesc.type());
-        }
-
-        /**
-         * Create hash index.
-         *
-         * @param schema Schema.
-         * @param tbl Table.
-         * @param idxName Index name.
-         * @param cols Columns.
-         * @return Index.
-         */
-        private Index createHashIndex(Schema schema, GridH2Table tbl, String idxName, List<IndexColumn> cols) {
-            GridCacheContext cctx = schema.cacheContext();
-
-            if (cctx.affinityNode() && cctx.offheapIndex()) {
-                assert pkHashIdx == null : pkHashIdx;
-
-                pkHashIdx = new H2PkHashIndex(cctx, tbl, idxName, cols);
-
-                return pkHashIdx;
-            }
-
-            return null;
-        }
-
-        /**
-         *
-         */
-        void onDrop() {
-            dataTables.remove(tbl.identifier(), tbl);
-
-            tbl.destroy();
-
-            U.closeQuiet(luceneIdx);
-        }
-    }
-
-    /**
-     * Special field set iterator based on database result set.
-     */
-    public static class FieldsIterator extends GridH2ResultSetIterator<List<?>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param data Data.
-         * @throws IgniteCheckedException If failed.
-         */
-        public FieldsIterator(ResultSet data) throws IgniteCheckedException {
-            super(data, false, true);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected List<?> createRow() {
-            ArrayList<Object> res = new ArrayList<>(row.length);
-
-            Collections.addAll(res, row);
-
-            return res;
-        }
-    }
-
-    /**
-     * Special key/value iterator based on database result set.
-     */
-    private static class KeyValIterator<K, V> extends GridH2ResultSetIterator<IgniteBiTuple<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param data Data array.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected KeyValIterator(ResultSet data) throws IgniteCheckedException {
-            super(data, false, true);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override protected IgniteBiTuple<K, V> createRow() {
-            K key = (K)row[0];
-            V val = (V)row[1];
-
-            return new IgniteBiTuple<>(key, val);
-        }
-    }
-
-    /**
-     * Closeable iterator.
-     */
-    private interface ClIter<X> extends AutoCloseable, Iterator<X> {
-        // No-op.
-    }
-
-    /**
-     * Field descriptor.
-     */
-    static class SqlFieldMetadata implements GridQueryFieldMetadata {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Schema name. */
-        private String schemaName;
-
-        /** Type name. */
-        private String typeName;
-
-        /** Name. */
-        private String name;
-
-        /** Type. */
-        private String type;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public SqlFieldMetadata() {
-            // No-op
-        }
-
-        /**
-         * @param schemaName Schema name.
-         * @param typeName Type name.
-         * @param name Name.
-         * @param type Type.
-         */
-        SqlFieldMetadata(@Nullable String schemaName, @Nullable String typeName, String name, String type) {
-            assert name != null && type != null : schemaName + " | " + typeName + " | " + name + " | " + type;
-
-            this.schemaName = schemaName;
-            this.typeName = typeName;
-            this.name = name;
-            this.type = type;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String schemaName() {
-            return schemaName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String typeName() {
-            return typeName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String fieldName() {
-            return name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String fieldTypeName() {
-            return type;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, schemaName);
-            U.writeString(out, typeName);
-            U.writeString(out, name);
-            U.writeString(out, type);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            schemaName = U.readString(in);
-            typeName = U.readString(in);
-            name = U.readString(in);
-            type = U.readString(in);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SqlFieldMetadata.class, this);
-        }
-    }
-
-    /**
-     * Database schema object.
-     */
-    private class Schema {
-        /** */
-        private final String cacheName;
-
-        /** */
-        private final String schemaName;
-
-        /** */
-        private final GridUnsafeMemory offheap = null;
-
-        /** */
-        private final ConcurrentMap<String, TableDescriptor> tbls = new ConcurrentHashMap8<>();
-
-        /** Cache for deserialized offheap rows. */
-        private final CacheLongKeyLIRS<GridH2Row> rowCache;
-
-        /** */
-        private final GridCacheContext<?, ?> cctx;
-
-        /** */
-        private final CacheConfiguration<?, ?> ccfg;
-
-        /**
-         * @param cacheName Cache name.
-         * @param schemaName Schema name.
-         * @param cctx Cache context.
-         * @param ccfg Cache configuration.
-         */
-        private Schema(String cacheName, String schemaName, GridCacheContext<?, ?> cctx,
-            CacheConfiguration<?, ?> ccfg) {
-            this.cacheName = cacheName;
-            this.cctx = cctx;
-            this.schemaName = schemaName;
-            this.ccfg = ccfg;
-
-            rowCache = null;
-        }
-
-        /**
-         * @return Cache context.
-         */
-        public GridCacheContext cacheContext() {
-            return cctx;
-        }
-
-        /**
-         * @param tbl Table descriptor.
-         */
-        public void add(TableDescriptor tbl) {
-            if (tbls.putIfAbsent(tbl.typeName(), tbl) != null)
-                throw new IllegalStateException("Table already registered: " + tbl.fullTableName());
-        }
-
-        /**
-         * @return Escape all.
-         */
-        public boolean escapeAll() {
-            return ccfg.isSqlEscapeAll();
-        }
-
-        /**
-         * Called after the schema was dropped.
-         */
-        public void onDrop() {
-            for (TableDescriptor tblDesc : tbls.values())
-                tblDesc.onDrop();
-        }
-    }
-
-    /**
-     * Row descriptor.
-     */
-    private class RowDescriptor implements GridH2RowDescriptor {
-        /** */
-        private final GridQueryTypeDescriptor type;
-
-        /** */
-        private final String[] fields;
-
-        /** */
-        private final int[] fieldTypes;
-
-        /** */
-        private final int keyType;
-
-        /** */
-        private final int valType;
-
-        /** */
-        private final Schema schema;
-
-        /** */
-        private final GridUnsafeGuard guard;
-
-        /** */
-        private final boolean snapshotableIdx;
-
-        /** */
-        private final GridQueryProperty[] props;
-
-        /** Id of user-defined key column */
-        private final int keyAliasColumnId;
-
-        /** Id of user-defined value column */
-        private final int valueAliasColumnId;
-
-        /**
-         * @param type Type descriptor.
-         * @param schema Schema.
-         */
-        RowDescriptor(GridQueryTypeDescriptor type, Schema schema) {
-            assert type != null;
-            assert schema != null;
-
-            this.type = type;
-            this.schema = schema;
-
-            guard = schema.offheap == null ? null : new GridUnsafeGuard();
-
-            Map<String, Class<?>> allFields = new LinkedHashMap<>();
-
-            allFields.putAll(type.fields());
-
-            fields = allFields.keySet().toArray(new String[allFields.size()]);
-
-            fieldTypes = new int[fields.length];
-
-            Class[] classes = allFields.values().toArray(new Class[fields.length]);
-
-            for (int i = 0; i < fieldTypes.length; i++)
-                fieldTypes[i] = DataType.getTypeFromClass(classes[i]);
-
-            keyType = DataType.getTypeFromClass(type.keyClass());
-            valType = DataType.getTypeFromClass(type.valueClass());
-
-            props = new GridQueryProperty[fields.length];
-
-            for (int i = 0; i < fields.length; i++) {
-                GridQueryProperty p = type.property(fields[i]);
-
-                assert p != null : fields[i];
-
-                props[i] = p;
-            }
-
-            final List<String> fieldsList = Arrays.asList(fields);
-            keyAliasColumnId = (type.keyFieldName() != null) ? DEFAULT_COLUMNS_COUNT + fieldsList.indexOf(type.keyFieldName()) : -1;
-            valueAliasColumnId = (type.valueFieldName() != null) ? DEFAULT_COLUMNS_COUNT + fieldsList.indexOf(type.valueFieldName()) : -1;
-
-            // Index is not snapshotable in db-x.
-            snapshotableIdx = false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteH2Indexing indexing() {
-            return IgniteH2Indexing.this;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridQueryTypeDescriptor type() {
-            return type;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridCacheContext<?, ?> context() {
-            return schema.cacheContext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheConfiguration configuration() {
-            return schema.ccfg;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridUnsafeGuard guard() {
-            return guard;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cache(GridH2Row row) {
-            long ptr = row.pointer();
-
-            assert ptr > 0 : ptr;
-
-            schema.rowCache.put(ptr, row);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void uncache(long ptr) {
-            schema.rowCache.remove(ptr);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridUnsafeMemory memory() {
-            return schema.offheap;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Value wrap(Object obj, int type) throws IgniteCheckedException {
-            assert obj != null;
-
-            if (obj instanceof CacheObject) { // Handle cache object.
-                CacheObject co = (CacheObject)obj;
-
-                if (type == Value.JAVA_OBJECT)
-                    return new GridH2ValueCacheObject(cacheContext(schema.cacheName), co);
-
-                obj = co.value(objectContext(schema.cacheName), false);
-            }
-
-            switch (type) {
-                case Value.BOOLEAN:
-                    return ValueBoolean.get((Boolean)obj);
-                case Value.BYTE:
-                    return ValueByte.get((Byte)obj);
-                case Value.SHORT:
-                    return ValueShort.get((Short)obj);
-                case Value.INT:
-                    return ValueInt.get((Integer)obj);
-                case Value.FLOAT:
-                    return ValueFloat.get((Float)obj);
-                case Value.LONG:
-                    return ValueLong.get((Long)obj);
-                case Value.DOUBLE:
-                    return ValueDouble.get((Double)obj);
-                case Value.UUID:
-                    UUID uuid = (UUID)obj;
-                    return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-                case Value.DATE:
-                    return ValueDate.get((Date)obj);
-                case Value.TIME:
-                    return ValueTime.get((Time)obj);
-                case Value.TIMESTAMP:
-                    if (obj instanceof java.util.Date && !(obj instanceof Timestamp))
-                        obj = new Timestamp(((java.util.Date)obj).getTime());
-
-                    return ValueTimestamp.get((Timestamp)obj);
-                case Value.DECIMAL:
-                    return ValueDecimal.get((BigDecimal)obj);
-                case Val

<TRUNCATED>