You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/03 10:55:51 UTC

[2/2] ignite git commit: IGNITE-4633 Introduced component type for DDL statements processor.

IGNITE-4633 Introduced component type for DDL statements processor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c05be524
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c05be524
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c05be524

Branch: refs/heads/ignite-4565-ddl
Commit: c05be524a9b73356d26474aa9962434155d111f3
Parents: 796c933
Author: Alexander Paschenko <al...@gmail.com>
Authored: Fri Mar 3 13:55:41 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 3 13:55:41 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/ContextAware.java    |  30 ++
 .../communication/GridIoMessageFactory.java     |  14 +-
 .../discovery/CustomMessageWrapper.java         |  10 +-
 .../processors/query/GridQueryIndexing.java     |   4 +-
 .../processors/query/GridQueryProcessor.java    |  24 +-
 .../query/ddl/DdlOperationNodeResult.java       | 136 ++++++
 .../query/ddl/DdlOperationResult.java           | 134 ++++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   5 +
 .../ignite/testframework/GridTestUtils.java     |  57 +++
 .../query/h2/DdlStatementsProcessor.java        |  67 ---
 .../query/h2/DmlStatementsProcessor.java        |  27 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  66 ++-
 .../query/h2/ddl/DdlAbstractOperation.java      |  62 +++
 .../query/h2/ddl/DdlCreateIndexOperation.java   |  99 +++++
 .../query/h2/ddl/DdlStatementsProcessor.java    | 424 +++++++++++++++++++
 .../h2/ddl/msg/DdlAbstractDiscoveryMessage.java |  63 +++
 .../h2/ddl/msg/DdlAckDiscoveryMessage.java      |  70 +++
 .../h2/ddl/msg/DdlInitDiscoveryMessage.java     |  95 +++++
 .../query/h2/sql/GridCreateIndex.java           | 121 ------
 .../processors/query/h2/sql/GridDropIndex.java  |  82 ----
 .../query/h2/sql/GridSqlCreateIndex.java        | 121 ++++++
 .../query/h2/sql/GridSqlDropIndex.java          |  82 ++++
 .../query/h2/sql/GridSqlQueryParser.java        |  16 +-
 .../query/h2/ddl/GridDdlProtoTest.java          | 188 ++++++++
 .../query/h2/sql/GridQueryParsingTest.java      |  30 +-
 25 files changed, 1694 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java b/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java
new file mode 100644
index 0000000..41b3193
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Context aware entity.
+ */
+public interface ContextAware {
+    /**
+     * Provide this entity with a kernal context.
+     *
+     * @param ctx Kernal context.
+     */
+    public void context(GridKernalContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..3a12303 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -118,11 +118,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
 import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -133,6 +133,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
 import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
 import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -838,6 +840,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case -45:
+                msg = new DdlOperationResult();
+
+                break;
+
+            case -46:
+                msg = new DdlOperationNodeResult();
+
+                break;
+
             // [-3..119] [124..127] [-36..-44]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 8f56248..0a12c44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -17,13 +17,15 @@
 
 package org.apache.ignite.internal.managers.discovery;
 
+import org.apache.ignite.internal.ContextAware;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-class CustomMessageWrapper implements DiscoverySpiCustomMessage {
+class CustomMessageWrapper implements DiscoverySpiCustomMessage, ContextAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -57,6 +59,12 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public void context(GridKernalContext ctx) {
+        if (delegate instanceof ContextAware)
+            ((ContextAware) delegate).context(ctx);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return delegate.toString();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..276a603 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -234,11 +234,11 @@ public interface GridQueryIndexing {
     /**
      * Prepare native statement to retrieve JDBC metadata from.
      *
-     * @param schema Schema.
+     * @param space Space name.
      * @param sql Query.
      * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
      */
-    public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException;
+    public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException;
 
     /**
      * Collect queries that already running more than specified duration.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e19fbcc..42e7c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.processors.query;
 
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.TimeUnit;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -40,6 +39,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -215,6 +215,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Indexing.
+     * @throws IgniteException If module is not enabled.
+     */
+    public GridQueryIndexing getIndexing() throws IgniteException {
+        checkxEnabled();
+
+        return idx;
+    }
+
+    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException If failed.
      */
@@ -966,14 +976,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
     /**
      *
-     * @param schema Schema.
+     * @param space Space name.
      * @param sql Query.
      * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
      */
-    public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
+    public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException {
         checkxEnabled();
 
-        return idx.prepareNativeStatement(schema, sql);
+        return idx.prepareNativeStatement(space, sql);
     }
 
     /**
@@ -1802,7 +1812,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
             throw (IgniteCheckedException)err;
         }
-        catch (CacheException e) {
+        catch (CacheException | IgniteException e) {
             err = e;
 
             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java
new file mode 100644
index 0000000..a699530
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message sent from <b>peer node</b> to <b>coordinator</b> when local portion of work is done.
+ */
+public class DdlOperationNodeResult implements Message {
+    /** Operation id. */
+    private IgniteUuid opId;
+
+    /** Error bytes. */
+    private byte[] err;
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("err", err))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("opId", opId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                err = reader.readByteArray("err");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                opId = reader.readIgniteUuid("opId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(DdlOperationNodeResult.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -46;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     * @return Whole DDL operation ID.
+     */
+    public IgniteUuid getOperationId() {
+        return opId;
+    }
+
+    /**
+     * @param operationId Whole DDL operation ID.
+     */
+    public void setOperationId(IgniteUuid operationId) {
+        this.opId = operationId;
+    }
+
+    /**
+     * @return Error, if any.
+     */
+    public byte[] getError() {
+        return err;
+    }
+
+    /**
+     * @param err Error, if any.
+     */
+    public void setError(byte[] err) {
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java
new file mode 100644
index 0000000..a6904a9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message sent from <b>coordinator</b> to <b>client</b> when operation is ultimately finished.
+ */
+public class DdlOperationResult implements Message {
+    /** Operation id. */
+    private IgniteUuid opId;
+
+    /** Error bytes. */
+    private byte[] err;
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("err", err))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("opId", opId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                err = reader.readByteArray("err");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                opId = reader.readIgniteUuid("opId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(DdlOperationResult.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -45;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     * @return Whole DDL operation ID.
+     */
+    public IgniteUuid getOperationId() {
+        return opId;
+    }
+
+    /**
+     * @param operationId Whole DDL operation ID.
+     */
+    public void setOperationId(IgniteUuid operationId) {
+        this.opId = operationId;
+    }
+
+    /**
+     * @return Error, if any.
+     */
+    public byte[] getError() {
+        return err;
+    }
+
+    /**
+     * @param err Error, if any.
+     */
+    public void setError(byte[] err) {
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 562b4c3..8adf062 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -63,11 +63,13 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.ContextAware;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
@@ -5139,6 +5141,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
 
                     if (msgObj != null) {
+                        if (msgObj instanceof ContextAware)
+                            ((ContextAware)msgObj).context(((IgniteEx)spi.ignite()).context());
+
                         DiscoverySpiCustomMessage nextMsg = msgObj.ackMessage();
 
                         if (nextMsg != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 0ae6575..90b63ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -58,6 +58,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import junit.framework.Test;
+import junit.framework.TestCase;
 import junit.framework.TestSuite;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -99,6 +100,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.ssl.SslContextFactory;
 import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -853,6 +855,61 @@ public final class GridTestUtils {
     }
 
     /**
+     * @return Name of current test (test method name without "test" prefix) based on stack trace. Works only when called from a test method on the JUnit thread.
+     */
+    public static String getGridTestName() {
+        List<StackTraceElement> trace = Arrays.asList(Thread.currentThread().getStackTrace());
+
+        Collections.reverse(trace);
+
+        int i = 0;
+
+        for (; i < trace.size(); i++) {
+            StackTraceElement e = trace.get(i);
+
+            if (e.getClassName().equals(TestCase.class.getName()) && e.getMethodName().equals("runTest"))
+                break;
+        }
+
+        if (++i >= trace.size())
+            throw new RuntimeException("JUnit TestCase.runTest not found on stack trace");
+
+        for (; i < trace.size() - 1; i++) {
+            StackTraceElement e = trace.get(i);
+
+            String clsName = e.getClassName();
+
+            // Skip reflection related stuff
+            if (!clsName.startsWith("java.lang.") && !clsName.startsWith("sun.reflect."))
+                break;
+        }
+
+        if (i == trace.size())
+            throw new RuntimeException("Non JDK related method not found on stack trace");
+
+        StackTraceElement e = trace.get(i);
+
+        Class<?> cls;
+
+        try {
+            cls = Class.forName(e.getClassName());
+        }
+        catch (ClassNotFoundException ex) {
+            throw new RuntimeException(ex);
+        }
+
+        if (!GridAbstractTest.class.isAssignableFrom(cls))
+            throw new RuntimeException("Grid test not found on stack trace");
+
+        String mtdName = e.getMethodName();
+
+        if (!mtdName.startsWith("test"))
+            throw new RuntimeException("GridTestUtils.getGridTestName must be called from test method");
+
+        return mtdName.substring(4);
+    }
+
+    /**
      * @param <T> Type.
      * @param cls Class.
      * @param annCls Annotation class.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java
deleted file mode 100644
index 462ab01..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.sql.GridCreateIndex;
-import org.apache.ignite.internal.processors.query.h2.sql.GridDropIndex;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
-import org.h2.command.Prepared;
-
-/**
- * Logic to execute DDL statements.
- */
-class DdlStatementsProcessor {
-    /**
-     * Indexing.
-     */
-    private final IgniteH2Indexing indexing;
-
-    /** */
-    DdlStatementsProcessor(IgniteH2Indexing indexing) {
-        this.indexing = indexing;
-    }
-
-    /**
-     * Execute DDL statement.
-     *
-     * @param cctx Cache context.
-     * @param stmt H2 statement to parse and execute.
-     */
-    QueryCursor<List<?>> runDdlStatement(GridCacheContext<?, ?> cctx, Prepared stmt) throws IgniteCheckedException {
-        GridSqlStatement gridStmt = new GridSqlQueryParser().parse(stmt);
-
-        if (gridStmt instanceof GridCreateIndex) {
-            QueryIndex newIdx = ((GridCreateIndex) gridStmt).index();
-
-            throw new UnsupportedOperationException("CREATE INDEX");
-        }
-        else if (gridStmt instanceof GridDropIndex)
-            throw new UnsupportedOperationException("DROP INDEX");
-        else
-            throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
-                IgniteQueryErrorCode.UNEXPECTED_OPERATION);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 799ce39..352b013 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
@@ -94,7 +93,7 @@ public class DmlStatementsProcessor {
     private final static int DFLT_DML_RERUN_ATTEMPTS = 4;
 
     /** Indexing. */
-    private final IgniteH2Indexing indexing;
+    private IgniteH2Indexing idx;
 
     /** Set of binary type ids for which warning about missing identity in configuration has been printed. */
     private final static Set<Integer> WARNED_TYPES =
@@ -106,15 +105,11 @@ public class DmlStatementsProcessor {
     /** Update plans cache. */
     private final ConcurrentMap<String, ConcurrentMap<String, UpdatePlan>> planCache = new ConcurrentHashMap<>();
 
-    /** Dummy metadata for update result. */
-    private final static List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
-        singletonList(new IgniteH2Indexing.SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
-
     /**
-     * @param indexing indexing.
+     * @param idx indexing.
      */
-    DmlStatementsProcessor(IgniteH2Indexing indexing) {
-        this.indexing = indexing;
+    public void start(IgniteH2Indexing idx) {
+        this.idx = idx;
     }
 
     /**
@@ -208,7 +203,7 @@ public class DmlStatementsProcessor {
         SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
         long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
 
-        return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
+        return new GridQueryFieldsResultAdapter(IgniteH2Indexing.UPDATE_RESULT_META,
             new IgniteSingletonIterator(Collections.singletonList(res)));
     }
 
@@ -259,10 +254,10 @@ public class DmlStatementsProcessor {
                 .setPageSize(fieldsQry.getPageSize())
                 .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
 
-            cur = (QueryCursorImpl<List<?>>) indexing.queryTwoStep(cctx, newFieldsQry, cancel);
+            cur = (QueryCursorImpl<List<?>>) idx.queryTwoStep(cctx, newFieldsQry, cancel);
         }
         else {
-            final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
+            final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
                 filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -399,7 +394,7 @@ public class DmlStatementsProcessor {
         while (it.hasNext()) {
             List<?> e = it.next();
             if (e.size() != 2) {
-                U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
+                U.warn(idx.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
                 continue;
             }
 
@@ -900,7 +895,7 @@ public class DmlStatementsProcessor {
     private BinaryObject updateHashCodeIfNeeded(GridCacheContext cctx, BinaryObject binObj) {
         if (U.isHashCodeEmpty(binObj)) {
             if (WARNED_TYPES.add(binObj.type().typeId()))
-                U.warn(indexing.getLogger(), "Binary object's type does not have identity resolver explicitly set, therefore " +
+                U.warn(idx.getLogger(), "Binary object's type does not have identity resolver explicitly set, therefore " +
                     "BinaryArrayIdentityResolver is used to generate hash codes for its instances, and therefore " +
                     "hash code of this binary object will most likely not match that of its non serialized form. " +
                     "For finer control over identity of this type, please update your BinaryConfiguration accordingly." +
@@ -1011,11 +1006,11 @@ public class DmlStatementsProcessor {
      * @return Resulting Iterable.
      */
     @SuppressWarnings("unchecked")
-    private static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
+    public static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
         QueryCursorImpl<List<?>> res =
             new QueryCursorImpl(Collections.singletonList(Collections.singletonList(itemsCnt)), null, false);
 
-        res.fieldsMeta(UPDATE_RESULT_META);
+        res.fieldsMeta(IgniteH2Indexing.UPDATE_RESULT_META);
 
         return res;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index bb04dcc..ad731ff 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -82,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -91,7 +90,9 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -212,6 +213,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         ";ROW_FACTORY=\"" + GridH2RowFactory.class.getName() + "\"" +
         ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
 
+    /** Dummy metadata for update result. */
+    static final List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
+        singletonList(new SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
+
     /** */
     private static final int PREPARED_STMT_CACHE_SIZE = 256;
 
@@ -262,6 +267,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
+    /** For tests. */
+    public static Class<? extends DdlStatementsProcessor> ddlProcCls;
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -345,10 +353,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private volatile GridKernalContext ctx;
 
     /** */
-    private final DmlStatementsProcessor dmlProc = new DmlStatementsProcessor(this);
+    private final DmlStatementsProcessor dmlProc = new DmlStatementsProcessor();
 
     /** */
-    private final DdlStatementsProcessor ddlProc = new DdlStatementsProcessor(this);
+    private DdlStatementsProcessor ddlProc;
 
     /** */
     private final ConcurrentMap<String, GridH2Table> dataTables = new ConcurrentHashMap8<>();
@@ -380,6 +388,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @return DDL statements processor.
+     */
+    public DdlStatementsProcessor getDdlStatementsProcessor() {
+        return ddlProc;
+    }
+
+    /**
      * @param space Space.
      * @return Connection.
      */
@@ -395,7 +410,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * @return Logger.
      */
-    IgniteLogger getLogger() {
+    public IgniteLogger getLogger() {
         return log;
     }
 
@@ -442,8 +457,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
-        return prepareStatement(connectionForSpace(schema), sql, false);
+    @Override public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException {
+        return prepareStatement(connectionForSpace(space), sql, true);
     }
 
     /**
@@ -817,8 +832,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @SuppressWarnings("unchecked")
     @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
         @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
-        final int timeout, final GridQueryCancel cancel)
-        throws IgniteCheckedException {
+        final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException {
         final Connection conn = connectionForSpace(spaceName);
 
         setupConnection(conn, false, enforceJoinOrder);
@@ -827,7 +841,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
 
-        if (!p.isQuery()) {
+        if (DmlStatementsProcessor.isDmlStatement(p)) {
             SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
 
             if (params != null)
@@ -836,8 +850,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fldsQry.setEnforceJoinOrder(enforceJoinOrder);
             fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 
-            return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+            try {
+                return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qry + ", params=" +
+                    Arrays.deepToString(fldsQry.getArgs()) + "]", e);
+            }
         }
+        else if (DdlStatementsProcessor.isDdlStatement(p))
+            throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
         List<GridQueryFieldMetadata> meta;
 
@@ -1296,7 +1319,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
 
             if (!prepared.isQuery()) {
-                if (dmlProc.isDmlStatement(prepared)) {
+                if (DmlStatementsProcessor.isDmlStatement(prepared)) {
                     try {
                         return dmlProc.updateSqlFieldsTwoStep(cctx.namexx(), stmt, qry, cancel);
                     }
@@ -1305,13 +1328,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             Arrays.deepToString(qry.getArgs()) + "]", e);
                     }
                 }
-                else {
+
+                if (DdlStatementsProcessor.isDdlStatement(prepared)) {
                     try {
-                        return ddlProc.runDdlStatement(cctx, prepared);
+                        return ddlProc.runDdlStatement(stmt);
                     }
                     catch (IgniteCheckedException e) {
-                        throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ", params=" +
-                            Arrays.deepToString(qry.getArgs()) + "]", e);
+                        throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']', e);
                     }
                 }
             }
@@ -1855,6 +1878,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     cleanupStatementCache();
                 }
             }, CLEANUP_STMT_CACHE_PERIOD, CLEANUP_STMT_CACHE_PERIOD);
+
+            try {
+                ddlProc = ddlProcCls == null ? new DdlStatementsProcessor() : ddlProcCls.newInstance();
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException("Failed to initialize DDL statements processor", e);
+            }
+
+            dmlProc.start(this);
+            ddlProc.start(ctx, this);
         }
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-2139
@@ -2002,6 +2035,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
 //        unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
 
+        if (ddlProc != null)
+            ddlProc.stop();
+
         for (Schema schema : schemas.values())
             schema.onDrop();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java
new file mode 100644
index 0000000..ec73e7e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+import org.apache.ignite.lang.IgniteUuid;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Base class for DDL operations; holds the operation ID and the ID of the node that initiated the operation.
+ */
+public abstract class DdlAbstractOperation implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Operation ID. */
+    private final IgniteUuid opId;
+
+    /** ID of node that initiated this operation. */
+    private final UUID cliNodeId;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param cliNodeId ID of the node that initiated this operation.
+     */
+    public DdlAbstractOperation(IgniteUuid opId, UUID cliNodeId) {
+        this.opId = opId;
+        this.cliNodeId = cliNodeId;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public IgniteUuid operationId() {
+        return opId;
+    }
+
+    /**
+     * @return ID of the node that initiated this operation.
+     */
+    public UUID clientNodeId() {
+        return cliNodeId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java
new file mode 100644
index 0000000..88c980d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+import java.util.UUID;
+
+/**
+ * Arguments of a {@code CREATE INDEX} DDL operation.
+ */
+public class DdlCreateIndexOperation extends DdlAbstractOperation {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Index params. */
+    @GridToStringInclude
+    private final QueryIndex idx;
+
+    /** Schema name. */
+    private final String schemaName;
+
+    /** Table name. */
+    private final String tblName;
+
+    /** Ignore operation if index exists. */
+    private final boolean ifNotExists;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation id.
+     * @param cliNodeId Id of node that initiated this operation.
+     * @param idx Index params.
+     * @param schemaName Schema name.
+     * @param tblName Table name.
+     * @param ifNotExists Ignore operation if index exists.
+     */
+    DdlCreateIndexOperation(IgniteUuid opId, UUID cliNodeId, QueryIndex idx, String schemaName, String tblName,
+        boolean ifNotExists) {
+        super(opId, cliNodeId);
+
+        this.idx = idx;
+        this.schemaName = schemaName;
+        this.tblName = tblName;
+        this.ifNotExists = ifNotExists;
+    }
+
+    /**
+     * @return Index params.
+     */
+    public QueryIndex index() {
+        return idx;
+    }
+
+    /**
+     * @return Schema name.
+     */
+    public String schemaName() {
+        return schemaName;
+    }
+
+    /**
+     * @return Table name.
+     */
+    public String tableName() {
+        return tblName;
+    }
+
+    /**
+     * @return {@code true} if the operation should be ignored when the index already exists.
+     */
+    public boolean ifNotExists() {
+        return ifNotExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DdlCreateIndexOperation.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
new file mode 100644
index 0000000..1c4eeaa
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+import java.sql.PreparedStatement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
+import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlAckDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.h2.command.Prepared;
+import org.h2.command.ddl.CreateIndex;
+import org.h2.command.ddl.DropIndex;
+import org.h2.jdbc.JdbcPreparedStatement;
+
+/**
+ * DDL statements processor.<p>
+ * Contains higher level logic to handle operations as a whole and communicate with the client.
+ */
+public class DdlStatementsProcessor {
+    /** Kernal context. */
+    GridKernalContext ctx;
+
+    /** Indexing engine. */
+    private IgniteH2Indexing idx;
+
+    /** Stop flag: switched to {@code true} by {@link #stop()}. */
+    private AtomicBoolean isStopped = new AtomicBoolean();
+
+    /** Running operations originating at this node as a client. */
+    private Map<IgniteUuid, GridFutureAdapter> operations = new ConcurrentHashMap<>();
+
+    /**
+     * Stores the kernal context and indexing, then registers the DDL discovery and communication listeners.
+     *
+     * @param ctx Kernal context.
+     * @param idx Indexing engine used by this processor.
+     */
+    public void start(final GridKernalContext ctx, IgniteH2Indexing idx) {
+        this.ctx = ctx;
+        this.idx = idx;
+
+        ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class, new CustomEventListener<DdlInitDiscoveryMessage>() {
+            /** Delegates {@code INIT} discovery messages to {@code onInit}. */
+            @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+            @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DdlInitDiscoveryMessage msg) {
+                onInit(msg);
+            }
+        });
+
+        ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class, new CustomEventListener<DdlAckDiscoveryMessage>() {
+            /** Delegates {@code ACK} discovery messages, together with their sender, to {@code onAck}. */
+            @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DdlAckDiscoveryMessage msg) {
+                onAck(snd, msg);
+            }
+        });
+
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            /** Dispatches whole-operation results and per-node results received on the query topic. */
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (msg instanceof DdlOperationResult) {
+                    DdlOperationResult res = (DdlOperationResult) msg;
+
+                    onResult(res.getOperationId(), bytesToException(res.getError()));
+                }
+
+                if (msg instanceof DdlOperationNodeResult) {
+                    DdlOperationNodeResult res = (DdlOperationNodeResult) msg;
+
+                    onNodeResult(res.getOperationId(), bytesToException(res.getError()));
+                }
+            }
+        });
+    }
+
+    /**
+     * Handle {@code ACK} message on a <b>peer node</b> - do local portion of actual DDL job and notify
+     * <b>coordinator</b> about success or failure.
+     *
+     * @param snd Sender of the {@code ACK} message; the local result is sent back to this node.
+     * @param msg Message.
+     */
+    @SuppressWarnings({"ThrowableInstanceNeverThrown", "unchecked"})
+    private void onAck(ClusterNode snd, DdlAckDiscoveryMessage msg) {
+        // Don't do anything if we didn't choose to participate.
+        if (!msg.nodeIds().contains(ctx.localNodeId()))
+            return;
+
+        IgniteCheckedException ex = null;
+
+        DdlAbstractOperation args = msg.operation();
+
+        try {
+            doAck(args);
+        }
+        catch (Throwable e) {
+            ex = wrapThrowableIfNeeded(e);
+        }
+
+        try {
+            DdlOperationNodeResult res = new DdlOperationNodeResult();
+
+            res.setOperationId(args.operationId());
+            res.setError(exceptionToBytes(ex));
+
+            ctx.io().send(snd, GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
+        }
+        catch (Throwable e) {
+            idx.getLogger().error("Failed to notify coordinator about local DDL operation completion [opId=" +
+                args.operationId() + ", coordinatorNodeId=" + snd.id() + ']', e);
+        }
+    }
+
+    /**
+     * Perform local portion of DDL operation; the body is currently empty for every operation type.
+     * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
+     *
+     * @param args Operation arguments.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    void doAck(DdlAbstractOperation args) throws IgniteCheckedException {
+        if (args instanceof DdlCreateIndexOperation) {
+            // No-op: no local CREATE INDEX work is performed here yet.
+        }
+    }
+
+    /**
+     * Handle local DDL operation result from <b>a peer node</b> on <b>the coordinator</b>; currently a stub.
+     *
+     * @param opId DDL operation ID.
+     * @param err Exception that occurred on the <b>peer</b>, or null if the local operation has been successful.
+     */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "SynchronizationOnLocalVariableOrMethodParameter", "ForLoopReplaceableByForEach"})
+    private void onNodeResult(IgniteUuid opId, IgniteCheckedException err) {
+        // No-op: per-node results are ignored for now.
+    }
+
+    /**
+     * Process result of executing {@link DdlInitDiscoveryMessage} and react accordingly.
+     * Called from {@link DdlInitDiscoveryMessage#ackMessage()}.
+     *
+     * @param msg {@link DdlInitDiscoveryMessage} message.
+     * @return {@link DdlAckDiscoveryMessage} if no node reported an error; otherwise {@code null} (client is notified).
+     */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "UnnecessaryInitCause"})
+    public DiscoveryCustomMessage onInitFinished(DdlInitDiscoveryMessage msg) {
+        Map<UUID, IgniteCheckedException> nodesState = msg.nodeState();
+
+        assert nodesState != null;
+
+        Map<UUID, IgniteCheckedException> errors = new HashMap<>();
+
+        for (Map.Entry<UUID, IgniteCheckedException> e : nodesState.entrySet())
+            if (e.getValue() != null)
+                errors.put(e.getKey(), e.getValue());
+
+        if (!errors.isEmpty()) {
+            IgniteCheckedException resEx = new IgniteCheckedException("DDL operation has been cancelled at INIT stage");
+
+            if (errors.size() > 1) {
+                for (IgniteCheckedException e : errors.values())
+                    resEx.addSuppressed(e);
+            }
+            else
+                resEx.initCause(errors.values().iterator().next());
+
+            sendResult(msg.operation(), resEx);
+
+            return null;
+        }
+        else
+            return new DdlAckDiscoveryMessage(msg.operation(), msg.nodeState().keySet());
+    }
+
+    /**
+     * Send the operation result to the node that initiated the operation; a send failure is only logged.
+     *
+     * @param args Operation arguments.
+     * @param err Error, if any.
+     */
+    private void sendResult(DdlAbstractOperation args, IgniteCheckedException err) {
+        assert args != null;
+
+        DdlOperationResult res = new DdlOperationResult();
+
+        res.setError(exceptionToBytes(err));
+        res.setOperationId(args.operationId());
+
+        try {
+            ctx.io().send(args.clientNodeId(), GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            idx.getLogger().error("Failed to notify client node about DDL operation failure " +
+                "[opId=" + args.operationId() + ", clientNodeId=" + args.clientNodeId() + ']', e);
+        }
+    }
+
+    /**
+     * Handle whole DDL operation result <b>on the client</b> by completing the operation's registered future.
+     *
+     * @param opId DDL operation ID.
+     * @param err Error, if any.
+     */
+    @SuppressWarnings("unchecked")
+    private void onResult(IgniteUuid opId, IgniteCheckedException err) {
+        GridFutureAdapter fut = operations.get(opId);
+
+        if (fut == null) {
+            idx.getLogger().warning("DDL operation not found at its client [opId=" + opId + ", nodeId=" +
+                ctx.localNodeId() + ']');
+
+            return;
+        }
+
+        fut.onDone(null, err);
+    }
+
+    /**
+     * Run {@code INIT} stage checks <b>on a peer node</b> and record this node's outcome in the message's node state.
+     *
+     * @param msg {@code INIT} message.
+     */
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    private void onInit(DdlInitDiscoveryMessage msg) {
+        try {
+            // A null state entry for the local node means it participates and its init succeeded.
+            if (doInit(msg.operation()))
+                msg.nodeState().put(ctx.localNodeId(), null);
+        }
+        catch (Throwable e) {
+            // Otherwise record the error that occurred under the local node ID.
+            msg.nodeState().put(ctx.localNodeId(), wrapThrowableIfNeeded(e));
+        }
+    }
+
+    /**
+     * Perform actual INIT actions.
+     * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
+     *
+     * @param args Operation arguments.
+     * @return {@code true} if this node participates in the operation (currently only for {@code CREATE INDEX}).
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    boolean doInit(DdlAbstractOperation args) throws IgniteCheckedException {
+        if (args instanceof DdlCreateIndexOperation) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Optionally wrap a {@link Throwable} into an {@link IgniteCheckedException}.
+     *
+     * @param e Throwable to wrap.
+     * @return {@code e} itself if it is an {@link IgniteCheckedException}, otherwise a new one with {@code e} as cause.
+     */
+    private static IgniteCheckedException wrapThrowableIfNeeded(Throwable e) {
+        if (e instanceof IgniteCheckedException)
+            return (IgniteCheckedException) e;
+        else
+            return new IgniteCheckedException(e);
+    }
+
+    /**
+     * Mark the processor stopped and complete all registered operation futures with an error; throws if already stopped.
+     */
+    public void stop() throws IgniteCheckedException {
+        if (!isStopped.compareAndSet(false, true))
+            throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped already"));
+
+        for (Map.Entry<IgniteUuid, GridFutureAdapter> e : operations.entrySet())
+            e.getValue().onDone(new IgniteCheckedException("Operation has been cancelled [opId=" + e.getKey() +']'));
+    }
+
+    /**
+     * Execute DDL statement.
+     *
+     * @param stmt H2 statement to parse and execute.
+     */
+    public QueryCursor<List<?>> runDdlStatement(PreparedStatement stmt)
+        throws IgniteCheckedException {
+        if (isStopped.get())
+            throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped"));
+
+        assert stmt instanceof JdbcPreparedStatement;
+
+        GridSqlStatement gridStmt = new GridSqlQueryParser().parse(GridSqlQueryParser
+            .prepared((JdbcPreparedStatement) stmt));
+
+        DdlAbstractOperation op;
+
+        if (gridStmt instanceof GridSqlCreateIndex) {
+            GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt;
+
+            op = new DdlCreateIndexOperation(IgniteUuid.randomUuid(), ctx.localNodeId(), createIdx.index(),
+                createIdx.schemaName(), createIdx.tableName(), createIdx.ifNotExists());
+        }
+        else if (gridStmt instanceof GridSqlDropIndex)
+            throw new UnsupportedOperationException("DROP INDEX");
+        else
+            throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
+                IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+
+        GridFutureAdapter opFut = new GridFutureAdapter();
+
+        operations.put(op.operationId(), opFut);
+
+        try {
+            ctx.discovery().sendCustomEvent(new DdlInitDiscoveryMessage(op));
+
+            opFut.get();
+        }
+        finally {
+            operations.remove(op.operationId());
+        }
+
+        return DmlStatementsProcessor.cursorForUpdateResult(0L);
+    }
+
+    /**
+     * Serialize exception or at least its message to bytes.
+     *
+     * @param ex Exception.
+     * @return Serialized exception.
+     */
+    private byte[] exceptionToBytes(IgniteCheckedException ex) {
+        if (ex == null)
+            return null;
+
+        try {
+            return U.marshal(ctx, ex);
+        }
+        catch (IgniteCheckedException e) {
+            IgniteCheckedException resEx;
+
+            // Let's try to serialize at least the message
+            try {
+                resEx = new IgniteCheckedException("Failed to serialize exception " +
+                    "[msg=" + ex.getMessage() + ']');
+            }
+            catch (Throwable ignored) {
+                resEx = new IgniteCheckedException("Failed to serialize exception");
+            }
+
+            try {
+                return U.marshal(ctx, resEx);
+            }
+            catch (IgniteCheckedException exx) {
+                // Why would it fail? We've sanitized it...
+                throw new AssertionError(exx);
+            }
+        }
+    }
+
+    /**
+     * Deserialize exception from bytes.
+     *
+     * @param ex Serialized exception bytes, may be {@code null}.
+     * @return Deserialized exception, or {@code null} if {@code ex} is {@code null}.
+     */
+    private IgniteCheckedException bytesToException(byte[] ex) {
+        if (ex == null)
+            return null;
+
+        try {
+            return U.unmarshal(ctx, ex, U.resolveClassLoader(ctx.config()));
+        }
+        catch (Throwable e) {
+            return new IgniteCheckedException("Failed to deserialize exception", e);
+        }
+    }
+
+    /**
+     * @param cmd Statement.
+     * @return Whether {@code cmd} is a DDL statement we're able to handle.
+     */
+    public static boolean isDdlStatement(Prepared cmd) {
+        return cmd instanceof CreateIndex || cmd instanceof DropIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java
new file mode 100644
index 0000000..f9fd641
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl.msg;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Base class for DDL discovery messages; carries the DDL operation the message relates to.
+ */
+public abstract class DdlAbstractDiscoveryMessage implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Operation. */
+    private final DdlAbstractOperation op;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     */
+    protected DdlAbstractDiscoveryMessage(DdlAbstractOperation op) {
+        this.op = op;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * @return Operation.
+     */
+    public DdlAbstractOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DdlAbstractDiscoveryMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java
new file mode 100644
index 0000000..f33cece
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl.msg;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * {@code ACK} message - triggers actual execution of local portion of DDL operation.
+ */
+public class DdlAckDiscoveryMessage extends DdlAbstractDiscoveryMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Ids of participating nodes. */
+    private final Set<UUID> nodeIds;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     * @param nodeIds Ids of participating nodes.
+     */
+    public DdlAckDiscoveryMessage(DdlAbstractOperation op, Set<UUID> nodeIds) {
+        super(op);
+        this.nodeIds = nodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /**
+     * @return Ids of participating nodes.
+     */
+    public Set<UUID> nodeIds() {
+        return nodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DdlAckDiscoveryMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
new file mode 100644
index 0000000..753eb0c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl.msg;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.ContextAware;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlAbstractOperation;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@code INIT} part of a distributed DDL operation.
+ */
+public class DdlInitDiscoveryMessage extends DdlAbstractDiscoveryMessage implements ContextAware {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Kernal context. */
+    @GridToStringExclude
+    private transient GridKernalContext ctx;
+
+    /**
+     * Map {@code node id} -> {@code init exception, if any}.
+     * If this field is null, then this message is being processed by coordinator.
+     * Note that this map not only helps to track errors but also contains the ids of nodes eligible for the
+     * operation, as filtered at coordinator, so its key set is important.
+     */
+    private Map<UUID, IgniteCheckedException> nodesState = new HashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     */
+    public DdlInitDiscoveryMessage(DdlAbstractOperation op) {
+        super(op);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return ((IgniteH2Indexing)ctx.query().getIndexing()).getDdlStatementsProcessor().onInitFinished(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /**
+     * @return Nodes state.
+     */
+    public Map<UUID, IgniteCheckedException> nodeState() {
+        return nodesState;
+    }
+
+    /**
+     * @param nodesState Nodes state.
+     */
+    public void nodeState(Map<UUID, IgniteCheckedException> nodesState) {
+        this.nodesState = nodesState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void context(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DdlInitDiscoveryMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java
deleted file mode 100644
index 56c7dd3..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.sql;
-
-import java.util.Map;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.QueryIndexType;
-import org.h2.command.Parser;
-
-/**
- * CREATE INDEX statement.
- */
-public class GridCreateIndex extends GridSqlStatement {
-    /** Schema name. */
-    private String schemaName;
-
-    /** Table name. */
-    private String tblName;
-
-    /** Attempt to create the index only if it does not exist. */
-    private boolean ifNotExists;
-
-    /** Index to create. */
-    private QueryIndex idx;
-
-    /**
-     * @return Schema name for new index.
-     */
-    public String schemaName() {
-        return schemaName;
-    }
-
-    /**
-     * @param schemaName Schema name for new index.
-     */
-    public void schemaName(String schemaName) {
-        this.schemaName = schemaName;
-    }
-
-    /**
-     * @return Table name.
-     */
-    public String tableName() {
-        return tblName;
-    }
-
-    /**
-     * @param tblName Table name.
-     */
-    public void tableName(String tblName) {
-        this.tblName = tblName;
-    }
-
-    /**
-     * @return whether attempt to create the index should be made only if it does not exist.
-     */
-    public boolean ifNotExists() {
-        return ifNotExists;
-    }
-
-    /**
-     * @param ifNotExists whether attempt to create the index should be made only if it does not exist.
-     */
-    public void ifNotExists(boolean ifNotExists) {
-        this.ifNotExists = ifNotExists;
-    }
-
-    /**
-     * @return Index to create.
-     */
-    public QueryIndex index() {
-        return idx;
-    }
-
-    /**
-     * @param idx Index to create.
-     */
-    public void index(QueryIndex idx) {
-        this.idx = idx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getSQL() {
-        StringBuilder sb = new StringBuilder("CREATE ")
-            .append(idx.getIndexType() == QueryIndexType.GEOSPATIAL ? "SPATIAL " : "")
-            .append("INDEX ").append(ifNotExists ? "IF NOT EXISTS " : "")
-            .append(Parser.quoteIdentifier(schemaName)).append('.')
-            .append(Parser.quoteIdentifier(idx.getName())).append(" ON ")
-            .append(Parser.quoteIdentifier(tblName)).append(" (");
-
-        boolean first = true;
-
-        for (Map.Entry<String, Boolean> e : idx.getFields().entrySet()) {
-            if (first)
-                first = false;
-            else
-                sb.append(", ");
-
-            sb.append(Parser.quoteIdentifier(e.getKey())).append(e.getValue() ? " ASC" : " DESC");
-        }
-
-        sb.append(')');
-
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java
deleted file mode 100644
index 07c39df..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.sql;
-
-import org.h2.command.Parser;
-
-/**
- * DROP INDEX statement.
- */
-public class GridDropIndex extends GridSqlStatement {
-    /** Index name. */
-    private String name;
-
-    /** Schema name. */
-    private String schemaName;
-
-    /** Attempt to drop the index only if it exists. */
-    private boolean ifExists;
-
-    /**
-     * @return Index name.
-     */
-    public String name() {
-        return name;
-    }
-
-    /**
-     * @param name Index name.
-     */
-    public void name(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Schema name.
-     */
-    public String schemaName() {
-        return schemaName;
-    }
-
-    /**
-     * @param schemaName Schema name.
-     */
-    public void schemaName(String schemaName) {
-        this.schemaName = schemaName;
-    }
-
-    /**
-     * @return whether attempt to drop the index should be made only if it exists.
-     */
-    public boolean ifExists() {
-        return ifExists;
-    }
-
-    /**
-     * @param ifExists whether attempt to drop the index should be made only if it exists.
-     */
-    public void ifExists(boolean ifExists) {
-        this.ifExists = ifExists;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getSQL() {
-        return "DROP INDEX " + (ifExists ? "IF EXISTS " : "") + Parser.quoteIdentifier(schemaName) + '.' +
-            Parser.quoteIdentifier(name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java
new file mode 100644
index 0000000..50d455c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.sql;
+
+import java.util.Map;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.h2.command.Parser;
+
+/**
+ * CREATE INDEX statement.
+ */
+public class GridSqlCreateIndex extends GridSqlStatement {
+    /** Schema name. */
+    private String schemaName;
+
+    /** Table name. */
+    private String tblName;
+
+    /** Attempt to create the index only if it does not exist. */
+    private boolean ifNotExists;
+
+    /** Index to create. */
+    private QueryIndex idx;
+
+    /**
+     * @return Schema name for new index.
+     */
+    public String schemaName() {
+        return schemaName;
+    }
+
+    /**
+     * @param schemaName Schema name for new index.
+     */
+    public void schemaName(String schemaName) {
+        this.schemaName = schemaName;
+    }
+
+    /**
+     * @return Table name.
+     */
+    public String tableName() {
+        return tblName;
+    }
+
+    /**
+     * @param tblName Table name.
+     */
+    public void tableName(String tblName) {
+        this.tblName = tblName;
+    }
+
+    /**
+     * @return whether attempt to create the index should be made only if it does not exist.
+     */
+    public boolean ifNotExists() {
+        return ifNotExists;
+    }
+
+    /**
+     * @param ifNotExists whether attempt to create the index should be made only if it does not exist.
+     */
+    public void ifNotExists(boolean ifNotExists) {
+        this.ifNotExists = ifNotExists;
+    }
+
+    /**
+     * @return Index to create.
+     */
+    public QueryIndex index() {
+        return idx;
+    }
+
+    /**
+     * @param idx Index to create.
+     */
+    public void index(QueryIndex idx) {
+        this.idx = idx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSQL() {
+        StringBuilder sb = new StringBuilder("CREATE ")
+            .append(idx.getIndexType() == QueryIndexType.GEOSPATIAL ? "SPATIAL " : "")
+            .append("INDEX ").append(ifNotExists ? "IF NOT EXISTS " : "")
+            .append(Parser.quoteIdentifier(schemaName)).append('.')
+            .append(Parser.quoteIdentifier(idx.getName())).append(" ON ")
+            .append(Parser.quoteIdentifier(tblName)).append(" (");
+
+        boolean first = true;
+
+        for (Map.Entry<String, Boolean> e : idx.getFields().entrySet()) {
+            if (first)
+                first = false;
+            else
+                sb.append(", ");
+
+            sb.append(Parser.quoteIdentifier(e.getKey())).append(e.getValue() ? " ASC" : " DESC");
+        }
+
+        sb.append(')');
+
+        return sb.toString();
+    }
+}