You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/10 16:59:17 UTC
[08/17] incubator-ignite git commit: ignite-qry - merged
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
new file mode 100644
index 0000000..3a850cc
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
@@ -0,0 +1,1998 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+* __ ____/___________(_)______ /__ ____/______ ____(_)_______
+* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+*/
+
+package org.gridgain.grid.kernal.processors.query.h2;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.indexing.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.opt.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.offheap.unsafe.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.h2.api.*;
+import org.h2.command.*;
+import org.h2.constant.*;
+import org.h2.index.*;
+import org.h2.jdbc.*;
+import org.h2.message.*;
+import org.h2.mvstore.cache.*;
+import org.h2.server.web.*;
+import org.h2.table.*;
+import org.h2.tools.*;
+import org.h2.util.*;
+import org.h2.value.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.math.*;
+import java.sql.*;
+import java.text.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.IgniteSystemProperties.*;
+import static org.gridgain.grid.kernal.processors.query.GridQueryIndexType.*;
+import static org.gridgain.grid.kernal.processors.query.h2.opt.GridH2AbstractKeyValueRow.*;
+import static org.h2.result.SortOrder.*;
+
+/**
+ * Indexing implementation based on H2 database engine. In this implementation main query language is SQL,
+ * fulltext indexing can be performed using Lucene. For each registered space
+ * the SPI will create respective schema, for default space (where space name is null) schema
+ * with name {@code PUBLIC} will be used. To avoid name conflicts user should not explicitly name
+ * a schema {@code PUBLIC}.
+ * <p>
+ * For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with
+ * {@code '_key'} and {@code '_val'} fields for key and value, and fields from
+ * {@link GridQueryTypeDescriptor#keyFields()} and {@link GridQueryTypeDescriptor#valueFields()}.
+ * For each table it will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}.
+ * <h1 class="header">Some important defaults.</h1>
+ * <ul>
+ * <li>All the data will be kept in memory</li>
+ * <li>Primitive types will not be indexed (i.e. Java types which can be directly converted to SQL types)</li>
+ * <li>
+ * Key types will be converted to SQL types, so it is impossible to store one value type with
+ * different key types
+ * </li>
+ * </ul>
+ * @see GridIndexingSpi
+ */
+@SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"})
+public class GridH2Indexing implements GridQueryIndexing {
+ /** Default H2 options; appended to the in-memory database URL that is built in {@code start()}. */
+ private static final String DFLT_DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
+ ";DEFAULT_LOCK_TIMEOUT=10000";
+
+ /** Options for optimized mode to work properly; appended to the database URL right after the defaults. */
+ private static final String OPTIMIZED_DB_OPTIONS = ";OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0;" +
+ "RECOMPILE_ALWAYS=1;MAX_OPERATION_MEMORY=0";
+
+ /** Name of the table column that holds the key (first column emitted by {@code createTable()}). */
+ public static final String KEY_FIELD_NAME = "_key";
+
+ /** Name of the table column that holds the value (second column emitted by {@code createTable()}). */
+ public static final String VAL_FIELD_NAME = "_val";
+
+ /**
+ * Reflective handle to the private {@code command} field of H2's {@link JdbcPreparedStatement}.
+ * Read by {@code commandType()} to find out what kind of command a prepared statement holds.
+ */
+ private static final Field COMMAND_FIELD;
+
+ /**
+ * Resolves {@link #COMMAND_FIELD} once at class initialization and makes it accessible.
+ * Fails with {@link IllegalStateException} when the H2 class has no such field.
+ */
+ static {
+ try {
+ COMMAND_FIELD = JdbcPreparedStatement.class.getDeclaredField("command");
+
+ COMMAND_FIELD.setAccessible(true);
+ }
+ catch (NoSuchFieldException e) {
+ throw new IllegalStateException("Check H2 version in classpath.", e);
+ }
+ }
+
+ /**
+ * Indexing instance bound to the current thread. It is set at the beginning of store/remove/swap/query
+ * operations and removed in their {@code finally} blocks.
+ */
+ private static final ThreadLocal<GridH2Indexing> localSpi = new ThreadLocal<>();
+
+ /** Lazily built {@code SET SCHEMA_SEARCH_PATH} command; cached when the first connection is initialized. */
+ private volatile String cachedSearchPathCmd;
+
+ /** Cache for deserialized offheap rows. */
+ private CacheLongKeyLIRS<GridH2KeyValueRowOffheap> rowCache = new CacheLongKeyLIRS<>(32 * 1024, 1, 128, 256);
+
+ /** Logger. */
+ @IgniteLoggerResource
+ private IgniteLogger log;
+
+ /** Node ID. */
+ @IgniteLocalNodeIdResource
+ private UUID nodeId;
+
+ /** Marshaller; used by {@code h2Serializer()} to marshal and unmarshal Java objects. */
+ @IgniteMarshallerResource
+ private IgniteMarshaller marshaller;
+
+ /**
+ * Off-heap memory. Created in {@code start()} only when the configured max off-heap memory is not
+ * {@code -1}; otherwise stays {@code null}.
+ */
+ private GridUnsafeMemory offheap;
+
+ /**
+ * Names of the schemas that {@code start()} creates in the database.
+ * NOTE(review): presumably populated by {@code registerSpace()}, which is outside this view - confirm.
+ */
+ private final Collection<String> schemaNames = new GridConcurrentHashSet<>();
+
+ /** Collection of schemaNames and registered tables. */
+ private final ConcurrentMap<String, Schema> schemas = new ConcurrentHashMap8<>();
+
+ /** JDBC URL of the H2 database; {@code start()} replaces it with a unique in-memory DB name plus options. */
+ private String dbUrl = "jdbc:h2:mem:";
+
+ /** Connections opened through {@code connCache}; a connection is removed again when it is reset after an SQL error. */
+ private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>());
+
+ /**
+  * Per-thread H2 connection holder.
+  * <p>
+  * {@code get()} transparently reconnects when the thread has no connection, when the cached connection
+  * is closed, or when its status cannot be checked. {@code initialValue()} opens a new connection,
+  * applies the configured schema search path (if any) and registers the connection in {@code conns}.
+  */
+ private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
+     @Nullable @Override public ConnectionWrapper get() {
+         ConnectionWrapper c = super.get();
+
+         // Stay on the safe side: if the status check itself fails, reconnect.
+         boolean reconnect = true;
+
+         try {
+             reconnect = c == null || c.connection().isClosed();
+         }
+         catch (SQLException e) {
+             U.warn(log, "Failed to check connection status.", e);
+         }
+
+         if (reconnect) {
+             if (c != null) {
+                 // Drop the stale connection so it neither accumulates in 'conns' nor stays open
+                 // (closing an already closed connection is a no-op).
+                 Connection stale = c.connection();
+
+                 conns.remove(stale);
+
+                 U.close(stale, log);
+             }
+
+             c = initialValue();
+
+             set(c);
+         }
+
+         return c;
+     }
+
+     @Nullable @Override protected ConnectionWrapper initialValue() {
+         Connection c = null;
+
+         try {
+             c = DriverManager.getConnection(dbUrl);
+
+             String[] searchPath = cfg.getSearchPath();
+
+             if (!F.isEmpty(searchPath)) {
+                 try (Statement s = c.createStatement()) {
+                     String cmd = cachedSearchPathCmd;
+
+                     // Build the command once and reuse it for all subsequent connections.
+                     if (cmd == null) {
+                         SB b = new SB("SET SCHEMA_SEARCH_PATH ");
+
+                         for (int i = 0; i < searchPath.length; i++) {
+                             if (i != 0)
+                                 b.a(',');
+
+                             b.a('"').a(schema(searchPath[i])).a('"');
+                         }
+
+                         cachedSearchPathCmd = cmd = b.toString();
+                     }
+
+                     s.executeUpdate(cmd);
+                 }
+             }
+
+             conns.add(c);
+
+             return new ConnectionWrapper(c);
+         }
+         catch (SQLException e) {
+             U.close(c, log);
+
+             throw new GridRuntimeException("Failed to initialize DB connection: " + dbUrl, e);
+         }
+     }
+ };
+
+ /**
+ * Query configuration. Starts as a default instance and is replaced in {@code start()} by the
+ * configuration taken from the kernal context when one is provided there.
+ */
+ private volatile GridQueryConfiguration cfg = new GridQueryConfiguration();
+
+ /** Kernal context passed to {@code start()}; stays {@code null} when start was called without a context. */
+ private volatile GridKernalContext ctx;
+
+ /**
+  * Returns the connection cached for the calling thread, switching its current schema first
+  * when a schema is requested and differs from the one the connection already uses.
+  *
+  * @param schema Schema to switch the connection to, or {@code null} to leave the schema untouched.
+  * @return DB connection.
+  * @throws GridException If the connection is unavailable or the schema could not be set.
+  */
+ private Connection connectionForThread(@Nullable String schema) throws GridException {
+     ConnectionWrapper wrapper = connCache.get();
+
+     if (wrapper == null)
+         throw new GridException("Failed to get DB connection for thread (check log for details).");
+
+     boolean switchSchema = schema != null && !F.eq(wrapper.schema(), schema);
+
+     if (!switchSchema)
+         return wrapper.connection();
+
+     Statement setSchemaStmt = null;
+
+     try {
+         setSchemaStmt = wrapper.connection().createStatement();
+
+         setSchemaStmt.executeUpdate("SET SCHEMA \"" + schema + '"');
+
+         if (log.isDebugEnabled())
+             log.debug("Initialized H2 schema for queries on space: " + schema);
+
+         // Remember the schema only after the switch succeeded.
+         wrapper.schema(schema);
+     }
+     catch (SQLException e) {
+         throw new GridException("Failed to set schema for DB connection for thread [schema=" +
+             schema + "]", e);
+     }
+     finally {
+         U.close(setSchemaStmt, log);
+     }
+
+     return wrapper.connection();
+ }
+
+ /**
+  * Issues {@code CREATE SCHEMA IF NOT EXISTS} for the given schema name.
+  *
+  * @param schema Schema name.
+  * @throws GridException If failed to create db schema.
+  */
+ private void createSchemaIfAbsent(String schema) throws GridException {
+     String ddl = "CREATE SCHEMA IF NOT EXISTS \"" + schema + '"';
+
+     executeStatement(ddl);
+
+     if (!log.isDebugEnabled())
+         return;
+
+     log.debug("Created H2 schema for index database: " + schema);
+ }
+
+ /**
+  * Runs an update statement on the calling thread's connection without touching its schema.
+  * On an SQL error the thread's connection is reset before the error is reported.
+  *
+  * @param sql SQL statement.
+  * @throws GridException If failed.
+  */
+ private void executeStatement(String sql) throws GridException {
+     Connection conn = connectionForThread(null);
+
+     Statement st = null;
+
+     try {
+         st = conn.createStatement();
+
+         st.executeUpdate(sql);
+     }
+     catch (SQLException e) {
+         onSqlException();
+
+         throw new GridException("Failed to execute statement: " + sql, e);
+     }
+     finally {
+         U.close(st, log);
+     }
+ }
+
+ /**
+  * Removes the entry with the given key from the tables of the space other than {@code tblToUpdate}.
+  * Stops at the first table whose update call reports success; does nothing when the space
+  * has at most one table.
+  *
+  * @param spaceName Space name.
+  * @param key Key.
+  * @param tblToUpdate Table to update; it is skipped here.
+  * @throws GridException In case of error.
+  */
+ private void removeKey(@Nullable String spaceName, Object key, TableDescriptor tblToUpdate)
+     throws GridException {
+     try {
+         Collection<TableDescriptor> candidates = tables(schema(spaceName));
+
+         if (candidates.size() <= 1)
+             return;
+
+         boolean fixedTyping = isIndexFixedTyping(spaceName);
+
+         for (TableDescriptor candidate : candidates) {
+             if (candidate == tblToUpdate)
+                 continue;
+
+             boolean sameKeyCls = candidate.type().keyClass().equals(key.getClass());
+
+             // With fixed typing only tables declared for this key class can contain the key.
+             if (!sameKeyCls && fixedTyping)
+                 continue;
+
+             if (!candidate.tbl.update(key, null, 0))
+                 continue;
+
+             if (candidate.luceneIdx != null)
+                 candidate.luceneIdx.remove(key);
+
+             return;
+         }
+     }
+     catch (Exception e) {
+         throw new GridException("Failed to remove key: " + key, e);
+     }
+ }
+
+ /**
+  * Binds a single parameter of a prepared statement. A {@code null} value is bound as SQL NULL
+  * of type {@code VARCHAR}; anything else goes through {@code setObject}.
+  *
+  * @param stmt SQL statement.
+  * @param idx Index.
+  * @param obj Value to store.
+  * @throws GridException If failed.
+  */
+ private void bindObject(PreparedStatement stmt, int idx, @Nullable Object obj) throws GridException {
+     try {
+         if (obj != null) {
+             stmt.setObject(idx, obj);
+
+             return;
+         }
+
+         stmt.setNull(idx, Types.VARCHAR);
+     }
+     catch (SQLException e) {
+         throw new GridException("Failed to bind parameter [idx=" + idx + ", obj=" + obj + ']', e);
+     }
+ }
+
+ /**
+  * Handles SQL exception: discards the calling thread's connection so that a fresh one
+  * is created on the next access.
+  */
+ private void onSqlException() {
+     ConnectionWrapper wrapper = connCache.get();
+
+     Connection broken = wrapper.connection();
+
+     connCache.set(null);
+
+     if (broken == null)
+         return;
+
+     conns.remove(broken);
+
+     // Reset connection to receive new one at next call.
+     U.close(broken, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object k, Object v, byte[] ver,
+     long expirationTime) throws GridException {
+     TableDescriptor target = tableDescriptor(spaceName, type);
+
+     // Type was rejected.
+     if (target == null)
+         return;
+
+     // Zero expiration time is stored as "never expires".
+     long expireAt = expirationTime == 0 ? Long.MAX_VALUE : expirationTime;
+
+     localSpi.set(this);
+
+     try {
+         // The key may currently live in another table of the same space.
+         removeKey(spaceName, k, target);
+
+         target.tbl.update(k, v, expireAt);
+
+         if (target.luceneIdx != null)
+             target.luceneIdx.store(k, v, ver, expireAt);
+     }
+     finally {
+         localSpi.remove();
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(@Nullable String spaceName, Object key) throws GridException {
+     if (log.isDebugEnabled())
+         log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']');
+
+     localSpi.set(this);
+
+     try {
+         for (TableDescriptor candidate : tables(schema(spaceName))) {
+             boolean applicable = candidate.type().keyClass().equals(key.getClass()) ||
+                 !isIndexFixedTyping(spaceName);
+
+             if (!applicable)
+                 continue;
+
+             if (!candidate.tbl.update(key, null, 0))
+                 continue;
+
+             if (candidate.luceneIdx != null)
+                 candidate.luceneIdx.remove(key);
+
+             // Stop at the first table that reported the removal.
+             return;
+         }
+     }
+     finally {
+         localSpi.remove();
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSwap(@Nullable String spaceName, Object key) throws GridException {
+     Schema schema = schemas.get(schema(spaceName));
+
+     if (schema == null)
+         return;
+
+     localSpi.set(this);
+
+     try {
+         for (TableDescriptor tbl : schema.values()) {
+             if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                 // A GridException from the table propagates as is: re-wrapping it into another
+                 // GridException added nothing but an extra cause level.
+                 if (tbl.tbl.onSwap(key))
+                     return;
+             }
+         }
+     }
+     finally {
+         localSpi.remove();
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
+     throws GridException {
+     localSpi.set(this);
+
+     try {
+         for (TableDescriptor tbl : tables(schema(spaceName))) {
+             if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                 // A GridException from the table propagates as is: re-wrapping it into another
+                 // GridException added nothing but an extra cause level.
+                 if (tbl.tbl.onUnswap(key, val))
+                     return;
+             }
+         }
+     }
+     finally {
+         localSpi.remove();
+     }
+ }
+
+ /**
+  * Drops the table from the H2 database, closes the table and its Lucene index (if any)
+  * and removes the descriptor from its schema.
+  *
+  * @param tbl Table to unregister.
+  * @throws GridException If failed to unregister.
+  */
+ private void removeTable(TableDescriptor tbl) throws GridException {
+     assert tbl != null;
+
+     if (log.isDebugEnabled())
+         log.debug("Removing query index table: " + tbl.fullTableName());
+
+     Connection conn = connectionForThread(null);
+
+     Statement dropStmt = null;
+
+     try {
+         // NOTE: there is no method dropIndex() for lucene engine correctly working.
+         // So we have to drop all lucene index.
+         // FullTextLucene.dropAll(c); TODO: GG-4015: fix this
+
+         dropStmt = conn.createStatement();
+
+         String dropSql = "DROP TABLE IF EXISTS " + tbl.fullTableName();
+
+         if (log.isDebugEnabled())
+             log.debug("Dropping database index table with SQL: " + dropSql);
+
+         dropStmt.executeUpdate(dropSql);
+     }
+     catch (SQLException e) {
+         onSqlException();
+
+         throw new GridException("Failed to drop database index table [type=" + tbl.type().name() +
+             ", table=" + tbl.fullTableName() + "]", e);
+     }
+     finally {
+         U.close(dropStmt, log);
+     }
+
+     tbl.tbl.close();
+
+     if (tbl.luceneIdx != null)
+         U.closeQuiet(tbl.luceneIdx);
+
+     ConcurrentMap<String, TableDescriptor> schemaTbls = schemas.get(tbl.schema());
+
+     if (F.isEmpty(schemaTbls))
+         return;
+
+     schemaTbls.remove(tbl.name());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(
+     @Nullable String spaceName, String qry, GridQueryTypeDescriptor type,
+     GridIndexingQueryFilter filters) throws GridException {
+     TableDescriptor desc = tableDescriptor(spaceName, type);
+
+     // Without a registered table or a Lucene index there is nothing to search.
+     if (desc == null || desc.luceneIdx == null)
+         return new GridEmptyCloseableIterator<>();
+
+     return desc.luceneIdx.query(qry, filters);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregisterType(@Nullable String spaceName, GridQueryTypeDescriptor type)
+     throws GridException {
+     TableDescriptor registered = tableDescriptor(spaceName, type);
+
+     // Unknown type: nothing to drop.
+     if (registered == null)
+         return;
+
+     removeTable(registered);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry,
+     @Nullable final Collection<Object> params, final GridIndexingQueryFilter filters)
+     throws GridException {
+     localSpi.set(this);
+
+     setFilters(filters);
+
+     try {
+         Connection conn = connectionForThread(schema(spaceName));
+
+         ResultSet rs = executeSqlQueryWithTimer(conn, qry, params);
+
+         // Metadata stays null when the query produced no result set.
+         List<GridQueryFieldMetadata> meta = null;
+
+         if (rs != null) {
+             try {
+                 ResultSetMetaData md = rs.getMetaData();
+
+                 int colCnt = md.getColumnCount();
+
+                 meta = new ArrayList<>(colCnt);
+
+                 // JDBC columns are numbered starting from 1.
+                 for (int col = 1; col <= colCnt; col++) {
+                     meta.add(new SqlFieldMetadata(
+                         md.getSchemaName(col),
+                         md.getTableName(col),
+                         md.getColumnLabel(col),
+                         md.getColumnClassName(col)));
+                 }
+             }
+             catch (SQLException e) {
+                 throw new IgniteSpiException("Failed to get meta data.", e);
+             }
+         }
+
+         return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs));
+     }
+     finally {
+         setFilters(null);
+
+         localSpi.remove();
+     }
+ }
+
+ /**
+  * Reads the H2 command held by a prepared statement through {@code COMMAND_FIELD}
+  * and returns that command's type.
+  *
+  * @param stmt Prepared statement.
+  * @return Command type.
+  */
+ private static int commandType(PreparedStatement stmt) {
+     Object cmd;
+
+     try {
+         cmd = COMMAND_FIELD.get(stmt);
+     }
+     catch (IllegalAccessException e) {
+         throw new IllegalStateException(e);
+     }
+
+     return ((CommandInterface)cmd).getCommandType();
+ }
+
+ /**
+ * @return Configuration.
+ */
+ public GridQueryConfiguration configuration() {
+ return cfg;
+ }
+
+ /**
+  * Executes sql query.
+  * <p>
+  * Only query-like commands (SELECT, CALL, EXPLAIN, ANALYZE) are accepted. The prepared statement
+  * stays open on success because the returned result set depends on it; on any failure after
+  * preparation the statement is closed here.
+  *
+  * @param conn Connection.
+  * @param sql Sql query.
+  * @param params Parameters.
+  * @return Result, or {@code null} if the query refers to a table or view that does not exist.
+  * @throws GridException If failed.
+  */
+ @Nullable private ResultSet executeSqlQuery(Connection conn, String sql,
+     @Nullable Collection<Object> params) throws GridException {
+     PreparedStatement stmt;
+
+     try {
+         stmt = conn.prepareStatement(sql);
+     }
+     catch (SQLException e) {
+         if (e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1)
+             return null;
+
+         throw new GridException("Failed to parse SQL query: " + sql, e);
+     }
+
+     boolean success = false;
+
+     try {
+         switch (commandType(stmt)) {
+             case CommandInterface.SELECT:
+             case CommandInterface.CALL:
+             case CommandInterface.EXPLAIN:
+             case CommandInterface.ANALYZE:
+                 break;
+             default:
+                 throw new GridException("Failed to execute non-query SQL statement: " + sql);
+         }
+
+         bindParameters(stmt, params);
+
+         ResultSet rs = stmt.executeQuery();
+
+         success = true;
+
+         return rs;
+     }
+     catch (SQLException e) {
+         throw new GridException("Failed to execute SQL query.", e);
+     }
+     finally {
+         // Do not leak the statement when the query was rejected or failed.
+         if (!success)
+             U.close(stmt, log);
+     }
+ }
+
+ /**
+ * Executes sql query and prints warning if query is too slow..
+ *
+ * @param conn Connection,.
+ * @param sql Sql query.
+ * @param params Parameters.
+ * @return Result.
+ * @throws GridException If failed.
+ */
+ private ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
+ @Nullable Collection<Object> params) throws GridException {
+ long start = U.currentTimeMillis();
+
+ try {
+ ResultSet rs = executeSqlQuery(conn, sql, params);
+
+ long time = U.currentTimeMillis() - start;
+
+ long longQryExecTimeout = cfg.getLongQueryExecutionTimeout();
+
+ if (time > longQryExecTimeout) {
+ String msg = "Query execution is too long (" + time + " ms): " + sql;
+
+ String longMsg = msg;
+
+ if (cfg.isLongQueryExplain()) {
+ ResultSet plan = executeSqlQuery(conn, "EXPLAIN " + sql, params);
+
+ if (plan == null)
+ longMsg = "Failed to explain plan because required table does not exist: " + sql;
+ else {
+ plan.next();
+
+ // Add SQL explain result message into log.
+ longMsg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' +
+ ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" + params + "]";
+ }
+ }
+
+ LT.warn(log, null, longMsg, msg);
+ }
+
+ return rs;
+ }
+ catch (SQLException e) {
+ onSqlException();
+
+ throw new GridException(e);
+ }
+ }
+
+ /**
+  * Executes query: obtains the thread's connection for the table's schema ({@code PUBLIC} when
+  * no table is given), expands the query text and runs it with timing.
+  *
+  * @param qry Query.
+  * @param params Query parameters.
+  * @param tbl Target table of query to generate select.
+  * @return Result set.
+  * @throws GridException If failed.
+  */
+ private ResultSet executeQuery(String qry, @Nullable Collection<Object> params,
+     @Nullable TableDescriptor tbl) throws GridException {
+     String schemaName = tbl == null ? "PUBLIC" : tbl.schema();
+
+     Connection conn = connectionForThread(schemaName);
+
+     String fullSql = generateQuery(qry, tbl);
+
+     return executeSqlQueryWithTimer(conn, fullSql, params);
+ }
+
+ /**
+  * Binds parameters to prepared statement in iteration order, starting from JDBC index 1.
+  * Does nothing for a {@code null} or empty collection.
+  *
+  * @param stmt Prepared statement.
+  * @param params Parameters collection.
+  * @throws GridException If failed.
+  */
+ private void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws GridException {
+     if (F.isEmpty(params))
+         return;
+
+     int pos = 0;
+
+     for (Object param : params) {
+         pos++;
+
+         bindObject(stmt, pos, param);
+     }
+ }
+
+ /**
+  * Executes regular query.
+  * Note that SQL query can not refer to table alias, so use full table name instead.
+  * Returns an empty iterator when no table is registered for the given type.
+  *
+  * @param spaceName Space name.
+  * @param qry Query.
+  * @param params Query parameters.
+  * @param type Query return type.
+  * @param filters Space name and key filters.
+  * @return Queried rows.
+  * @throws GridException If failed.
+  */
+ @SuppressWarnings("unchecked")
+ @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(@Nullable String spaceName,
+     final String qry, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type,
+     final GridIndexingQueryFilter filters) throws GridException {
+     final TableDescriptor target = tableDescriptor(spaceName, type);
+
+     if (target == null)
+         return new GridEmptyCloseableIterator<>();
+
+     setFilters(filters);
+
+     localSpi.set(this);
+
+     try {
+         ResultSet rows = executeQuery(qry, params, target);
+
+         return new KeyValIterator(rows);
+     }
+     finally {
+         // Thread-bound state must not outlive the call.
+         setFilters(null);
+
+         localSpi.remove();
+     }
+ }
+
+ /**
+  * Installs (or clears, when {@code null}) the query filters for the calling thread.
+  * The filters have to be set to a non-null value before a query is executed and reset to
+  * {@code null} in a {@code finally} block afterwards, since a non-null value signals to the
+  * table that it should return content without expired values.
+  *
+  * @param filters Filters.
+  */
+ private void setFilters(@Nullable GridIndexingQueryFilter filters) {
+     GridIndexingQueryFilter threadFilters = filters;
+
+     GridH2IndexBase.setFiltersForThread(threadFilters);
+ }
+
+ /**
+  * Builds the full SQL text for a query.
+  * <p>
+  * Accepted forms are {@code select * from ...}, {@code from ...}, or a bare clause without these
+  * keywords. A bare clause is prefixed with {@code FROM <table>} (plus {@code WHERE} unless it starts
+  * with {@code ORDER} or {@code LIMIT}). When a table is given and the query does not already start
+  * with {@code select * from}, the key and value columns of that table are selected.
+  *
+  * @param qry Query string.
+  * @param tbl Table to use.
+  * @return SQL text to execute.
+  * @throws GridException If the query has a wrong format, or a bare clause is given without a table.
+  */
+ private String generateQuery(String qry, @Nullable TableDescriptor tbl) throws GridException {
+     boolean needSelect = tbl != null;
+
+     // Locale-independent upper-casing: under e.g. the Turkish default locale "limit" does not
+     // become "LIMIT", and the keyword checks below would silently fail.
+     String str = qry.trim().toUpperCase(Locale.ROOT);
+
+     if (!str.startsWith("FROM")) {
+         if (str.startsWith("SELECT")) {
+             if (needSelect) {
+                 // Default delimiters: split on any whitespace, not only on single spaces.
+                 StringTokenizer st = new StringTokenizer(str);
+
+                 String errMsg = "Wrong query format, query must start with 'select * from' " +
+                     "or 'from' or without such keywords.";
+
+                 if (st.countTokens() > 3) {
+                     st.nextToken();
+                     String wildcard = st.nextToken();
+                     String from = st.nextToken();
+
+                     if (!"*".equals(wildcard) || !"FROM".equals(from))
+                         throw new GridException(errMsg);
+
+                     needSelect = false;
+                 }
+                 else
+                     throw new GridException(errMsg);
+             }
+         }
+         else {
+             // A bare clause can only be completed with a table name.
+             if (tbl == null)
+                 throw new GridException("Failed to generate query because target table is not specified: " + qry);
+
+             boolean needWhere = !str.startsWith("ORDER") && !str.startsWith("LIMIT");
+
+             qry = needWhere ? "FROM " + tbl.fullTableName() + " WHERE " + qry :
+                 "FROM " + tbl.fullTableName() + ' ' + qry;
+         }
+     }
+
+     if (!needSelect)
+         return qry;
+
+     String tblName = tbl.fullTableName();
+
+     return "SELECT " + tblName + '.' + KEY_FIELD_NAME + ", " + tblName + '.' + VAL_FIELD_NAME + ' ' + qry;
+ }
+
+ /**
+  * Registers new class description.
+  *
+  * This implementation doesn't support type reregistration.
+  *
+  * @param spaceName Space name.
+  * @param type Type description.
+  * @return {@code True} if the type was registered, {@code false} if it was rejected by validation.
+  * @throws GridException In case of error, including an attempt to register a value class whose name
+  *      is already registered in the space.
+  */
+ @Override public boolean registerType(@Nullable String spaceName, GridQueryTypeDescriptor type)
+     throws GridException {
+     if (!validateTypeDescriptor(spaceName, type))
+         return false;
+
+     for (TableDescriptor table : tables(schema(spaceName)))
+         // Need to compare class names rather than classes to define
+         // whether a class was previously undeployed.
+         // Take the name of the value class itself: valueClass().getClass() is always
+         // java.lang.Class, which made this check impossible to trigger.
+         if (table.type().valueClass().getName().equals(type.valueClass().getName()))
+             throw new GridException("Failed to register type in query index because" +
+                 " class is already registered (most likely that class with the same name" +
+                 " was not properly undeployed): " + type);
+
+     TableDescriptor tbl = new TableDescriptor(spaceName, type);
+
+     try {
+         Connection conn = connectionForThread(null);
+
+         Schema schema = schemas.get(tbl.schema());
+
+         if (schema == null) {
+             schema = new Schema(spaceName);
+
+             // Another thread may have created the schema concurrently - use the winner.
+             Schema existing = schemas.putIfAbsent(tbl.schema(), schema);
+
+             if (existing != null)
+                 schema = existing;
+         }
+
+         createTable(schema, tbl, conn);
+
+         schema.put(tbl.name(), tbl);
+     }
+     catch (SQLException e) {
+         onSqlException();
+
+         throw new GridException("Failed to register query type: " + type, e);
+     }
+
+     return true;
+ }
+
+ /**
+  * Tells whether the class maps to a dedicated SQL type, i.e. to anything other than
+  * {@code BINARY}, {@code OTHER} or {@code ARRAY}.
+  *
+  * @param cls Class.
+  * @return True if given class has primitive respective sql type.
+  */
+ private boolean isPrimitive(Class<?> cls) {
+     DBTypeEnum dbType = DBTypeEnum.fromClass(cls);
+
+     boolean generic = dbType == DBTypeEnum.BINARY || dbType == DBTypeEnum.OTHER ||
+         dbType == DBTypeEnum.ARRAY;
+
+     return !generic;
+ }
+
+ /**
+  * Validates properties described by query types.
+  * <p>
+  * A type without text index, indexes and fields is accepted only if its key or value is primitive
+  * and indexing of primitive keys or values, respectively, is enabled for the space. Otherwise field
+  * names are checked for duplicates between key and value fields and for reserved names.
+  *
+  * @param spaceName Space name.
+  * @param type Type descriptor.
+  * @return True if type is valid.
+  * @throws GridException If validation failed.
+  */
+ private boolean validateTypeDescriptor(@Nullable String spaceName, GridQueryTypeDescriptor type)
+     throws GridException {
+     assert type != null;
+
+     boolean keyPrimitive = isPrimitive(type.keyClass());
+     boolean valPrimitive = isPrimitive(type.valueClass());
+
+     // Do not register if value is not primitive and
+     // there are no indexes or fields defined.
+     if (!type.valueTextIndex() && type.indexes().isEmpty() &&
+         type.keyFields().isEmpty() && type.valueFields().isEmpty())
+         return keyPrimitive && isIndexPrimitiveKey(spaceName) || valPrimitive && isIndexPrimitiveValue(spaceName);
+
+     Collection<String> names = new HashSet<>();
+
+     names.addAll(type.keyFields().keySet());
+     names.addAll(type.valueFields().keySet());
+
+     // The set is smaller than the sum of both maps only if a name occurs in both of them.
+     if (names.size() < type.keyFields().size() + type.valueFields().size())
+         throw new GridException("Found duplicated properties with the same name [keyType=" +
+             type.keyClass().getName() + ", valueType=" + type.valueClass().getName() + "]");
+
+     for (String name : names) {
+         // The message is built by plain concatenation: the descriptor's string form may contain
+         // braces or quotes, which must not be interpreted as a MessageFormat pattern.
+         if (name.equals(KEY_FIELD_NAME) || name.equals(VAL_FIELD_NAME))
+             throw new GridException("Name '" + name + "' is reserved and cannot be used as a field name [class=" +
+                 type + "]");
+     }
+
+     return true;
+ }
+
+ /**
+  * Escapes name to be valid SQL identifier. With {@code escapeAll} the name is simply wrapped in
+  * double quotes. Otherwise every character that is not a letter, a digit, {@code '_'}, {@code '-'}
+  * or a double quote at the very beginning or end is replaced with {@code '_'}; such characters
+  * are asserted to be {@code '$'} or {@code '.'}.
+  *
+  * @param name Name.
+  * @param escapeAll Escape flag.
+  * @return Escaped name; the original string when nothing had to be replaced.
+  */
+ private static String escapeName(String name, boolean escapeAll) {
+     if (escapeAll)
+         return "\"" + name + "\"";
+
+     int len = name.length();
+
+     SB res = null;
+
+     for (int pos = 0; pos < len; pos++) {
+         char c = name.charAt(pos);
+
+         boolean legal = Character.isLetter(c) || Character.isDigit(c) || c == '_' ||
+             (c == '"' && (pos == 0 || pos == len - 1)) || c == '-';
+
+         if (legal)
+             continue;
+
+         // Class name can also contain '$' or '.' - these should be escaped.
+         assert c == '$' || c == '.';
+
+         if (res == null)
+             res = new SB();
+
+         // Replacement is one-to-one, so the builder length is also the position in 'name'
+         // up to which characters have been copied.
+         res.a(name.substring(res.length(), pos));
+
+         // Replace illegal chars with '_'.
+         res.a('_');
+     }
+
+     if (res == null)
+         return name;
+
+     res.a(name.substring(res.length(), len));
+
+     return res.toString();
+ }
+
+ /**
+  * Create db table by using given table descriptor. The table gets a non-null key column, a value
+  * column and one column per key field and value field of the type, in that order.
+  *
+  * @param schema Schema.
+  * @param tbl Table descriptor.
+  * @param conn Connection.
+  * @throws SQLException If failed to create db table.
+  */
+ private void createTable(Schema schema, TableDescriptor tbl, Connection conn) throws SQLException {
+     assert tbl != null;
+
+     boolean keyAsObj = !isIndexFixedTyping(schema.spaceName);
+
+     boolean escapeAll = isEscapeAll(schema.spaceName);
+
+     // Without fixed typing keys of different classes share a table, hence the generic column type.
+     String keyColType = keyAsObj ? "OTHER" : dbTypeFromClass(tbl.type().keyClass());
+     String valColType = dbTypeFromClass(tbl.type().valueClass());
+
+     SB ddl = new SB();
+
+     ddl.a("CREATE TABLE ").a(tbl.fullTableName()).a(" (");
+     ddl.a(KEY_FIELD_NAME).a(' ').a(keyColType).a(" NOT NULL");
+     ddl.a(',').a(VAL_FIELD_NAME).a(' ').a(valColType);
+
+     for (Map.Entry<String, Class<?>> fld : tbl.type().keyFields().entrySet()) {
+         String colName = escapeName(fld.getKey(), escapeAll);
+         String colType = dbTypeFromClass(fld.getValue());
+
+         ddl.a(',').a(colName).a(' ').a(colType);
+     }
+
+     for (Map.Entry<String, Class<?>> fld : tbl.type().valueFields().entrySet()) {
+         String colName = escapeName(fld.getKey(), escapeAll);
+         String colType = dbTypeFromClass(fld.getValue());
+
+         ddl.a(',').a(colName).a(' ').a(colType);
+     }
+
+     ddl.a(')');
+
+     String createSql = ddl.toString();
+
+     if (log.isDebugEnabled())
+         log.debug("Creating DB table with SQL: " + createSql);
+
+     GridH2RowDescriptor rowDesc = new RowDescriptor(tbl.type(), schema, keyAsObj);
+
+     GridH2Table.Engine.createTable(conn, createSql, rowDesc, tbl, tbl.spaceName);
+ }
+
+ /**
+  * Resolves the name of the DB type that corresponds to a java class.
+  *
+  * @param cls Java class.
+  * @return DB type name.
+  */
+ private String dbTypeFromClass(Class<?> cls) {
+     DBTypeEnum dbType = DBTypeEnum.fromClass(cls);
+
+     return dbType.dBTypeAsString();
+ }
+
+ /**
+ * Gets table descriptor by value type.
+ *
+ * @param spaceName Space name.
+ * @param type Value type descriptor.
+ * @return Table descriptor or {@code null} if not found.
+ */
+ @Nullable private TableDescriptor tableDescriptor(@Nullable String spaceName, GridQueryTypeDescriptor type) {
+ return tableDescriptor(type.name(), spaceName);
+ }
+
+ /**
+  * Gets table descriptor by type and space names.
+  *
+  * @param type Type name.
+  * @param space Space name.
+  * @return Table descriptor, or {@code null} if the schema or the type is not registered.
+  */
+ @Nullable private TableDescriptor tableDescriptor(String type, @Nullable String space) {
+     ConcurrentMap<String, TableDescriptor> schemaTbls = schemas.get(schema(space));
+
+     return schemaTbls == null ? null : schemaTbls.get(type);
+ }
+
+ /**
+  * Gets the table descriptors registered in the given schema.
+  *
+  * @param schema Schema name.
+  * @return Collection of table descriptors; empty if the schema is unknown.
+  */
+ private Collection<TableDescriptor> tables(String schema) {
+     ConcurrentMap<String, TableDescriptor> schemaTbls = schemas.get(schema);
+
+     if (schemaTbls != null)
+         return schemaTbls.values();
+
+     return Collections.emptySet();
+ }
+
+ /**
+  * Maps a space name to a database schema name: an empty space name maps to {@code PUBLIC},
+  * any other name is used as is.
+  *
+  * @param space Space name.
+  * @return Schema name.
+  */
+ private static String schema(@Nullable String space) {
+     return F.isEmpty(space) ? "PUBLIC" : space;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rebuildIndexes(@Nullable String spaceName, GridQueryTypeDescriptor type) {
+     if (offheap != null)
+         throw new UnsupportedOperationException("Index rebuilding is not supported when off-heap memory is used");
+
+     TableDescriptor desc = tableDescriptor(spaceName, type);
+
+     // Nothing to rebuild for a type that is not registered.
+     if (desc != null)
+         desc.tbl.rebuildIndexes();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long size(@Nullable String spaceName, GridQueryTypeDescriptor type,
+ GridIndexingQueryFilter filters) throws GridException {
+ TableDescriptor tbl = tableDescriptor(spaceName, type);
+
+ if (tbl == null)
+ return -1;
+
+ IgniteSpiCloseableIterator<List<?>> iter = queryFields(spaceName,
+ "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null).iterator();
+
+ return ((Number)iter.next().get(0)).longValue();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NonThreadSafeLazyInitialization")
+ @Override public void start(GridKernalContext ctx) throws GridException {
+ if (log.isDebugEnabled())
+ log.debug("Starting cache query index...");
+
+ if (ctx != null) { // This is allowed in some tests.
+ this.ctx = ctx;
+
+ GridQueryConfiguration cfg0 = ctx.config().getQueryConfiguration();
+
+ // Keep the default configuration when none is provided.
+ if (cfg0 != null)
+ cfg = cfg0;
+
+ // Register a space for every configured cache.
+ for (GridCacheConfiguration cacheCfg : ctx.config().getCacheConfiguration())
+ registerSpace(cacheCfg.getName());
+ }
+
+ // Switch off H2's own Java object serialization, both via the system property and,
+ // if H2 has already picked the setting up, directly in SysProperties.
+ System.setProperty("h2.serializeJavaObject", "false");
+
+ if (SysProperties.serializeJavaObject) {
+ U.warn(log, "Serialization of Java objects in H2 was enabled.");
+
+ SysProperties.serializeJavaObject = false;
+ }
+
+ // Let H2 serialize Java objects through the marshaller-based serializer.
+ if (cfg.isUseOptimizedSerializer())
+ Utils.serializer = h2Serializer();
+
+ long maxOffHeapMemory = cfg.getMaxOffHeapMemory();
+
+ // -1 means that off-heap memory is not used.
+ if (maxOffHeapMemory != -1) {
+ assert maxOffHeapMemory >= 0 : maxOffHeapMemory;
+
+ offheap = new GridUnsafeMemory(maxOffHeapMemory);
+ }
+
+ SB opt = new SB();
+
+ opt.a(DFLT_DB_OPTIONS).a(OPTIMIZED_DB_OPTIONS);
+
+ // A random database name gives each started instance its own in-memory database.
+ String dbName = UUID.randomUUID().toString();
+
+ dbUrl = "jdbc:h2:mem:" + dbName + opt;
+
+ // Make sure the H2 driver class is loaded before connections are requested.
+ try {
+ Class.forName("org.h2.Driver");
+ }
+ catch (ClassNotFoundException e) {
+ throw new GridException("Failed to find org.h2.Driver class", e);
+ }
+
+ for (String schema : schemaNames)
+ createSchemaIfAbsent(schema);
+
+ try {
+ createSqlFunctions();
+ runInitScript();
+
+ // Optional debug console: starts the H2 web server on a free port and tries to open a browser.
+ if (getString(GG_H2_DEBUG_CONSOLE) != null) {
+ Connection c = DriverManager.getConnection(dbUrl);
+
+ WebServer webSrv = new WebServer();
+ Server web = new Server(webSrv, "-webPort", "0");
+ web.start();
+ String url = webSrv.addSession(c);
+
+ // Failure to open a browser is not fatal; it is only logged.
+ try {
+ Server.openBrowser(url);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to open browser: " + e.getMessage());
+ }
+ }
+ }
+ catch (SQLException e) {
+ throw new GridException(e);
+ }
+
+// registerMBean(gridName, this, GridH2IndexingSpiMBean.class); TODO
+ }
+
+ /**
+ * @return Serializer.
+ */
+ protected JavaObjectSerializer h2Serializer() {
+ return new JavaObjectSerializer() {
+ @Override public byte[] serialize(Object obj) throws Exception {
+ return marshaller.marshal(obj);
+ }
+
+ @Override public Object deserialize(byte[] bytes) throws Exception {
+ return marshaller.unmarshal(bytes, null);
+ }
+ };
+ }
+
+    /**
+     * Executes the configured initial SQL script, if any, through a {@code RUNSCRIPT} statement.
+     * Does nothing when no script path is configured.
+     *
+     * @throws GridException If failed.
+     * @throws SQLException If failed.
+     */
+    private void runInitScript() throws GridException, SQLException {
+        String path = cfg.getInitialScriptPath();
+
+        if (path != null) {
+            Connection conn = connectionForThread(null);
+
+            try (PreparedStatement stmt = conn.prepareStatement("RUNSCRIPT FROM ? CHARSET 'UTF-8'")) {
+                stmt.setString(1, path);
+
+                stmt.execute();
+            }
+        }
+    }
+
+    /**
+     * Registers SQL functions.
+     * <p>
+     * Every method annotated with {@link GridCacheQuerySqlFunction} in the configured custom function classes
+     * is registered through {@code CREATE ALIAS} in each registered schema and in the schema returned by
+     * {@code schema(null)}. The alias is the annotation's {@code alias()} or, when that is empty, the method name.
+     *
+     * @throws SQLException If failed.
+     * @throws GridException If an annotated method is not {@code public static}.
+     */
+    private void createSqlFunctions() throws SQLException, GridException {
+        Class<?>[] idxCustomFuncClss = cfg.getIndexCustomFunctionClasses();
+
+        if (F.isEmpty(idxCustomFuncClss))
+            return;
+
+        // The target schemas are the same for every function, so they are resolved once.
+        Collection<String> targetSchemas = new ArrayList<>(schemaNames);
+
+        String dfltSchema = schema(null);
+
+        if (!schemaNames.contains(dfltSchema))
+            targetSchemas.add(dfltSchema);
+
+        for (Class<?> cls : idxCustomFuncClss) {
+            for (Method m : cls.getDeclaredMethods()) {
+                GridCacheQuerySqlFunction ann = m.getAnnotation(GridCacheQuerySqlFunction.class);
+
+                if (ann == null)
+                    continue;
+
+                int modifiers = m.getModifiers();
+
+                if (!Modifier.isStatic(modifiers) || !Modifier.isPublic(modifiers))
+                    throw new GridException("Method " + m.getName() + " must be public static.");
+
+                String alias = ann.alias().isEmpty() ? m.getName() : ann.alias();
+
+                String clause = "CREATE ALIAS " + alias + (ann.deterministic() ? " DETERMINISTIC FOR \"" :
+                    " FOR \"") + cls.getName() + '.' + m.getName() + '"';
+
+                for (String schema : targetSchemas) {
+                    // try-with-resources closes the statement even when execution fails.
+                    try (Statement s = connectionForThread(schema).createStatement()) {
+                        s.execute(clause);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes every table (and its Lucene index, if present), drops all database objects and shuts the
+     * database down. Pooled connections are closed and the internal registries are cleared even when the
+     * shutdown itself fails.
+     *
+     * @throws GridException If the database could not be shut down.
+     */
+    @Override public void stop() throws GridException {
+        if (log.isDebugEnabled())
+            log.debug("Stopping cache query index...");
+
+//        unregisterMBean(); TODO
+
+        Connection conn = connectionForThread(null);
+
+        try {
+            for (ConcurrentMap<String, TableDescriptor> m : schemas.values()) {
+                for (TableDescriptor desc : m.values()) {
+                    desc.tbl.close();
+
+                    if (desc.luceneIdx != null)
+                        U.closeQuiet(desc.luceneIdx);
+                }
+            }
+
+            if (conn != null) {
+                Statement stmt = null;
+
+                try {
+                    stmt = conn.createStatement();
+
+                    stmt.execute("DROP ALL OBJECTS DELETE FILES");
+                    stmt.execute("SHUTDOWN");
+                }
+                catch (SQLException e) {
+                    throw new GridException("Failed to shutdown database.", e);
+                }
+                finally {
+                    U.close(stmt, log);
+                }
+            }
+        }
+        finally {
+            // Always release connections and drop cached state, otherwise a failed shutdown leaks them.
+            for (Connection c : conns)
+                U.close(c, log);
+
+            conns.clear();
+            schemas.clear();
+            rowCache.clear();
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Cache query index stopped.");
+    }
+
+    /**
+     * Tells whether primitive keys must be indexed for the given space.
+     *
+     * @param spaceName Space name.
+     * @return {@code true} if the space has a query configuration and it requests indexing of primitive keys.
+     */
+    public boolean isIndexPrimitiveKey(@Nullable String spaceName) {
+        GridCacheQueryConfiguration qryCfg = cacheQueryConfiguration(spaceName);
+
+        if (qryCfg == null)
+            return false;
+
+        return qryCfg.isIndexPrimitiveKey();
+    }
+
+    /**
+     * Tells whether primitive values must be indexed for the given space.
+     *
+     * @param spaceName Space name.
+     * @return {@code true} if the space has a query configuration and it requests indexing of primitive values.
+     */
+    public boolean isIndexPrimitiveValue(String spaceName) {
+        GridCacheQueryConfiguration qryCfg = cacheQueryConfiguration(spaceName);
+
+        if (qryCfg == null)
+            return false;
+
+        return qryCfg.isIndexPrimitiveValue();
+    }
+
+    /**
+     * Tells whether fixed typing is enabled for the given space.
+     *
+     * @param spaceName Space name.
+     * @return {@code true} if the space has a query configuration and its fixed typing flag is set.
+     */
+    public boolean isIndexFixedTyping(String spaceName) {
+        GridCacheQueryConfiguration qryCfg = cacheQueryConfiguration(spaceName);
+
+        if (qryCfg == null)
+            return false;
+
+        return qryCfg.isIndexFixedTyping();
+    }
+
+    /**
+     * Tells whether the "escape all" flag is set for the given space.
+     *
+     * @param spaceName Space name.
+     * @return {@code true} if the space has a query configuration and its escape-all flag is set.
+     */
+    public boolean isEscapeAll(String spaceName) {
+        GridCacheQueryConfiguration qryCfg = cacheQueryConfiguration(spaceName);
+
+        if (qryCfg == null)
+            return false;
+
+        return qryCfg.isEscapeAll();
+    }
+
+    /**
+     * Looks up the query configuration of the cache backing the given space.
+     *
+     * @param spaceName Space name.
+     * @return Cache query configuration, or {@code null} when no kernal context is set.
+     */
+    @Nullable private GridCacheQueryConfiguration cacheQueryConfiguration(String spaceName) {
+        if (ctx == null)
+            return null;
+
+        return ctx.cache().internalCache(spaceName).configuration().getQueryConfiguration();
+    }
+
+    /**
+     * Gets the maximum memory figure reported by the offheap rows cache.
+     *
+     * @return Value of {@code rowCache.getMaxMemory()}, saturated at {@link Integer#MAX_VALUE} so that large
+     *      values do not wrap around to a negative number when narrowed to {@code int}.
+     */
+    public int getMaxOffheapRowsCacheSize() {
+        return (int)Math.min(rowCache.getMaxMemory(), Integer.MAX_VALUE);
+    }
+
+    /**
+     * Gets the used memory figure reported by the offheap rows cache.
+     *
+     * @return Value of {@code rowCache.getUsedMemory()}, saturated at {@link Integer#MAX_VALUE} so that large
+     *      values do not wrap around to a negative number when narrowed to {@code int}.
+     */
+    public int getOffheapRowsCacheSize() {
+        return (int)Math.min(rowCache.getUsedMemory(), Integer.MAX_VALUE);
+    }
+
+    /**
+     * Gets the amount of offheap memory currently allocated.
+     *
+     * @return Allocated size reported by the offheap memory manager, or {@code -1} if offheap is not used.
+     */
+    public long getAllocatedOffHeapMemory() {
+        if (offheap == null)
+            return -1;
+
+        return offheap.allocatedSize();
+    }
+
+ /**
+ * @param spaceName Space name.
+ */
+ public void registerSpace(String spaceName) {
+ schemaNames.add(schema(spaceName));
+ }
+
+ /**
+ * Pairs a JDBC connection with the name of the schema that was last recorded for it.
+ * A {@code null} schema means that no schema has been recorded yet.
+ */
+ private static class ConnectionWrapper {
+ /** Wrapped connection. */
+ private Connection conn;
+
+ /** Schema name recorded for the connection; {@code null} until {@link #schema(String)} is called. */
+ private volatile String schema;
+
+ /**
+ * @param conn Connection to use.
+ */
+ ConnectionWrapper(Connection conn) {
+ this.conn = conn;
+ }
+
+ /**
+ * @return Schema name if schema is set, null otherwise.
+ */
+ public String schema() {
+ return schema;
+ }
+
+ /**
+ * Records the schema name; this only updates the field and does not touch the connection itself.
+ *
+ * @param schema Schema name set on this connection.
+ */
+ public void schema(@Nullable String schema) {
+ this.schema = schema;
+ }
+
+ /**
+ * @return Connection.
+ */
+ public Connection connection() {
+ return conn;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ConnectionWrapper.class, this);
+ }
+ }
+
+ /**
+ * Enum that helps to map java types to database types.
+ */
+ private enum DBTypeEnum {
+ /** */
+ INT("INT"),
+
+ /** */
+ BOOL("BOOL"),
+
+ /** */
+ TINYINT("TINYINT"),
+
+ /** */
+ SMALLINT("SMALLINT"),
+
+ /** */
+ BIGINT("BIGINT"),
+
+ /** */
+ DECIMAL("DECIMAL"),
+
+ /** */
+ DOUBLE("DOUBLE"),
+
+ /** */
+ REAL("REAL"),
+
+ /** */
+ TIME("TIME"),
+
+ /** */
+ TIMESTAMP("TIMESTAMP"),
+
+ /** */
+ DATE("DATE"),
+
+ /** */
+ VARCHAR("VARCHAR"),
+
+ /** */
+ CHAR("CHAR"),
+
+ /** */
+ BINARY("BINARY"),
+
+ /** */
+ UUID("UUID"),
+
+ /** */
+ ARRAY("ARRAY"),
+
+ /** */
+ GEOMETRY("GEOMETRY"),
+
+ /** */
+ OTHER("OTHER");
+
+ /** Map of Class to enum. */
+ private static final Map<Class<?>, DBTypeEnum> map = new HashMap<>();
+
+ /**
+ * Initialize map of DB types.
+ */
+ static {
+ map.put(int.class, INT);
+ map.put(Integer.class, INT);
+ map.put(boolean.class, BOOL);
+ map.put(Boolean.class, BOOL);
+ map.put(byte.class, TINYINT);
+ map.put(Byte.class, TINYINT);
+ map.put(short.class, SMALLINT);
+ map.put(Short.class, SMALLINT);
+ map.put(long.class, BIGINT);
+ map.put(Long.class, BIGINT);
+ map.put(BigDecimal.class, DECIMAL);
+ map.put(double.class, DOUBLE);
+ map.put(Double.class, DOUBLE);
+ map.put(float.class, REAL);
+ map.put(Float.class, REAL);
+ map.put(Time.class, TIME);
+ map.put(Timestamp.class, TIMESTAMP);
+ map.put(java.util.Date.class, TIMESTAMP);
+ map.put(java.sql.Date.class, DATE);
+ map.put(char.class, CHAR);
+ map.put(Character.class, CHAR);
+ map.put(String.class, VARCHAR);
+ map.put(UUID.class, UUID);
+ map.put(byte[].class, BINARY);
+ }
+
+ /** */
+ private final String dbType;
+
+ /**
+ * Constructs new instance.
+ *
+ * @param dbType DB type name.
+ */
+ DBTypeEnum(String dbType) {
+ this.dbType = dbType;
+ }
+
+ /**
+ * Resolves enum by class.
+ *
+ * @param cls Class.
+ * @return Enum value.
+ */
+ public static DBTypeEnum fromClass(Class<?> cls) {
+ DBTypeEnum res = map.get(cls);
+
+ if (res != null)
+ return res;
+
+ if (DataType.isGeometryClass(cls))
+ return GEOMETRY;
+
+ return cls.isArray() && !cls.getComponentType().isPrimitive() ? ARRAY : OTHER;
+ }
+
+ /**
+ * Gets DB type name.
+ *
+ * @return DB type name.
+ */
+ public String dBTypeAsString() {
+ return dbType;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DBTypeEnum.class, this);
+ }
+ }
+
+    /**
+     * Information about table in database. Also acts as the factory of the table's indexes.
+     */
+    private class TableDescriptor implements GridH2Table.IndexesFactory {
+        /** Quoted schema name followed by the (possibly escaped) type name. */
+        private final String fullTblName;
+
+        /** Type descriptor the table was created for. */
+        private final GridQueryTypeDescriptor type;
+
+        /** Space name. */
+        private final String spaceName;
+
+        /** Schema name derived from the space name. */
+        private final String schema;
+
+        /** Table; assigned when {@link #createIndexes(GridH2Table)} is called. */
+        private GridH2Table tbl;
+
+        /** Lucene index; stays {@code null} unless the type needs full text search. */
+        private GridLuceneIndex luceneIdx;
+
+        /**
+         * @param spaceName Space name.
+         * @param type Type descriptor.
+         */
+        TableDescriptor(@Nullable String spaceName, GridQueryTypeDescriptor type) {
+            this.spaceName = spaceName;
+            this.type = type;
+
+            schema = GridH2Indexing.schema(spaceName);
+
+            fullTblName = '\"' + schema + "\"." + escapeName(type.name(), isEscapeAll(spaceName));
+        }
+
+        /**
+         * @return Schema name.
+         */
+        public String schema() {
+            return schema;
+        }
+
+        /**
+         * @return Database table name qualified with the schema name.
+         */
+        String fullTableName() {
+            return fullTblName;
+        }
+
+        /**
+         * @return Name of the type descriptor.
+         */
+        String name() {
+            return type.name();
+        }
+
+        /**
+         * @return Type.
+         */
+        GridQueryTypeDescriptor type() {
+            return type;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TableDescriptor.class, this);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Always creates the primary key tree index first, then one index per index descriptor of the type.
+         * At most one Lucene index is created per call, no matter how many conditions ask for it (a
+         * {@code String} value class and any number of full text index descriptors); previously each
+         * condition created a new instance and the earlier ones were dropped without being closed.
+         *
+         * @throws GridRuntimeException If the Lucene index could not be created.
+         * @throws IllegalStateException If an index descriptor has an unsupported type.
+         */
+        @Override public ArrayList<Index> createIndexes(GridH2Table tbl) {
+            this.tbl = tbl;
+
+            ArrayList<Index> idxs = new ArrayList<>();
+
+            idxs.add(new GridH2TreeIndex("_key_PK", tbl, true, KEY_COL, VAL_COL, tbl.indexColumn(0, ASCENDING)));
+
+            // Tracks whether this call has already created the Lucene index.
+            boolean luceneCreated = false;
+
+            if (type().valueClass() == String.class) {
+                luceneIdx = newLuceneIndex();
+
+                luceneCreated = true;
+            }
+
+            // Does not depend on the index being processed.
+            boolean escapeAll = isEscapeAll(spaceName);
+
+            for (Map.Entry<String, GridQueryIndexDescriptor> e : type.indexes().entrySet()) {
+                String name = e.getKey();
+                GridQueryIndexDescriptor idx = e.getValue();
+
+                if (idx.type() == FULLTEXT) {
+                    if (!luceneCreated) {
+                        luceneIdx = newLuceneIndex();
+
+                        luceneCreated = true;
+                    }
+
+                    continue;
+                }
+
+                IndexColumn[] cols = new IndexColumn[idx.fields().size()];
+
+                int i = 0;
+
+                for (String field : idx.fields()) {
+                    // H2 reserved keywords used as column name is case sensitive.
+                    String fieldName = escapeAll ? field : escapeName(field, escapeAll).toUpperCase();
+
+                    Column col = tbl.getColumn(fieldName);
+
+                    cols[i++] = tbl.indexColumn(col.getColumnId(), idx.descending(field) ? DESCENDING : ASCENDING);
+                }
+
+                if (idx.type() == SORTED)
+                    idxs.add(new GridH2TreeIndex(name, tbl, false, KEY_COL, VAL_COL, cols));
+                else if (idx.type() == GEO_SPATIAL)
+                    idxs.add(new GridH2SpatialIndex(tbl, name, cols, KEY_COL, VAL_COL));
+                else
+                    throw new IllegalStateException("Unsupported index type [name=" + name +
+                        ", type=" + idx.type() + ']');
+            }
+
+            return idxs;
+        }
+
+        /**
+         * Creates a Lucene index for this table's type.
+         *
+         * @return New Lucene index.
+         * @throws GridRuntimeException If creation failed.
+         */
+        private GridLuceneIndex newLuceneIndex() {
+            try {
+                return new GridLuceneIndex(marshaller, offheap, spaceName, type, true);
+            }
+            catch (GridException e) {
+                throw new GridRuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Result set iterator that returns every row as a list of its column values.
+     */
+    private static class FieldsIterator extends GridH2ResultSetIterator<List<?>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param data Result set to iterate over.
+         * @throws GridException If failed.
+         */
+        protected FieldsIterator(ResultSet data) throws GridException {
+            super(data);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Copies the shared row buffer into a fresh list, so the result is not affected by later fetches.
+         */
+        @Override protected List<?> createRow() {
+            List<Object> vals = new ArrayList<>(row.length);
+
+            for (Object val : row)
+                vals.add(val);
+
+            return vals;
+        }
+    }
+
+    /**
+     * Result set iterator that returns every row as a key/value tuple built from its first two columns.
+     */
+    private static class KeyValIterator<K, V> extends GridH2ResultSetIterator<IgniteBiTuple<K, V>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param data Result set to iterate over.
+         * @throws GridException If failed.
+         */
+        protected KeyValIterator(ResultSet data) throws GridException {
+            super(data);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Column 0 of the current row is taken as the key and column 1 as the value.
+         */
+        @SuppressWarnings("unchecked")
+        @Override protected IgniteBiTuple<K, V> createRow() {
+            return new IgniteBiTuple<>((K)row[0], (V)row[1]);
+        }
+    }
+
+ /**
+ * Field descriptor: schema name, type name, field name and field type name of a single query result column.
+ * The state is externalized as four strings in exactly that order.
+ */
+ private static class SqlFieldMetadata implements GridQueryFieldMetadata {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Schema name. */
+ private String schemaName;
+
+ /** Type name. */
+ private String typeName;
+
+ /** Name. */
+ private String name;
+
+ /** Type. */
+ private String type;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public SqlFieldMetadata() {
+ // No-op
+ }
+
+ /**
+ * @param schemaName Schema name.
+ * @param typeName Type name.
+ * @param name Name; must not be {@code null}.
+ * @param type Type; must not be {@code null}.
+ */
+ SqlFieldMetadata(@Nullable String schemaName, @Nullable String typeName, String name, String type) {
+ assert name != null;
+ assert type != null;
+
+ this.schemaName = schemaName;
+ this.typeName = typeName;
+ this.name = name;
+ this.type = type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String schemaName() {
+ return schemaName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String typeName() {
+ return typeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String fieldName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String fieldTypeName() {
+ return type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // The write order must match the read order in readExternal().
+ U.writeString(out, schemaName);
+ U.writeString(out, typeName);
+ U.writeString(out, name);
+ U.writeString(out, type);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // The read order must match the write order in writeExternal().
+ schemaName = U.readString(in);
+ typeName = U.readString(in);
+ name = U.readString(in);
+ type = U.readString(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SqlFieldMetadata.class, this);
+ }
+ }
+
+ /**
+ * Database schema object: a concurrent map of {@link TableDescriptor}s keyed by string that also
+ * remembers the space name it was created for.
+ */
+ private static class Schema extends ConcurrentHashMap8<String, TableDescriptor> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Space name this schema was created for; may be {@code null}. */
+ private final String spaceName;
+
+ /**
+ * @param spaceName Space name.
+ */
+ private Schema(@Nullable String spaceName) {
+ this.spaceName = spaceName;
+ }
+ }
+
+ /**
+ * Row descriptor.
+ */
+ private class RowDescriptor implements GridH2RowDescriptor {
+ /** */
+ private final GridQueryTypeDescriptor type;
+
+ /** */
+ private final String[] fields;
+
+ /** */
+ private final int[] fieldTypes;
+
+ /** */
+ private final int keyType;
+
+ /** */
+ private final int valType;
+
+ /** */
+ private final Schema schema;
+
+ /** */
+ private final int keyCols;
+
+ /** */
+ private final GridUnsafeGuard guard = offheap == null ? null : new GridUnsafeGuard();
+
+ /**
+ * @param type Type descriptor.
+ * @param schema Schema.
+ * @param keyAsObj Store key as java object.
+ */
+ RowDescriptor(GridQueryTypeDescriptor type, Schema schema, boolean keyAsObj) {
+ assert type != null;
+ assert schema != null;
+
+ this.type = type;
+ this.schema = schema;
+
+ keyCols = type.keyFields().size();
+
+ Map<String, Class<?>> allFields = new LinkedHashMap<>();
+
+ allFields.putAll(type.keyFields());
+ allFields.putAll(type.valueFields());
+
+ fields = allFields.keySet().toArray(new String[allFields.size()]);
+
+ fieldTypes = new int[fields.length];
+
+ Class[] classes = allFields.values().toArray(new Class[fields.length]);
+
+ for (int i = 0; i < fieldTypes.length; i++)
+ fieldTypes[i] = DataType.getTypeFromClass(classes[i]);
+
+ keyType = keyAsObj ? Value.JAVA_OBJECT : DataType.getTypeFromClass(type.keyClass());
+ valType = DataType.getTypeFromClass(type.valueClass());
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridUnsafeGuard guard() {
+ return guard;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cache(GridH2KeyValueRowOffheap row) {
+ long ptr = row.pointer();
+
+ assert ptr > 0 : ptr;
+
+ rowCache.put(ptr, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void uncache(long ptr) {
+ rowCache.remove(ptr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridUnsafeMemory memory() {
+ return offheap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2Indexing owner() {
+ return GridH2Indexing.this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2AbstractKeyValueRow createRow(Object key, @Nullable Object val, long expirationTime)
+ throws GridException {
+ try {
+ return offheap == null ?
+ new GridH2KeyValueRowOnheap(this, key, keyType, val, valType, expirationTime) :
+ new GridH2KeyValueRowOffheap(this, key, keyType, val, valType, expirationTime);
+ }
+ catch (ClassCastException e) {
+ throw new GridException("Failed to convert key to SQL type. " +
+ "Please make sure that you always store each value type with the same key type or disable " +
+ "'defaultIndexFixedTyping' property on GridH2IndexingSpi.", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Object readFromSwap(Object key) throws GridException {
+ GridCache<Object, ?> cache = ctx.cache().cache(schema.spaceName);
+
+ GridCacheContext cctx = ((GridCacheProxyImpl)cache).context();
+
+ if (cctx.isNear())
+ cctx = cctx.near().dht().context();
+
+ GridCacheSwapEntry e = cctx.swap().read(key);
+
+ return e != null ? e.value() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int valueType() {
+ return valType;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int fieldsCount() {
+ return fields.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int fieldType(int col) {
+ return fieldTypes[col];
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object columnValue(Object obj, int col) {
+ try {
+ return type.value(obj, fields[col]);
+ }
+ catch (GridException e) {
+ throw DbException.convert(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isKeyColumn(int col) {
+ return keyCols > col;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean valueToString() {
+ return type.valueTextIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridH2KeyValueRowOffheap createPointer(long ptr) {
+ GridH2KeyValueRowOffheap row = rowCache.get(ptr);
+
+ if (row != null) {
+ assert row.pointer() == ptr : ptr + " " + row.pointer();
+
+ return row;
+ }
+
+ return new GridH2KeyValueRowOffheap(this, ptr);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
new file mode 100644
index 0000000..a231144
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
@@ -0,0 +1,122 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.sql.*;
+import java.util.*;
+
+
+/**
+ * Iterator over result set.
+ * <p>
+ * Rows are fetched lazily, one at a time, into a shared buffer; subclasses turn the buffer into the
+ * element type in {@link #createRow()}. A {@code null} result set yields an empty iterator.
+ */
+abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Result set being iterated; may be {@code null}. */
+    private final ResultSet data;
+
+    /** Column values of the most recently fetched row; {@code null} when there is no result set. */
+    protected final Object[] row;
+
+    /** Whether {@link #row} holds a fetched row that has not been returned yet. */
+    private boolean hasRow;
+
+    /**
+     * @param data Result set to iterate over; may be {@code null}.
+     * @throws GridException If the result set metadata cannot be read.
+     */
+    protected GridH2ResultSetIterator(ResultSet data) throws GridException {
+        this.data = data;
+
+        if (data != null) {
+            try {
+                row = new Object[data.getMetaData().getColumnCount()];
+            }
+            catch (SQLException e) {
+                throw new GridException(e);
+            }
+        }
+        else
+            row = null;
+    }
+
+    /**
+     * Advances the result set and copies the new row into the buffer.
+     *
+     * @return {@code true} If next row was fetched successfully.
+     * @throws GridRuntimeException If the result set reports an SQL error.
+     */
+    private boolean fetchNext() {
+        if (data == null)
+            return false;
+
+        try {
+            if (!data.next())
+                return false;
+
+            // JDBC column indexes are 1-based.
+            for (int c = 0; c < row.length; c++)
+                row[c] = data.getObject(c + 1);
+
+            return true;
+        }
+        catch (SQLException e) {
+            throw new GridRuntimeException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onHasNext() {
+        // Fetch only when no unconsumed row is buffered.
+        return hasRow || (hasRow = fetchNext());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
+    @Override public T onNext() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        // Mark the buffered row as consumed.
+        hasRow = false;
+
+        return createRow();
+    }
+
+    /**
+     * Builds an element from the current content of {@link #row}.
+     *
+     * @return Row.
+     */
+    protected abstract T createRow();
+
+    /** {@inheritDoc} */
+    @Override public void onRemove() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the statement that produced the result set and then the result set itself. The result set is
+     * closed even when its statement cannot be obtained.
+     */
+    @Override public void onClose() throws GridException {
+        if (data == null)
+            // Nothing to close.
+            return;
+
+        try {
+            U.closeQuiet(data.getStatement());
+        }
+        catch (SQLException e) {
+            throw new GridException(e);
+        }
+        finally {
+            U.closeQuiet(data);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public String toString() {
+        return S.toString((Class<GridH2ResultSetIterator>)getClass(), this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
new file mode 100644
index 0000000..ee96666
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -0,0 +1,447 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.query.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+import java.lang.ref.*;
+import java.math.*;
+import java.sql.Date;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Table row implementation based on {@link GridQueryTypeDescriptor}.
+ * <p>
+ * Columns {@link #KEY_COL} and {@link #VAL_COL} hold the key and the value; every further column is a field
+ * extracted from the key or the value through the row descriptor. A missing key or value is looked up via
+ * {@link #getOffheapValue(int)} and, for the value, via the descriptor's swap reader.
+ */
+public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
+ /** Number of leading columns (key and value) that precede the field columns. */
+ private static final int DEFAULT_COLUMNS_COUNT = 2;
+
+ /** Key column. */
+ public static final int KEY_COL = 0;
+
+ /** Value column. */
+ public static final int VAL_COL = 1;
+
+ /** Row descriptor. */
+ protected final GridH2RowDescriptor desc;
+
+ /** Expiration time of respective cache entry. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ protected long expirationTime;
+
+ /**
+ * Constructor.
+ *
+ * @param desc Row descriptor.
+ * @param key Key.
+ * @param keyType Key type.
+ * @param val Value.
+ * @param valType Value type.
+ * @param expirationTime Expiration time.
+ * @throws IgniteSpiException If failed.
+ */
+ protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val,
+ int valType, long expirationTime) throws IgniteSpiException {
+ super(wrap(key, keyType),
+ val == null ? null : wrap(val, valType)); // We remove by key only, so value can be null here.
+
+ this.desc = desc;
+ this.expirationTime = expirationTime;
+ }
+
+ /**
+ * Protected constructor for {@link GridH2KeyValueRowOffheap}
+ *
+ * @param desc Row descriptor.
+ */
+ protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc) {
+ super(new Value[DEFAULT_COLUMNS_COUNT]);
+
+ this.desc = desc;
+ }
+
+ /**
+ * Wraps object to respective {@link Value}.
+ * <p>
+ * The object is cast to the Java type matching the given H2 type, so a mismatch surfaces as a
+ * {@link ClassCastException}.
+ *
+ * @param obj Object.
+ * @param type Value type.
+ * @return Value.
+ * @throws IgniteSpiException If the type is not one of the handled H2 value types.
+ */
+ private static Value wrap(Object obj, int type) throws IgniteSpiException {
+ switch (type) {
+ case Value.BOOLEAN:
+ return ValueBoolean.get((Boolean)obj);
+ case Value.BYTE:
+ return ValueByte.get((Byte)obj);
+ case Value.SHORT:
+ return ValueShort.get((Short)obj);
+ case Value.INT:
+ return ValueInt.get((Integer)obj);
+ case Value.FLOAT:
+ return ValueFloat.get((Float)obj);
+ case Value.LONG:
+ return ValueLong.get((Long)obj);
+ case Value.DOUBLE:
+ return ValueDouble.get((Double)obj);
+ case Value.UUID:
+ UUID uuid = (UUID)obj;
+ return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+ case Value.DATE:
+ return ValueDate.get((Date)obj);
+ case Value.TIME:
+ return ValueTime.get((Time)obj);
+ case Value.TIMESTAMP:
+ // A plain java.util.Date is converted to a Timestamp carrying the same millisecond value.
+ if (obj instanceof java.util.Date && !(obj instanceof Timestamp))
+ obj = new Timestamp(((java.util.Date) obj).getTime());
+
+ return GridH2Utils.toValueTimestamp((Timestamp)obj);
+ case Value.DECIMAL:
+ return ValueDecimal.get((BigDecimal)obj);
+ case Value.STRING:
+ return ValueString.get(obj.toString());
+ case Value.BYTES:
+ return ValueBytes.get((byte[])obj);
+ case Value.JAVA_OBJECT:
+ return ValueJavaObject.getNoCopy(obj, null, null);
+ case Value.ARRAY:
+ Object[] arr = (Object[])obj;
+
+ Value[] valArr = new Value[arr.length];
+
+ // Elements are wrapped recursively by their runtime class; null elements become SQL NULL.
+ for (int i = 0; i < arr.length; i++) {
+ Object o = arr[i];
+
+ valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass()));
+ }
+
+ return ValueArray.get(valArr);
+
+ case Value.GEOMETRY:
+ return ValueGeometry.getFromGeometry(obj);
+ }
+
+ throw new IgniteSpiException("Failed to wrap value[type=" + type + ", value=" + obj + "]");
+ }
+
+ /**
+ * @return Expiration time of respective cache entry.
+ */
+ public long expirationTime() {
+ return expirationTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getColumnCount() {
+ return DEFAULT_COLUMNS_COUNT + desc.fieldsCount();
+ }
+
+ /**
+ * Should be called to remove reference on value.
+ *
+ * @throws GridException If failed.
+ */
+ public synchronized void onSwap() throws GridException {
+ setValue(VAL_COL, null);
+ }
+
+ /**
+ * Should be called when entry getting unswapped.
+ *
+ * @param val Value.
+ * @throws GridException If failed.
+ */
+ public synchronized void onUnswap(Object val) throws GridException {
+ setValue(VAL_COL, wrap(val, desc.valueType()));
+ }
+
+ /**
+ * Atomically updates weak value.
+ * <p>
+ * The update is applied when the stored value is the expected one or is a {@link WeakValue}; the new
+ * value is stored wrapped into a {@link WeakValue}.
+ *
+ * @param exp Expected value.
+ * @param upd New value.
+ * @return Expected value if update succeeded, unexpected value otherwise.
+ */
+ protected synchronized Value updateWeakValue(Value exp, Value upd) {
+ Value res = super.getValue(VAL_COL);
+
+ if (res != exp && !(res instanceof WeakValue))
+ return res;
+
+ setValue(VAL_COL, new WeakValue(upd));
+
+ return exp;
+ }
+
+ /**
+ * @return Synchronized value.
+ */
+ protected synchronized Value syncValue() {
+ return super.getValue(VAL_COL);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Value getValue(int col) {
+ // Columns below DEFAULT_COLUMNS_COUNT are the key and the value themselves.
+ if (col < DEFAULT_COLUMNS_COUNT) {
+ Value v = super.getValue(col);
+
+ if (col == VAL_COL) {
+ // Unwrapping yields null both when nothing is stored and when the weakly referenced value is gone.
+ while ((v = WeakValue.unwrap(v)) == null) {
+ v = getOffheapValue(VAL_COL);
+
+ if (v != null) {
+ setValue(VAL_COL, v);
+
+ if (super.getValue(KEY_COL) == null)
+ cache();
+
+ return v;
+ }
+
+ // Nothing available from offheap: try the swap storage.
+ try {
+ Object valObj = desc.readFromSwap(getValue(KEY_COL).getObject());
+
+ if (valObj != null) {
+ Value upd = wrap(valObj, desc.valueType());
+
+ // v is null at this point, so the update succeeds only while the stored value is still
+ // null or a WeakValue.
+ Value res = updateWeakValue(v, upd);
+
+ if (res == v) {
+ if (super.getValue(KEY_COL) == null)
+ cache();
+
+ return upd;
+ }
+
+ // Another value was stored in the meantime; re-check it on the next iteration.
+ v = res;
+ }
+ else {
+ // If nothing found in swap then we should be already unswapped.
+ v = syncValue();
+ }
+ }
+ catch (GridException e) {
+ throw new GridRuntimeException(e);
+ }
+ }
+ }
+
+ // The value branch above never leaves v null, so a null here can only be a missing key.
+ if (v == null) {
+ assert col == KEY_COL : col;
+
+ v = getOffheapValue(KEY_COL);
+
+ assert v != null : v;
+
+ setValue(KEY_COL, v);
+
+ if (super.getValue(VAL_COL) == null)
+ cache();
+ }
+
+ assert !(v instanceof WeakValue) : v;
+
+ return v;
+ }
+
+ // Field column: translate to the descriptor's field index and extract it from the key or the value.
+ col -= DEFAULT_COLUMNS_COUNT;
+
+ assert col >= 0;
+
+ Value v = getValue(desc.isKeyColumn(col) ? KEY_COL : VAL_COL);
+
+ if (v == null)
+ return null;
+
+ Object obj = v.getObject();
+
+ Object res = desc.columnValue(obj, col);
+
+ if (res == null)
+ return ValueNull.INSTANCE;
+
+ try {
+ return wrap(res, desc.fieldType(col));
+ }
+ catch (IgniteSpiException e) {
+ throw DbException.convert(e);
+ }
+ }
+
+ /**
+ * Caches this row for reuse.
+ */
+ protected abstract void cache();
+
+ /**
+ * @param col Column.
+ * @return Value read from offheap memory or null if it is impossible.
+ */
+ protected abstract Value getOffheapValue(int col);
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ SB sb = new SB("Row@");
+
+ sb.a(Integer.toHexString(System.identityHashCode(this)));
+
+ // super.getValue() is used for key and value so that printing does not trigger offheap or swap reads.
+ Value v = super.getValue(KEY_COL);
+ sb.a("[ key: ").a(v == null ? "nil" : v.getString());
+
+ v = WeakValue.unwrap(super.getValue(VAL_COL));
+ sb.a(", val: ").a(v == null ? "nil" : v.getString());
+
+ sb.a(" ][ ");
+
+ // Field columns are printed only when the value is present.
+ if (v != null) {
+ for (int i = 2, cnt = getColumnCount(); i < cnt; i++) {
+ v = getValue(i);
+
+ if (i != 2)
+ sb.a(", ");
+
+ sb.a(v == null ? "nil" : v.getString());
+ }
+ }
+
+ sb.a(" ]");
+
+ return sb.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setKeyAndVersion(SearchRow old) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setKey(long key) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row getCopy() {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDeleted(boolean deleted) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getKey() {
+ assert false;
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setSessionId(int sesId) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setVersion(int ver) {
+ assert false;
+ }
+
+ /**
+ * Weak reference to value that was swapped but accessed in indexing SPI.
+ * <p>
+ * Only {@link #get()} and {@link #unwrap(Value)} are usable; every {@link Value} operation throws
+ * {@link IllegalStateException}.
+ */
+ private static class WeakValue extends Value {
+ /**
+ * Unwraps value.
+ *
+ * @param v Value.
+ * @return Unwrapped value; {@code null} if {@code v} is {@code null} or the weak reference was cleared.
+ */
+ static Value unwrap(Value v) {
+ return (v instanceof WeakValue) ? ((WeakValue)v).get() : v;
+ }
+
+ /** Weak reference to the wrapped value. */
+ private final WeakReference<Value> ref;
+
+ /**
+ * @param v Value.
+ */
+ private WeakValue(Value v) {
+ ref = new WeakReference<>(v);
+ }
+
+ /**
+ * @return Referenced value.
+ */
+ public Value get() {
+ return ref.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSQL() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getType() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getPrecision() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getDisplaySize() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getString() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object getObject() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void set(PreparedStatement preparedStatement, int i) throws SQLException {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int compareSecure(Value val, CompareMode compareMode) {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ throw new IllegalStateException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java
new file mode 100644
index 0000000..5cc3711
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java
@@ -0,0 +1,62 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+
+import java.util.*;
+
+/**
+ * {@link Cursor} implementation backed by an {@link Iterator} of {@link GridH2Row}s. Only forward traversal
+ * is supported.
+ */
+public class GridH2Cursor implements Cursor {
+    /** Iterator the rows are pulled from. */
+    private Iterator<GridH2Row> iter;
+
+    /** Row the cursor is positioned on; {@code null} until {@link #next()} succeeds or once it has failed. */
+    private Row row;
+
+    /**
+     * Creates a cursor over the given rows.
+     *
+     * @param iter Rows iterator.
+     */
+    public GridH2Cursor(Iterator<GridH2Row> iter) {
+        this.iter = iter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row get() {
+        return row;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SearchRow getSearchRow() {
+        return get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean next() {
+        // Forget the previous row before touching the iterator, so no stale row is exposed afterwards.
+        row = null;
+
+        if (!iter.hasNext())
+            return false;
+
+        row = iter.next();
+
+        // The iterator may hand out null, which is reported as "no row".
+        return row != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean previous() {
+        // Backward traversal is not supported and is not expected to be requested.
+        throw DbException.getUnsupportedException("previous");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
new file mode 100644
index 0000000..00cb06d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
@@ -0,0 +1,198 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.opt;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.indexing.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.h2.engine.*;
+import org.h2.index.*;
+import org.h2.message.*;
+import org.h2.result.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Index base.
+ */
+public abstract class GridH2IndexBase extends BaseIndex {
+ /** */
+ protected static final ThreadLocal<GridIndexingQueryFilter> filters = new ThreadLocal<>();
+
+ /** */
+ protected final int keyCol;
+
+ /** */
+ protected final int valCol;
+
+ /**
+ * @param keyCol Key column.
+ * @param valCol Value column.
+ */
+ protected GridH2IndexBase(int keyCol, int valCol) {
+ this.keyCol = keyCol;
+ this.valCol = valCol;
+ }
+
+ /**
+ * Sets key filters for current thread.
+ *
+ * @param fs Filters.
+ */
+ public static void setFiltersForThread(GridIndexingQueryFilter fs) {
+ filters.set(fs);
+ }
+
+ /**
+ * If the index supports rebuilding it has to creates its own copy.
+ *
+ * @return Rebuilt copy.
+ * @throws InterruptedException If interrupted.
+ */
+ public GridH2IndexBase rebuild() throws InterruptedException {
+ return this;
+ }
+
+ /**
+ * Put row if absent.
+ *
+ * @param row Row.
+ * @return Existing row or {@code null}.
+ */
+ public abstract GridH2Row put(GridH2Row row);
+
+ /**
+ * Remove row from index.
+ *
+ * @param row Row.
+ * @return Removed row.
+ */
+ public abstract GridH2Row remove(SearchRow row);
+
+ /**
+ * Takes or sets existing snapshot to be used in current thread.
+ *
+ * @param s Optional existing snapshot to use.
+ * @return Snapshot.
+ */
+ public Object takeSnapshot(@Nullable Object s) {
+ return s;
+ }
+
+ /**
+ * Releases snapshot for current thread.
+ */
+ public void releaseSnapshot() {
+ // No-op.
+ }
+
+ /**
+ * Filters rows from expired ones and using predicate.
+ *
+ * @param iter Iterator over rows.
+ * @return Filtered iterator.
+ */
+ protected Iterator<GridH2Row> filter(Iterator<GridH2Row> iter) {
+ IgniteBiPredicate<Object, Object> p = null;
+
+ GridIndexingQueryFilter f = filters.get();
+
+ if (f != null) {
+ String spaceName = ((GridH2Table)getTable()).spaceName();
+
+ p = f.forSpace(spaceName);
+ }
+
+ return new FilteringIterator(iter, U.currentTimeMillis(), p);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void add(Session ses, Row row) {
+ throw DbException.getUnsupportedException("add");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses, Row row) {
+ throw DbException.getUnsupportedException("remove row");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses) {
+ throw DbException.getUnsupportedException("remove index");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session ses) {
+ throw DbException.getUnsupportedException("truncate");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needRebuild() {
+ return false;
+ }
+
+ /**
+ * Iterator which filters by expiration time and predicate.
+ */
+ protected class FilteringIterator extends GridFilteredIterator<GridH2Row> {
+ /** */
+ private final IgniteBiPredicate<Object, Object> fltr;
+
+ /** */
+ private final long time;
+
+ /**
+ * @param iter Iterator.
+ * @param time Time for expired rows filtering.
+ */
+ protected FilteringIterator(Iterator<GridH2Row> iter, long time,
+ IgniteBiPredicate<Object, Object> fltr) {
+ super(iter);
+
+ this.time = time;
+ this.fltr = fltr;
+ }
+
+ /**
+ * @param row Row.
+ * @return If this row was accepted.
+ */
+ @SuppressWarnings("unchecked")
+ @Override protected boolean accept(GridH2Row row) {
+ if (row instanceof GridH2AbstractKeyValueRow) {
+ if (((GridH2AbstractKeyValueRow) row).expirationTime() <= time)
+ return false;
+ }
+
+ if (fltr == null)
+ return true;
+
+ Object key = row.getValue(keyCol).getObject();
+ Object val = row.getValue(valCol).getObject();
+
+ assert key != null;
+ assert val != null;
+
+ return fltr.apply(key, val);
+ }
+ }
+}