You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:32:55 UTC

[06/24] incubator-ignite git commit: ignite-545: merge from ignite-sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1bc3409..200da77 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.query.h2.opt.*;
 import org.apache.ignite.internal.processors.query.h2.sql.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
@@ -63,6 +65,7 @@ import java.io.*;
 import java.lang.reflect.*;
 import java.math.*;
 import java.sql.*;
+import java.sql.Date;
 import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -291,13 +294,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param tblToUpdate Table to update.
      * @throws IgniteCheckedException In case of error.
      */
-    private void removeKey(@Nullable String spaceName, Object key, TableDescriptor tblToUpdate)
+    private void removeKey(@Nullable String spaceName, CacheObject key, TableDescriptor tblToUpdate)
         throws IgniteCheckedException {
         try {
             Collection<TableDescriptor> tbls = tables(schema(spaceName));
 
+            Class<?> keyCls = getClass(objectContext(spaceName), key);
+
             for (TableDescriptor tbl : tbls) {
-                if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(key.getClass())) {
+                if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(keyCls)) {
                     if (tbl.tbl.update(key, null, 0, true)) {
                         if (tbl.luceneIdx != null)
                             tbl.luceneIdx.remove(key);
@@ -350,8 +355,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object k, Object v, byte[] ver,
-        long expirationTime) throws IgniteCheckedException {
+    @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject k, CacheObject v,
+        byte[] ver, long expirationTime) throws IgniteCheckedException {
         TableDescriptor tbl = tableDescriptor(spaceName, type);
 
         removeKey(spaceName, k, tbl);
@@ -368,14 +373,63 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             tbl.luceneIdx.store(k, v, ver, expirationTime);
     }
 
+    /**
+     * @param o Object.
+     * @return {@code true} If it is a portable object.
+     */
+    private boolean isPortable(CacheObject o) {
+        if (ctx == null)
+            return false;
+
+        return ctx.cacheObjects().isPortableObject(o);
+    }
+
+    /**
+     * @param coctx Cache object context.
+     * @param o Object.
+     * @return Object class.
+     */
+    private Class<?> getClass(CacheObjectContext coctx, CacheObject o) {
+        return isPortable(o) ?
+            Object.class :
+            o.value(coctx, false).getClass();
+    }
+
+    /**
+     * @param space Space.
+     * @return Cache object context.
+     */
+    private CacheObjectContext objectContext(String space) {
+        if (ctx == null)
+            return null;
+
+        return ctx.cache().internalCache(space).context().cacheObjectContext();
+    }
+
+    /**
+     * @param space Space.
+     * @return Cache context.
+     */
+    private GridCacheContext cacheContext(String space) {
+        if (ctx == null)
+            return null;
+
+        return ctx.cache().internalCache(space).context();
+    }
+
     /** {@inheritDoc} */
-    @Override public void remove(@Nullable String spaceName, Object key, Object val) throws IgniteCheckedException {
+    @Override public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException {
         if (log.isDebugEnabled())
-            log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']');
+            log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']');
+
+        CacheObjectContext coctx = objectContext(spaceName);
+
+        Class<?> keyCls = getClass(coctx, key);
+        Class<?> valCls = val == null ? null : getClass(coctx, val);
 
         for (TableDescriptor tbl : tables(schema(spaceName))) {
-            if (tbl.type().keyClass().isAssignableFrom(key.getClass())
-                && (val == null || tbl.type().valueClass().isAssignableFrom(val.getClass()))) {
+            if (tbl.type().keyClass().isAssignableFrom(keyCls)
+                && (val == null || tbl.type().valueClass().isAssignableFrom(valCls))) {
                 if (tbl.tbl.update(key, val, 0, true)) {
                     if (tbl.luceneIdx != null)
                         tbl.luceneIdx.remove(key);
@@ -387,14 +441,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException {
+    @Override public void onSwap(@Nullable String spaceName, CacheObject key) throws IgniteCheckedException {
         Schema schema = schemas.get(schema(spaceName));
 
         if (schema == null)
             return;
 
+        Class<?> keyCls = getClass(objectContext(spaceName), key);
+
         for (TableDescriptor tbl : schema.tbls.values()) {
-            if (tbl.type().keyClass().isAssignableFrom(key.getClass())) {
+            if (tbl.type().keyClass().isAssignableFrom(keyCls)) {
                 try {
                     if (tbl.tbl.onSwap(key))
                         return;
@@ -407,13 +463,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
+    @Override public void onUnswap(@Nullable String spaceName, CacheObject key, CacheObject val)
         throws IgniteCheckedException {
         assert val != null;
 
+        CacheObjectContext coctx = objectContext(spaceName);
+
+        Class<?> keyCls = getClass(coctx, key);
+        Class<?> valCls = getClass(coctx, val);
+
         for (TableDescriptor tbl : tables(schema(spaceName))) {
-            if (tbl.type().keyClass().isAssignableFrom(key.getClass())
-                && tbl.type().valueClass().isAssignableFrom(val.getClass())) {
+            if (tbl.type().keyClass().isAssignableFrom(keyCls)
+                && tbl.type().valueClass().isAssignableFrom(valCls)) {
                 try {
                     if (tbl.tbl.onUnswap(key, val))
                         return;
@@ -514,7 +575,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     meta = meta(rs.getMetaData());
                 }
                 catch (SQLException e) {
-                    throw new IgniteSpiException("Failed to get meta data.", e);
+                    throw new IgniteCheckedException("Failed to get meta data.", e);
                 }
             }
 
@@ -1114,6 +1175,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             log.debug("Starting cache query index...");
 
         System.setProperty("h2.serializeJavaObject", "false");
+        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
 
         if (SysProperties.serializeJavaObject) {
             U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1259,6 +1321,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw new IgniteCheckedException("Cache already registered: " + U.maskName(ccfg.getName()));
 
         createSchema(schema);
+
+        executeStatement(schema, "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
+            " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
+
         createSqlFunctions(schema, ccfg.getSqlFunctionClasses());
     }
 
@@ -1297,6 +1363,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         };
     }
 
+    /** {@inheritDoc} */
+    @Override public MessageFactory messageFactory() {
+        return new GridH2ValueMessageFactory();
+    }
+
     /**
      * Wrapper to store connection and flag is schema set or not.
      */
@@ -1341,6 +1412,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isSqlType(Class<?> cls) {
+        switch (DBTypeEnum.fromClass(cls)) {
+            case OTHER:
+            case ARRAY:
+                return false;
+
+            default:
+                return true;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isGeometryClass(Class<?> cls) {
+        return DataType.isGeometryClass(cls);
+    }
+
     /**
      * Enum that helps to map java types to database types.
      */
@@ -1551,7 +1639,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             if (type().valueClass() == String.class) {
                 try {
-                    luceneIdx = new GridLuceneIndex(ctx, marshaller, schema.offheap, schema.spaceName, type, true);
+                    luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.spaceName, type);
                 }
                 catch (IgniteCheckedException e1) {
                     throw new IgniteException(e1);
@@ -1564,7 +1652,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 if (idx.type() == FULLTEXT) {
                     try {
-                        luceneIdx = new GridLuceneIndex(ctx, marshaller, schema.offheap, schema.spaceName, type, true);
+                        luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.spaceName, type);
                     }
                     catch (IgniteCheckedException e1) {
                         throw new IgniteException(e1);
@@ -1907,7 +1995,75 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         /** {@inheritDoc} */
-        @Override public GridH2Row createRow(Object key, @Nullable Object val, long expirationTime)
+        @Override public Value wrap(Object obj, int type) throws IgniteCheckedException {
+            assert obj != null;
+
+            if (obj instanceof CacheObject) { // Handle cache object.
+                CacheObject co = (CacheObject)obj;
+
+                if (type == Value.JAVA_OBJECT)
+                    return new GridH2ValueCacheObject(cacheContext(schema.spaceName), co);
+
+                obj = co.value(objectContext(schema.spaceName), false);
+            }
+
+            switch (type) {
+                case Value.BOOLEAN:
+                    return ValueBoolean.get((Boolean)obj);
+                case Value.BYTE:
+                    return ValueByte.get((Byte)obj);
+                case Value.SHORT:
+                    return ValueShort.get((Short)obj);
+                case Value.INT:
+                    return ValueInt.get((Integer)obj);
+                case Value.FLOAT:
+                    return ValueFloat.get((Float)obj);
+                case Value.LONG:
+                    return ValueLong.get((Long)obj);
+                case Value.DOUBLE:
+                    return ValueDouble.get((Double)obj);
+                case Value.UUID:
+                    UUID uuid = (UUID)obj;
+                    return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+                case Value.DATE:
+                    return ValueDate.get((Date)obj);
+                case Value.TIME:
+                    return ValueTime.get((Time)obj);
+                case Value.TIMESTAMP:
+                    if (obj instanceof java.util.Date && !(obj instanceof Timestamp))
+                        obj = new Timestamp(((java.util.Date) obj).getTime());
+
+                    return GridH2Utils.toValueTimestamp((Timestamp)obj);
+                case Value.DECIMAL:
+                    return ValueDecimal.get((BigDecimal)obj);
+                case Value.STRING:
+                    return ValueString.get(obj.toString());
+                case Value.BYTES:
+                    return ValueBytes.get((byte[])obj);
+                case Value.JAVA_OBJECT:
+                    return ValueJavaObject.getNoCopy(obj, null, null);
+                case Value.ARRAY:
+                    Object[] arr = (Object[])obj;
+
+                    Value[] valArr = new Value[arr.length];
+
+                    for (int i = 0; i < arr.length; i++) {
+                        Object o = arr[i];
+
+                        valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass()));
+                    }
+
+                    return ValueArray.get(valArr);
+
+                case Value.GEOMETRY:
+                    return ValueGeometry.getFromGeometry(obj);
+            }
+
+            throw new IgniteCheckedException("Failed to wrap value[type=" + type + ", value=" + obj + "]");
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridH2Row createRow(CacheObject key, @Nullable CacheObject val, long expirationTime)
             throws IgniteCheckedException {
             try {
                 if (val == null) // Only can happen for remove operation, can create simple search row.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a0809a..6e95710 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -20,17 +20,14 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.*;
 import org.h2.message.*;
 import org.h2.result.*;
 import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
 import java.lang.ref.*;
-import java.math.*;
-import java.sql.Date;
 import java.sql.*;
-import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Table row implementation based on {@link GridQueryTypeDescriptor}.
@@ -61,12 +58,16 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @param val Value.
      * @param valType Value type.
      * @param expirationTime Expiration time.
-     * @throws IgniteSpiException If failed.
+     * @throws IgniteCheckedException If failed.
      */
     protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val,
-        int valType, long expirationTime) throws IgniteSpiException {
-        super(wrap(key, keyType),
-            val == null ? null : wrap(val, valType)); // We remove by key only, so value can be null here.
+        int valType, long expirationTime) throws IgniteCheckedException {
+        super(null, null);
+
+        setValue(KEY_COL, desc.wrap(key, keyType));
+
+        if (val != null) // We remove by key only, so value can be null here.
+            setValue(VAL_COL, desc.wrap(val, valType));
 
         this.desc = desc;
         this.expirationTime = expirationTime;
@@ -84,72 +85,6 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     }
 
     /**
-     * Wraps object to respective {@link Value}.
-     *
-     * @param obj Object.
-     * @param type Value type.
-     * @return Value.
-     * @throws IgniteSpiException If failed.
-     */
-    public static Value wrap(Object obj, int type) throws IgniteSpiException {
-        assert obj != null;
-
-        switch (type) {
-            case Value.BOOLEAN:
-                return ValueBoolean.get((Boolean)obj);
-            case Value.BYTE:
-                return ValueByte.get((Byte)obj);
-            case Value.SHORT:
-                return ValueShort.get((Short)obj);
-            case Value.INT:
-                return ValueInt.get((Integer)obj);
-            case Value.FLOAT:
-                return ValueFloat.get((Float)obj);
-            case Value.LONG:
-                return ValueLong.get((Long)obj);
-            case Value.DOUBLE:
-                return ValueDouble.get((Double)obj);
-            case Value.UUID:
-                UUID uuid = (UUID)obj;
-                return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-            case Value.DATE:
-                return ValueDate.get((Date)obj);
-            case Value.TIME:
-                return ValueTime.get((Time)obj);
-            case Value.TIMESTAMP:
-                if (obj instanceof java.util.Date && !(obj instanceof Timestamp))
-                    obj = new Timestamp(((java.util.Date) obj).getTime());
-
-                return GridH2Utils.toValueTimestamp((Timestamp)obj);
-            case Value.DECIMAL:
-                return ValueDecimal.get((BigDecimal)obj);
-            case Value.STRING:
-                return ValueString.get(obj.toString());
-            case Value.BYTES:
-                return ValueBytes.get((byte[])obj);
-            case Value.JAVA_OBJECT:
-                return ValueJavaObject.getNoCopy(obj, null, null);
-            case Value.ARRAY:
-                Object[] arr = (Object[])obj;
-
-                Value[] valArr = new Value[arr.length];
-
-                for (int i = 0; i < arr.length; i++) {
-                    Object o = arr[i];
-
-                    valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass()));
-                }
-
-                return ValueArray.get(valArr);
-
-            case Value.GEOMETRY:
-                return ValueGeometry.getFromGeometry(obj);
-        }
-
-        throw new IgniteSpiException("Failed to wrap value[type=" + type + ", value=" + obj + "]");
-    }
-
-    /**
      * @return Expiration time of respective cache entry.
      */
     public long expirationTime() {
@@ -164,7 +99,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     /**
      * Should be called to remove reference on value.
      *
-     * @throws IgniteSpiException If failed.
+     * @throws IgniteCheckedException If failed.
      */
     public synchronized void onSwap() throws IgniteCheckedException {
         setValue(VAL_COL, null);
@@ -178,7 +113,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @throws IgniteCheckedException If failed.
      */
     public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
-        setValue(VAL_COL, wrap(val, desc.valueType()));
+        setValue(VAL_COL, desc.wrap(val, desc.valueType()));
 
         notifyAll();
     }
@@ -203,20 +138,27 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     }
 
     /**
-     * @param attempt Attempt.
+     * @param waitTime Time to wait for value unswap, in milliseconds.
      * @return Synchronized value.
      */
-    protected synchronized Value syncValue(int attempt) {
+    protected synchronized Value syncValue(long waitTime) {
         Value v = peekValue(VAL_COL);
 
-        if (v == null && attempt != 0) {
+        while (v == null && waitTime > 0) {
+            long start = System.nanoTime(); // This call must be quite rare, so performance is not a concern.
+
             try {
-                wait(attempt);
+                wait(waitTime); // Wait for value arrival to allow other threads to make a progress.
             }
             catch (InterruptedException e) {
                 throw new IgniteInterruptedException(e);
             }
 
+            long t = System.nanoTime() - start;
+
+            if (t > 0)
+                waitTime -= TimeUnit.NANOSECONDS.toMillis(t);
+
             v = peekValue(VAL_COL);
         }
 
@@ -258,7 +200,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
                         Object valObj = desc.readFromSwap(k);
 
                         if (valObj != null) {
-                            Value upd = wrap(valObj, desc.valueType());
+                            Value upd = desc.wrap(valObj, desc.valueType());
 
                             v = updateWeakValue(upd);
 
@@ -277,7 +219,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
 
                     if (start == 0)
                         start = U.currentTimeMillis();
-                    else if (U.currentTimeMillis() - start > 15_000) // Loop for at most 15 seconds.
+                    else if (U.currentTimeMillis() - start > 60_000) // Loop for at most 60 seconds.
                         throw new IgniteException("Failed to get value for key: " + k +
                             ". This can happen due to a long GC pause.");
                 }
@@ -317,9 +259,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
             return ValueNull.INSTANCE;
 
         try {
-            return wrap(res, desc.fieldType(col));
+            return desc.wrap(res, desc.fieldType(col));
         }
-        catch (IgniteSpiException e) {
+        catch (IgniteCheckedException e) {
             throw DbException.convert(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index 9c2c1b2..f89591a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.*;
 import org.h2.store.*;
 import org.h2.value.*;
 import org.jetbrains.annotations.*;
@@ -87,10 +86,10 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
      * @param val Value.
      * @param valType Value type.
      * @param expirationTime Expiration time.
-     * @throws IgniteSpiException If failed.
+     * @throws IgniteCheckedException If failed.
      */
     public GridH2KeyValueRowOffheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType,
-        long expirationTime) throws IgniteSpiException {
+        long expirationTime) throws IgniteCheckedException {
         super(desc, key, keyType, val, valType, expirationTime);
     }
 
@@ -247,7 +246,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
             Value v = peekValue(VAL_COL);
 
             if (v == null) {
-                setValue(VAL_COL, wrap(val, desc.valueType()));
+                setValue(VAL_COL, desc.wrap(val, desc.valueType()));
 
                 v = peekValue(VAL_COL);
             }
@@ -273,8 +272,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
     }
 
     /** {@inheritDoc} */
-    @Override protected Value syncValue(int attempt) {
-        Value v = super.syncValue(attempt);
+    @Override protected Value syncValue(long waitTime) {
+        Value v = super.syncValue(waitTime);
 
         if (v != null)
             return v;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
index e998d9b..ee7c79e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 
-import org.apache.ignite.spi.*;
+import org.apache.ignite.*;
 import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
@@ -35,10 +35,10 @@ public class GridH2KeyValueRowOnheap extends GridH2AbstractKeyValueRow {
      * @param val Value.
      * @param valType Value type.
      * @param expirationTime Expiration time.
-     * @throws IgniteSpiException If failed.
+     * @throws IgniteCheckedException If failed.
      */
     public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType,
-        long expirationTime) throws IgniteSpiException {
+        long expirationTime) throws IgniteCheckedException {
         super(desc, key, keyType, val, valType, expirationTime);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index c5f9551..cd65ab3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -40,7 +42,7 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
      * @return Row.
      * @throws IgniteCheckedException If failed.
      */
-    public GridH2Row createRow(Object key, @Nullable Object val, long expirationTime)
+    public GridH2Row createRow(CacheObject key, @Nullable CacheObject val, long expirationTime)
         throws IgniteCheckedException;
 
     /**
@@ -97,4 +99,14 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
      * @return Guard.
      */
     public GridUnsafeGuard guard();
+
+    /**
+     * Wraps object to respective {@link Value}.
+     *
+     * @param o Object.
+     * @param type Value type.
+     * @return Value.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Value wrap(Object o, int type) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 62b3b5e..92991af 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.h2.api.*;
 import org.h2.command.ddl.*;
@@ -105,7 +106,7 @@ public class GridH2Table extends TableBase {
      * @return {@code true} If row was found.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean onSwap(Object key) throws IgniteCheckedException {
+    public boolean onSwap(CacheObject key) throws IgniteCheckedException {
         return onSwapUnswap(key, null);
     }
 
@@ -117,7 +118,7 @@ public class GridH2Table extends TableBase {
      * @return {@code true} If row was found.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean onUnswap(Object key, Object val) throws IgniteCheckedException {
+    public boolean onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
         assert val != null : "Key=" + key;
 
         return onSwapUnswap(key, val);
@@ -132,7 +133,7 @@ public class GridH2Table extends TableBase {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
-    private boolean onSwapUnswap(Object key, @Nullable Object val) throws IgniteCheckedException {
+    private boolean onSwapUnswap(CacheObject key, @Nullable CacheObject val) throws IgniteCheckedException {
         assert key != null;
 
         GridH2TreeIndex pk = pk();
@@ -300,7 +301,8 @@ public class GridH2Table extends TableBase {
      * @return {@code true} If operation succeeded.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean update(Object key, Object val, long expirationTime, boolean rmv) throws IgniteCheckedException {
+    public boolean update(CacheObject key, CacheObject val, long expirationTime, boolean rmv)
+        throws IgniteCheckedException {
         assert desc != null;
 
         GridH2Row row = desc.createRow(key, val, expirationTime);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
new file mode 100644
index 0000000..0a37674
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.*;
+import org.h2.message.*;
+import org.h2.util.*;
+import org.h2.value.*;
+
+import java.sql.*;
+
+/**
+ * H2 Value over {@link CacheObject}. Replacement for {@link ValueJavaObject}.
+ */
+public class GridH2ValueCacheObject extends Value {
+    /** Wrapped cache object (asserted to be non-null in the constructor). */
+    private CacheObject obj;
+
+    /** Cache context used to obtain the cache object context; may be {@code null} in tests. */
+    private GridCacheContext<?,?> cctx;
+
+    /**
+     * @param cctx Cache context, allowed to be {@code null} in tests.
+     * @param obj Cache object to wrap, must not be {@code null}.
+     */
+    public GridH2ValueCacheObject(GridCacheContext<?,?> cctx, CacheObject obj) {
+        assert obj != null;
+
+        this.obj = obj;
+        this.cctx = cctx; // Allowed to be null in tests.
+    }
+
+    /**
+     * @return Wrapped cache object.
+     */
+    public CacheObject getCacheObject() {
+        return obj;
+    }
+
+    /**
+     * @return Cache context, possibly {@code null}.
+     */
+    public GridCacheContext<?,?> getCacheContext() {
+        return cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSQL() {
+        throw new UnsupportedOperationException(); // Not supported for cache object values.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getType() {
+        return Value.JAVA_OBJECT;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPrecision() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getDisplaySize() {
+        return 64;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getString() {
+        return getObject().toString(); // String form of the unwrapped object.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] getBytes() {
+        return Utils.cloneByteArray(getBytesNoCopy());
+    }
+
+    /**
+     * @return Cache object context or {@code null} if the cache context is not set.
+     */
+    private CacheObjectContext objectContext() {
+        return cctx == null ? null : cctx.cacheObjectContext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] getBytesNoCopy() {
+        if (obj.type() == CacheObject.TYPE_REGULAR) {
+            // Result must be the same as `marshaller.marshall(obj.value(coctx, false));`
+            try {
+                return obj.valueBytes(objectContext());
+            }
+            catch (IgniteCheckedException e) {
+                throw DbException.convert(e);
+            }
+        }
+
+        // For portables and byte array cache object types.
+        return Utils.serialize(obj.value(objectContext(), false), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getObject() {
+        return obj.value(objectContext(), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void set(PreparedStatement prep, int parameterIndex) throws SQLException {
+        prep.setObject(parameterIndex, getObject(), Types.JAVA_OBJECT);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected int compareSecure(Value v, CompareMode mode) {
+        Object o1 = getObject();
+        Object o2 = v.getObject();
+
+        boolean o1Comparable = o1 instanceof Comparable;
+        boolean o2Comparable = o2 instanceof Comparable;
+
+        if (o1Comparable && o2Comparable &&
+            Utils.haveCommonComparableSuperclass(o1.getClass(), o2.getClass())) {
+            Comparable<Object> c1 = (Comparable<Object>)o1;
+
+            return c1.compareTo(o2); // Natural ordering of the unwrapped objects.
+        }
+
+        // Group by types: comparable objects go before non-comparable ones, then order by class name.
+        if (o1.getClass() != o2.getClass()) {
+            if (o1Comparable != o2Comparable)
+                return o1Comparable ? -1 : 1;
+
+            return o1.getClass().getName().compareTo(o2.getClass().getName());
+        }
+
+        // Same class here: compare hash codes, on equal hashes of non-equal objects compare bytes.
+        int h1 = hashCode();
+        int h2 = v.hashCode();
+
+        if (h1 == h2) {
+            if (o1.equals(o2))
+                return 0;
+
+            return Utils.compareNotNullSigned(getBytesNoCopy(), v.getBytesNoCopy());
+        }
+
+        return h1 > h2 ? 1 : -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return getObject().hashCode(); // Delegates to the unwrapped object.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object other) {
+        if (!(other instanceof Value))
+            return false;
+
+        Value otherVal = (Value)other;
+
+        return otherVal.getType() == Value.JAVA_OBJECT
+            && getObject().equals(otherVal.getObject());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Value convertPrecision(long precision, boolean force) {
+        return this; // Arguments are ignored, the value is returned as is.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMemory() {
+        return 0;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index c6d597f..f2f11be 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
 import org.apache.ignite.spi.indexing.*;
 import org.apache.lucene.analysis.standard.*;
 import org.apache.lucene.document.*;
@@ -33,6 +34,7 @@ import org.apache.lucene.index.*;
 import org.apache.lucene.queryParser.*;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.*;
+import org.h2.util.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -55,9 +57,6 @@ public class GridLuceneIndex implements Closeable {
     public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
 
     /** */
-    private final Marshaller marshaller;
-
-    /** */
     private final String spaceName;
 
     /** */
@@ -70,9 +69,6 @@ public class GridLuceneIndex implements Closeable {
     private final String[] idxdFields;
 
     /** */
-    private final boolean storeVal;
-
-    /** */
     private final AtomicLong updateCntr = new GridAtomicLong();
 
     /** */
@@ -85,20 +81,16 @@ public class GridLuceneIndex implements Closeable {
      * Constructor.
      *
      * @param ctx Kernal context.
-     * @param marshaller Indexing marshaller.
      * @param mem Unsafe memory.
      * @param spaceName Space name.
      * @param type Type descriptor.
-     * @param storeVal Store value in index.
      * @throws IgniteCheckedException If failed.
      */
-    public GridLuceneIndex(GridKernalContext ctx, Marshaller marshaller, @Nullable GridUnsafeMemory mem,
-        @Nullable String spaceName, GridQueryTypeDescriptor type, boolean storeVal) throws IgniteCheckedException {
+    public GridLuceneIndex(GridKernalContext ctx, @Nullable GridUnsafeMemory mem,
+        @Nullable String spaceName, GridQueryTypeDescriptor type) throws IgniteCheckedException {
         this.ctx = ctx;
-        this.marshaller = marshaller;
         this.spaceName = spaceName;
         this.type = type;
-        this.storeVal = storeVal;
 
         dir = new GridLuceneDirectory(mem == null ? new GridUnsafeMemory(0) : mem);
 
@@ -137,15 +129,30 @@ public class GridLuceneIndex implements Closeable {
     }
 
     /**
+     * @return Cache object context or {@code null} if the kernal context is not set.
+     */
+    private CacheObjectContext objectContext() {
+        if (ctx == null)
+            return null;
+
+        return ctx.cache().internalCache(spaceName).context().cacheObjectContext();
+    }
+
+    /**
      * Stores given data in this fulltext index.
      *
-     * @param key Key.
-     * @param val Value.
+     * @param k Key.
+     * @param v Value.
      * @param ver Version.
      * @param expires Expiration time.
      * @throws IgniteCheckedException If failed.
      */
-    public void store(Object key, Object val, byte[] ver, long expires) throws IgniteCheckedException {
+    public void store(CacheObject k, CacheObject v, byte[] ver, long expires) throws IgniteCheckedException {
+        CacheObjectContext coctx = objectContext();
+
+        Object key = k.value(coctx, false);
+        Object val = v.value(coctx, false);
+
         Document doc = new Document();
 
         boolean stringsFound = false;
@@ -166,7 +173,7 @@ public class GridLuceneIndex implements Closeable {
             }
         }
 
-        String keyStr = org.apache.commons.codec.binary.Base64.encodeBase64String(marshaller.marshal(key));
+        String keyStr = org.apache.commons.codec.binary.Base64.encodeBase64String(k.valueBytes(coctx));
 
         try {
             // Delete first to avoid duplicates.
@@ -177,8 +184,8 @@ public class GridLuceneIndex implements Closeable {
 
             doc.add(new Field(KEY_FIELD_NAME, keyStr, Field.Store.YES, Field.Index.NOT_ANALYZED));
 
-            if (storeVal && type.valueClass() != String.class)
-                doc.add(new Field(VAL_FIELD_NAME, marshaller.marshal(val)));
+            if (type.valueClass() != String.class)
+                doc.add(new Field(VAL_FIELD_NAME, v.valueBytes(coctx)));
 
             doc.add(new Field(VER_FIELD_NAME, ver));
 
@@ -201,9 +208,10 @@ public class GridLuceneIndex implements Closeable {
      * @param key Key.
      * @throws IgniteCheckedException If failed.
      */
-    public void remove(Object key) throws IgniteCheckedException {
+    public void remove(CacheObject key) throws IgniteCheckedException {
         try {
-            writer.deleteDocuments(new Term(KEY_FIELD_NAME, org.apache.commons.codec.binary.Base64.encodeBase64String(marshaller.marshal(key))));
+            writer.deleteDocuments(new Term(KEY_FIELD_NAME,
+                org.apache.commons.codec.binary.Base64.encodeBase64String(key.valueBytes(objectContext()))));
         }
         catch (IOException e) {
             throw new IgniteCheckedException(e);
@@ -297,6 +305,9 @@ public class GridLuceneIndex implements Closeable {
         /** */
         private IgniteBiTuple<K, V> curr;
 
+        /** */
+        private CacheObjectContext coctx;
+
         /**
          * Constructor.
          *
@@ -313,6 +324,8 @@ public class GridLuceneIndex implements Closeable {
             this.docs = docs;
             this.filters = filters;
 
+            coctx = objectContext();
+
             findNext();
         }
 
@@ -328,10 +341,25 @@ public class GridLuceneIndex implements Closeable {
         }
 
         /**
+         * @param bytes Marshalled object bytes.
+         * @param ldr Class loader (not used when the cache object context is {@code null}).
+         * @return Unmarshalled object.
+         * @throws IgniteCheckedException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
+            if (coctx == null) // For tests.
+                return (Z)Utils.deserialize(bytes, null);
+
+            return (Z)coctx.processor().unmarshal(coctx, bytes, ldr);
+        }
+
+        /**
          * Finds next element.
          *
          * @throws IgniteCheckedException If failed.
          */
+        @SuppressWarnings("unchecked")
         private void findNext() throws IgniteCheckedException {
             curr = null;
 
@@ -345,26 +373,22 @@ public class GridLuceneIndex implements Closeable {
                     throw new IgniteCheckedException(e);
                 }
 
-                String keyStr = doc.get(KEY_FIELD_NAME);
-
                 ClassLoader ldr = null;
 
                 if (ctx != null && ctx.deploy().enabled())
                     ldr = ctx.cache().internalCache(spaceName).context().deploy().globalLoader();
 
-                K k = marshaller.unmarshal(org.apache.commons.codec.binary.Base64.decodeBase64(keyStr), ldr);
+                K k = unmarshall(org.apache.commons.codec.binary.Base64.decodeBase64(doc.get(KEY_FIELD_NAME)), ldr);
 
-                byte[] valBytes = doc.getBinaryValue(VAL_FIELD_NAME);
+                V v = type.valueClass() == String.class ?
+                    (V)doc.get(VAL_STR_FIELD_NAME) :
+                    this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME), ldr);
 
-                V v = valBytes != null ? marshaller.<V>unmarshal(valBytes, ldr) :
-                    type.valueClass() == String.class ?
-                    (V)doc.get(VAL_STR_FIELD_NAME): null;
+                assert v != null;
 
                 if (!filter(k, v))
                     continue;
 
-//                byte[] ver = doc.getBinaryValue(VER_FIELD_NAME); TODO rm version
-
                 curr = new IgniteBiTuple<>(k, v);
 
                 break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
index 7aefbec..a071e73 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
@@ -131,7 +131,7 @@ public enum GridSqlOperationType {
         @Override public String getSql(GridSqlOperation operation) {
             assert operation.opType().childrenCnt == 2;
 
-            return "(INTERSECTS(" + operation.child(0) + ", " + operation.child(1) + "))";
+            return "(INTERSECTS(" + operation.child(0).getSQL() + ", " + operation.child(1).getSQL() + "))";
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
index c40632f..207588e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
@@ -37,6 +37,26 @@ public abstract class GridSqlQuery implements Cloneable {
     /** */
     protected GridSqlElement limit;
 
+    /** EXPLAIN flag of this query. */
+    private boolean explain;
+
+    /**
+     * @param explain {@code true} to mark this query as EXPLAIN.
+     * @return {@code this}.
+     */
+    public GridSqlQuery explain(boolean explain) {
+        this.explain = explain;
+
+        return this;
+    }
+
+    /**
+     * @return {@code true} If this query is marked as EXPLAIN.
+     */
+    public boolean explain() {
+        return explain;
+    }
+
     /**
      * @return Offset.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 62ec72f..74e4748 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -158,6 +158,9 @@ public class GridSqlQueryParser {
     private static final Getter<SelectUnion, SortOrder> UNION_SORT = getter(SelectUnion.class, "sort");
 
     /** */
+    private static final Getter<Explain, Prepared> EXPLAIN_COMMAND = getter(Explain.class, "command");
+
+    /** */
     private static volatile Getter<Command,Prepared> prepared;
 
     /** */
@@ -182,7 +185,7 @@ public class GridSqlQueryParser {
 
         Prepared statement = p.get(cmd);
 
-        return new GridSqlQueryParser().parse((Query)statement);
+        return new GridSqlQueryParser().parse(statement);
     }
 
     /**
@@ -321,13 +324,16 @@ public class GridSqlQueryParser {
     /**
      * @param qry Select.
      */
-    public GridSqlQuery parse(Query qry) {
+    public GridSqlQuery parse(Prepared qry) {
         if (qry instanceof Select)
             return parse((Select)qry);
 
         if (qry instanceof SelectUnion)
             return parse((SelectUnion)qry);
 
+        if (qry instanceof Explain)
+            return parse(EXPLAIN_COMMAND.get((Explain)qry)).explain(true);
+
         throw new UnsupportedOperationException("Unknown query type: " + qry);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 73d7751..b1d8913 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -78,7 +78,7 @@ public class GridSqlQuerySplitter {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
 
-        GridSqlQuery qry0 = GridSqlQueryParser.parse(stmt);
+        final GridSqlQuery qry0 = GridSqlQueryParser.parse(stmt);
 
         GridSqlSelect srcQry;
 
@@ -87,6 +87,8 @@ public class GridSqlQuerySplitter {
         else { // Handle UNION.
             srcQry = new GridSqlSelect().from(new GridSqlSubquery(qry0));
 
+            srcQry.explain(qry0.explain());
+
             GridSqlSelect left = leftest(qry0);
 
             int c = 0;
@@ -127,7 +129,10 @@ public class GridSqlQuerySplitter {
 
         // Create map and reduce queries.
         GridSqlSelect mapQry = srcQry.clone();
-        GridSqlSelect rdcQry = new GridSqlSelect().from(new GridSqlFunction("PUBLIC", TABLE_FUNC_NAME)); // table(mergeTable)); TODO
+
+        mapQry.explain(false);
+
+        GridSqlSelect rdcQry = new GridSqlSelect().from(new GridSqlFunction(null, TABLE_FUNC_NAME)); // table(mergeTable)); TODO
 
         // Split all select expressions into map-reduce parts.
         List<GridSqlElement> mapExps = F.addAll(
@@ -213,6 +218,8 @@ public class GridSqlQuerySplitter {
         res.addMapQuery(mergeTable, mapQry.getSQL(),
             findParams(mapQry, params, new ArrayList<>(params.length)).toArray());
 
+        res.explain(qry0.explain());
+
         return res;
     }
 
@@ -281,6 +288,10 @@ public class GridSqlQuerySplitter {
             while (target.size() < idx)
                 target.add(null);
 
+            if (params.length <= idx)
+                throw new IgniteException("Invalid number of query parameters. " +
+                    "Cannot find " + idx + " parameter.");
+
             Object param = params[idx];
 
             if (idx == target.size())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
index 0786eac..9972bba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
@@ -68,7 +68,7 @@ public class GridSqlSelect extends GridSqlQuery {
 
     /** {@inheritDoc} */
     @Override public String getSQL() {
-        StatementBuilder buff = new StatementBuilder("SELECT");
+        StatementBuilder buff = new StatementBuilder(explain() ? "EXPLAIN SELECT" : "SELECT");
 
         if (distinct)
             buff.append(" DISTINCT");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
index 23c18ed..96beb6b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
@@ -47,7 +47,7 @@ public class GridSqlUnion extends GridSqlQuery {
 
     /** {@inheritDoc} */
     @Override public String getSQL() {
-        StatementBuilder buff = new StatementBuilder();
+        StatementBuilder buff = new StatementBuilder(explain() ? "EXPLAIN \n" : "");
 
         buff.append('(').append(left.getSQL()).append(')');
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 918a541..f15a2da 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -30,6 +30,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.h2.jdbc.*;
 import org.h2.result.*;
 import org.h2.store.*;
@@ -44,6 +46,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
 
 /**
  * Map query executor.
@@ -138,6 +141,9 @@ public class GridMapQueryExecutor {
 
             ClusterNode node = ctx.discovery().node(nodeId);
 
+            if (node == null)
+                return; // Node left, ignore.
+
             boolean processed = true;
 
             if (msg instanceof GridQueryRequest)
@@ -203,7 +209,14 @@ public class GridMapQueryExecutor {
         Collection<GridCacheSqlQuery> qrys;
 
         try {
-            qrys = req.queries(ctx.config().getMarshaller());
+            qrys = req.queries();
+
+            if (!node.isLocal()) {
+                Marshaller m = ctx.config().getMarshaller();
+
+                for (GridCacheSqlQuery qry : qrys)
+                    qry.unmarshallParams(m);
+            }
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -344,7 +357,10 @@ public class GridMapQueryExecutor {
             boolean loc = node.isLocal();
 
             GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
-                page == 0 ? res.rowCount : -1, loc ? null : marshallRows(rows), loc ? rows : null);
+                page == 0 ? res.rowCount : -1 ,
+                res.cols,
+                loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
+                loc ? rows : null);
 
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
@@ -510,6 +526,9 @@ public class GridMapQueryExecutor {
         private final UUID qrySrcNodeId;
 
         /** */
+        private final int cols;
+
+        /** */
         private int page;
 
         /** */
@@ -538,6 +557,7 @@ public class GridMapQueryExecutor {
             }
 
             rowCount = res.getRowCount();
+            cols = res.getVisibleColumnCount();
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 407341e..9136821 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -96,14 +96,14 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param nodeId Node ID.
      */
     public void fail(UUID nodeId) {
-        addPage0(new GridResultPage(nodeId, null, false));
+        addPage0(new GridResultPage(null, nodeId, null, false));
     }
 
     /**
      * @param page Page.
      */
     public final void addPage(GridResultPage page) {
-        int pageRowsCnt = page.rows().size();
+        int pageRowsCnt = page.rowsInPage();
 
         if (pageRowsCnt != 0)
             addPage0(page);
@@ -137,7 +137,7 @@ public abstract class GridMergeIndex extends BaseIndex {
             if (last)
                 last = lastSubmitted.compareAndSet(false, true);
 
-            addPage0(new GridResultPage(page.source(), null, last));
+            addPage0(new GridResultPage(null, page.source(), null, last));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 93c9482..76a52e9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -44,7 +44,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
 
     /** {@inheritDoc} */
     @Override protected void addPage0(GridResultPage page) {
-        if (!page.rows().isEmpty() || page.isLast() || queue.isEmpty())
+        if (page.rowsInPage() != 0 || page.isLast() || queue.isEmpty())
             queue.add(page);
     }
 
@@ -75,7 +75,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
 
                     fetchNextPage(page);
 
-                    iter = page.rows().iterator();
+                    iter = page.rows();
                 }
 
                 return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 7f42e0d..50c30a5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -28,12 +28,13 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
-import org.apache.ignite.internal.processors.query.h2.sql.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.command.*;
 import org.h2.command.ddl.*;
 import org.h2.command.dml.Query;
 import org.h2.engine.*;
@@ -151,9 +152,6 @@ public class GridReduceQueryExecutor {
                 }
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
-
-        h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
-            " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
     }
 
     /**
@@ -166,6 +164,9 @@ public class GridReduceQueryExecutor {
 
             ClusterNode node = ctx.discovery().node(nodeId);
 
+            if (node == null)
+                return; // Node left, ignore.
+
             boolean processed = true;
 
             if (msg instanceof GridQueryNextPageResponse)
@@ -230,7 +231,7 @@ public class GridReduceQueryExecutor {
         GridResultPage page;
 
         try {
-            page = new GridResultPage(node.id(), msg, false) {
+            page = new GridResultPage(ctx, node.id(), msg, false) {
                 @Override public void fetchNextPage() {
                     if (r.rmtErr != null)
                         throw new CacheException("Next page fetch failed.", r.rmtErr);
@@ -279,15 +280,16 @@ public class GridReduceQueryExecutor {
 
         String space = cctx.name();
 
-        r.conn = h2.connectionForSpace(space);
+        r.conn = (JdbcConnection)h2.connectionForSpace(space);
 
-        // TODO Add topology version.
+        // TODO    Add topology version.
         ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
 
-        if (cctx.isReplicated()) {
-            assert dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
+        if (cctx.isReplicated() || qry.explain()) {
+            assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
 
-            dataNodes = dataNodes.forRandom(); // Select random data node to run query on a replicated data.
+            // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+            dataNodes = dataNodes.forRandom();
         }
 
         final Collection<ClusterNode> nodes = dataNodes.nodes();
@@ -296,7 +298,7 @@ public class GridReduceQueryExecutor {
             GridMergeTable tbl;
 
             try {
-                tbl = createFunctionTable((JdbcConnection)r.conn, mapQry); // createTable(r.conn, mapQry); TODO
+                tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -317,14 +319,32 @@ public class GridReduceQueryExecutor {
         runs.put(qryReqId, r);
 
         try {
-            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries(),
-                ctx.config().getMarshaller().marshal(qry.mapQueries())));
+            Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+
+            if (qry.explain()) {
+                mapQrys = new ArrayList<>(qry.mapQueries().size());
+
+                for (GridCacheSqlQuery mapQry : qry.mapQueries())
+                    mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+            }
+
+            if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
+                Marshaller m = ctx.config().getMarshaller();
+
+                for (GridCacheSqlQuery mapQry : mapQrys)
+                    mapQry.marshallParams(m);
+            }
+
+            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
 
             r.latch.await();
 
             if (r.rmtErr != null)
                 throw new CacheException("Failed to run map query remotely.", r.rmtErr);
 
+            if (qry.explain())
+                return explainPlan(r.conn, space, qry);
+
             GridCacheSqlQuery rdc = qry.reduceQuery();
 
             final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
@@ -355,6 +375,55 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param c Connection on which the plan queries are executed.
+     * @param space Space name passed to the H2 query executor.
+     * @param qry Two-step query whose map and reduce query plans are collected.
+     * @return Cursor over one row per map query plan followed by one row with the reduce query plan.
+     * @throws IgniteCheckedException If failed.
+     */
+    private QueryCursor<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+        throws IgniteCheckedException {
+        List<List<?>> lists = new ArrayList<>();
+
+        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+            ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + mapQry.alias(), null);
+
+            lists.add(F.asList(getPlan(rs)));
+        }
+
+        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+            GridMergeTable tbl = createFunctionTable(c, mapQry, false);
+
+            curFunTbl.set(tbl); // Now it will be only a single table.
+        }
+
+        GridCacheSqlQuery rdc = qry.reduceQuery();
+
+        ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "EXPLAIN " + rdc.query(), F.asList(rdc.parameters()));
+
+        lists.add(F.asList(getPlan(rs)));
+
+        return new QueryCursorImpl<>(lists.iterator());
+    }
+
+    /**
+     * @param rs Result set; it is advanced to its next row by this call.
+     * @return Plan text read from the first column of that row.
+     * @throws IgniteCheckedException If reading the result set fails with {@link SQLException}.
+     */
+    private String getPlan(ResultSet rs) throws IgniteCheckedException {
+        try {
+            if (!rs.next())
+                throw new IllegalStateException(); // No row to read the plan from.
+
+            return rs.getString(1);
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
      * @param nodes Nodes.
      * @param msg Message.
      * @throws IgniteCheckedException If failed.
@@ -495,10 +564,12 @@ public class GridReduceQueryExecutor {
     /**
      * @param conn Connection.
      * @param qry Query.
+     * @param explain If {@code true}, the table has a single {@code PLAN} column instead of query columns.
      * @return Table.
      * @throws IgniteCheckedException
      */
-    private GridMergeTable createFunctionTable(JdbcConnection conn, GridCacheSqlQuery qry) throws IgniteCheckedException {
+    private GridMergeTable createFunctionTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+        throws IgniteCheckedException {
         try {
             Session ses = (Session)conn.getSession();
 
@@ -508,17 +579,21 @@ public class GridReduceQueryExecutor {
             data.schema = ses.getDatabase().getSchema(ses.getCurrentSchemaName());
             data.create = true;
 
-            Query prepare = (Query)ses.prepare(qry.query(), false);
+            if (!explain) {
+                Prepared prepare = ses.prepare(qry.query(), false);
 
-            List<org.h2.expression.Parameter> parsedParams = prepare.getParameters();
+                List<org.h2.expression.Parameter> parsedParams = prepare.getParameters();
 
-            for (int i = Math.min(parsedParams.size(), qry.parameters().length); --i >= 0; ) {
-                Object val = qry.parameters()[i];
+                for (int i = Math.min(parsedParams.size(), qry.parameters().length); --i >= 0; ) {
+                    Object val = qry.parameters()[i];
 
-                parsedParams.get(i).setValue(DataType.convertToValue(ses, val, Value.UNKNOWN));
-            }
+                    parsedParams.get(i).setValue(DataType.convertToValue(ses, val, Value.UNKNOWN));
+                }
 
-            data.columns = generateColumnsFromQuery(prepare);
+                data.columns = generateColumnsFromQuery((Query)prepare);
+            }
+            else
+                data.columns = planColumns();
 
             return new GridMergeTable(data);
         }
@@ -530,6 +605,17 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @return New list holding the single {@code PLAN} column of string type.
+     */
+    private static ArrayList<Column> planColumns() {
+        ArrayList<Column> res = new ArrayList<>(1);
+
+        res.add(new Column("PLAN", Value.STRING));
+
+        return res;
+    }
+
+    /**
      * @param conn Connection.
      * @param qry Query.
      * @return Table.
@@ -566,7 +652,7 @@ public class GridReduceQueryExecutor {
         private CountDownLatch latch;
 
         /** */
-        private Connection conn;
+        private JdbcConnection conn;
 
         /** */
         private int pageSize;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 5f58b95..9392fd1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -17,12 +17,18 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.h2.value.*;
 
+import javax.cache.*;
 import java.util.*;
 
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
+
 /**
  * Page result.
  */
@@ -34,18 +40,22 @@ public class GridResultPage {
     protected final GridQueryNextPageResponse res;
 
     /** */
-    private final Collection<Value[]> rows;
+    private final int rowsInPage;
 
     /** */
     private final boolean last;
 
+    /** */
+    private Iterator<Value[]> rows;
+
     /**
+     * @param ctx Kernal context.
      * @param src Source.
      * @param res Response.
      * @param last If this is the globally last page.
      */
     @SuppressWarnings("unchecked")
-    public GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) {
+    public GridResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res, boolean last) {
         assert src != null;
 
         this.src = src;
@@ -57,12 +67,53 @@ public class GridResultPage {
 
         // res == null means that it is a terminating dummy page for the given source node ID.
         if (res != null) {
-            Object plainRows = res.plainRows();
+            Collection<?> plainRows = res.plainRows();
+
+            if (plainRows != null) {
+                rowsInPage = plainRows.size();
+
+                rows = (Iterator<Value[]>)plainRows.iterator();
+            }
+            else {
+                final int cols = res.columns();
+
+                rowsInPage = res.values().size() / cols;
+
+                final Iterator<Message> valsIter = res.values().iterator();
+
+                rows = new Iterator<Value[]>() {
+                    /** */
+                    int rowIdx;
+
+                    @Override public boolean hasNext() {
+                        return rowIdx < rowsInPage;
+                    }
+
+                    @Override public Value[] next() {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        rowIdx++;
 
-            rows = plainRows != null ? (Collection<Value[]>)plainRows : GridMapQueryExecutor.unmarshallRows(res.rows());
+                        try {
+                            return fillArray(valsIter, new Value[cols], ctx);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new CacheException(e);
+                        }
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        }
+        else {
+            rowsInPage = 0;
+
+            rows = Collections.emptyIterator();
         }
-        else
-            rows = Collections.emptySet();
     }
 
     /**
@@ -73,10 +124,23 @@ public class GridResultPage {
     }
 
     /**
+     * @return Number of rows in this page.
+     */
+    public int rowsInPage() {
+        return rowsInPage;
+    }
+
+    /**
      * @return Rows.
      */
-    public Collection<Value[]> rows() {
-        return rows;
+    public Iterator<Value[]> rows() {
+        Iterator<Value[]> r = rows;
+
+        assert r != null;
+
+        rows = null;
+
+        return r;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
new file mode 100644
index 0000000..26fd81d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.value.*;
+
+import java.nio.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*;
+
+/**
+ * Message carrying an H2 array value; every array element is itself converted to a message.
+ */
+public class GridH2Array extends GridH2ValueMessage {
+    /** Array elements as messages. */
+    @GridDirectCollection(Message.class)
+    private Collection<Message> x;
+
+    /**
+     * No-arg constructor; leaves the element collection unset.
+     */
+    public GridH2Array() {
+        // No-op.
+    }
+
+    /**
+     * @param val H2 value; asserted to be of type {@code Value.ARRAY}.
+     * @throws IgniteCheckedException If an element cannot be converted to a message.
+     */
+    public GridH2Array(Value val) throws IgniteCheckedException {
+        assert val.getType() == Value.ARRAY : val.getType();
+
+        ValueArray arr = (ValueArray)val;
+
+        x = new ArrayList<>(arr.getList().length);
+
+        for (Value v : arr.getList())
+            x.add(toMessage(v));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Value value(GridKernalContext ctx) throws IgniteCheckedException {
+        return ValueArray.get(fillArray(x.iterator(), new Value[x.size()], ctx));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0: // The only field of this class: the element collection.
+                if (!writer.writeCollection("x", x, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 0: // The only field of this class: the element collection.
+                x = reader.readCollection("x", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -18;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
new file mode 100644
index 0000000..ec4e455
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.value.*;
+
+import java.nio.*;
+
+/**
+ * Message carrying an H2 boolean value.
+ */
+public class GridH2Boolean extends GridH2ValueMessage {
+    /** Wrapped boolean. */
+    private boolean x;
+
+    /**
+     * No-arg constructor; leaves the wrapped boolean at its default.
+     */
+    public GridH2Boolean() {
+        // No-op.
+    }
+
+    /**
+     * @param val H2 value; asserted to be of type {@code Value.BOOLEAN}.
+     */
+    public GridH2Boolean(Value val) {
+        assert val.getType() == Value.BOOLEAN : val.getType();
+
+        x = val.getBoolean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Value value(GridKernalContext ctx) {
+        return ValueBoolean.get(x);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0: // The only field of this class: the wrapped boolean.
+                if (!writer.writeBoolean("x", x))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 0: // The only field of this class: the wrapped boolean.
+                x = reader.readBoolean("x");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -5;
+    }
+
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
new file mode 100644
index 0000000..e7c8f33
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.h2.value.*;
+
+import java.nio.*;
+
+/**
+ * Message carrying an H2 byte value.
+ */
+public class GridH2Byte extends GridH2ValueMessage {
+    /** Wrapped byte. */
+    private byte x;
+
+    /**
+     * No-arg constructor; leaves the wrapped byte at its default.
+     */
+    public GridH2Byte() {
+        // No-op.
+    }
+
+    /**
+     * @param val H2 value; asserted to be of type {@code Value.BYTE}.
+     */
+    public GridH2Byte(Value val) {
+        assert val.getType() == Value.BYTE : val.getType();
+
+        x = val.getByte();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Value value(GridKernalContext ctx) {
+        return ValueByte.get(x);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0: // The only field of this class: the wrapped byte.
+                if (!writer.writeByte("x", x))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 0: // The only field of this class: the wrapped byte.
+                x = reader.readByte("x");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -6;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+}