You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/22 11:13:08 UTC

[22/23] ignite git commit: Cleaning up indexing module.

Cleaning up indexing module.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f39de03c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f39de03c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f39de03c

Branch: refs/heads/ignite-4565-ddl
Commit: f39de03c6d12ae01a77a162f83a6c95742cb8f27
Parents: b35568e
Author: devozerov <vo...@gridgain.com>
Authored: Wed Mar 22 13:04:18 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Mar 22 13:04:18 2017 +0300

----------------------------------------------------------------------
 .../processors/query/h2/IgniteH2Indexing.java   |   6 +-
 .../query/h2/ddl/DdlStatementsProcessor.java    | 466 ++-----------------
 .../h2/ddl/msg/DdlInitDiscoveryMessage.java     |   2 +-
 .../query/h2/ddl/GridDdlProtoTest.java          | 188 --------
 4 files changed, 30 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 07c0650..e61bfd6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1498,7 +1498,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     if (DdlStatementsProcessor.isDdlStatement(prepared)) {
                         try {
-                            return ddlProc.runDdlStatement(stmt);
+                            return ddlProc.runDdlStatement(cctx.name(), stmt);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']',
@@ -2200,10 +2200,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             log.debug("Stopping cache query index...");
 
 //        unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
-
-        if (ddlProc != null)
-            ddlProc.stop();
-
         for (Schema schema : schemas.values())
             schema.onDrop();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index 8bb831b..32e6da5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -17,50 +17,27 @@
 
 package org.apache.ignite.internal.processors.query.h2.ddl;
 
-import java.sql.PreparedStatement;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlAckDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThread;
 import org.h2.command.Prepared;
 import org.h2.command.ddl.CreateIndex;
 import org.h2.command.ddl.DropIndex;
 import org.h2.jdbc.JdbcPreparedStatement;
-import org.jetbrains.annotations.Nullable;
+
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.List;
 
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
 
@@ -75,15 +52,6 @@ public class DdlStatementsProcessor {
     /** Logger. */
     private IgniteLogger log;
 
-    /** State flag. */
-    private AtomicBoolean isStopped = new AtomicBoolean();
-
-    /** Running operations originating at this node as a client. */
-    private Map<IgniteUuid, GridFutureAdapter> operations = new ConcurrentHashMap<>();
-
-    /** Worker. */
-    private volatile DdlWorker worker;
-
     /**
      * Initialize message handlers and this' fields needed for further operation.
      *
@@ -94,378 +62,49 @@ public class DdlStatementsProcessor {
         this.ctx = ctx;
 
         log = ctx.log(DdlStatementsProcessor.class);
-
-        worker = new DdlWorker(ctx.igniteInstanceName(), log);
-
-        IgniteThread workerThread = new IgniteThread(worker);
-
-        workerThread.setDaemon(true);
-
-        workerThread.start();
-
-        ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class,
-            new CustomEventListener<DdlInitDiscoveryMessage>() {
-            /** {@inheritDoc} */
-            @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
-            @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
-                DdlInitDiscoveryMessage msg) {
-                onInit(msg);
-            }
-        });
-
-        ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class,
-            new CustomEventListener<DdlAckDiscoveryMessage>() {
-            /** {@inheritDoc} */
-            @Override public void onCustomEvent(AffinityTopologyVersion topVer, final ClusterNode snd,
-                final DdlAckDiscoveryMessage msg) {
-                submitTask(new DdlTask() {
-                    @Override public void run() {
-                        onAck(snd, msg);
-                    }
-                });
-            }
-        });
-
-        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
-            /** {@inheritDoc} */
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                if (msg instanceof DdlOperationResult) {
-                    DdlOperationResult res = (DdlOperationResult) msg;
-
-                    onResult(res.getOperationId(), bytesToException(res.getError()));
-                }
-
-                if (msg instanceof DdlOperationNodeResult) {
-                    DdlOperationNodeResult res = (DdlOperationNodeResult) msg;
-
-                    onNodeResult(res.getOperationId(), bytesToException(res.getError()));
-                }
-            }
-        });
-    }
-
-    /**
-     * Submit a task to {@link #worker} for async execution.
-     *
-     * @param task Task.
-     */
-    private void submitTask(DdlTask task) {
-        DdlWorker worker0 = worker;
-
-        if (worker0 != null)
-            worker0.submit(task);
-        else
-            log.debug("Cannot submit DDL task because worker is null (node is stopping): " + task);
-    }
-
-    /**
-     * Handle {@code ACK} message on a <b>peer node</b> - do local portion of actual DDL job and notify
-     * <b>coordinator</b> about success or failure.
-     *
-     * @param snd Sender.
-     * @param msg Message.
-     */
-    @SuppressWarnings({"ThrowableInstanceNeverThrown", "unchecked"})
-    private void onAck(ClusterNode snd, DdlAckDiscoveryMessage msg) {
-        // Don't do anything if we didn't choose to participate.
-        if (!msg.nodeIds().contains(ctx.localNodeId()))
-            return;
-
-        IgniteCheckedException ex = null;
-
-        DdlAbstractOperation args = msg.operation();
-
-        try {
-            doAck(args);
-        }
-        catch (Throwable e) {
-            ex = wrapThrowableIfNeeded(e);
-        }
-
-        try {
-            DdlOperationNodeResult res = new DdlOperationNodeResult();
-
-            res.setOperationId(msg.operation().operationId());
-            res.setError(exceptionToBytes(ex));
-
-            ctx.io().sendToGridTopic(snd, GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
-        }
-        catch (Throwable e) {
-            U.error(log, "Failed to notify coordinator about local DLL operation completion [opId=" +
-                msg.operation().operationId() + ", clientNodeId=" + snd.id() + ']', e);
-        }
-    }
-
-    /**
-     * Perform local portion of DDL operation.
-     * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
-     *
-     * @param args Operation arguments.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings("unchecked")
-    void doAck(DdlAbstractOperation args) throws IgniteCheckedException {
-        if (args instanceof DdlCreateIndexOperation) {
-            // No-op.
-        }
-    }
-
-    /**
-     * Handle local DDL operation result from <b>a peer node</b> on <b>the coordinator</b>.
-     *
-     * @param opId DDL operation ID.
-     * @param err Exception that occurred on the <b>peer</b>, or null if the local operation has been successful.
-     */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "SynchronizationOnLocalVariableOrMethodParameter", "ForLoopReplaceableByForEach"})
-    private void onNodeResult(IgniteUuid opId, IgniteCheckedException err) {
-        // No-op.
-    }
-
-    /**
-     * Process result of executing {@link DdlInitDiscoveryMessage} and react accordingly.
-     * Called from {@link DdlInitDiscoveryMessage#ackMessage()}.
-     *
-     * @param msg {@link DdlInitDiscoveryMessage} message.
-     * @return {@link DiscoveryCustomMessage} to return from {@link DdlInitDiscoveryMessage#ackMessage()}.
-     */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "UnnecessaryInitCause"})
-    public DiscoveryCustomMessage onInitFinished(DdlInitDiscoveryMessage msg) {
-        Map<UUID, IgniteCheckedException> nodesState = msg.nodeState();
-
-        assert nodesState != null;
-
-        Map<UUID, IgniteCheckedException> errors = new HashMap<>();
-
-        for (Map.Entry<UUID, IgniteCheckedException> e : nodesState.entrySet())
-            if (e.getValue() != null)
-                errors.put(e.getKey(), e.getValue());
-
-        if (!errors.isEmpty()) {
-            IgniteCheckedException resEx = new IgniteCheckedException("DDL operation has been cancelled at INIT stage");
-
-            if (errors.size() > 1) {
-                for (IgniteCheckedException e : errors.values())
-                    resEx.addSuppressed(e);
-            }
-            else
-                resEx.initCause(errors.values().iterator().next());
-
-            sendResult(msg.operation(), resEx);
-
-            return null;
-        }
-        else
-            return new DdlAckDiscoveryMessage(msg.operation(), msg.nodeState().keySet());
-    }
-
-    /**
-     * Notify client about result.
-     *
-     * @param args Operation arguments.
-     * @param err Error, if any.
-     */
-    private void sendResult(DdlAbstractOperation args, IgniteCheckedException err) {
-        assert args != null;
-
-        DdlOperationResult res = new DdlOperationResult();
-
-        res.setError(exceptionToBytes(err));
-        res.setOperationId(args.operationId());
-
-        try {
-            ctx.io().sendToGridTopic(args.clientNodeId(), GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to notify client node about DDL operation failure " +
-                "[opId=" + args.operationId() + ", clientNodeId=" + args.clientNodeId() + ']', e);
-        }
-    }
-
-    /**
-     * Callback handling whole DDL operation result <b>on the client</b>.
-     *
-     * @param opId DDL operation ID.
-     * @param err Error, if any.
-     */
-    @SuppressWarnings("unchecked")
-    private void onResult(IgniteUuid opId, IgniteCheckedException err) {
-        GridFutureAdapter fut = operations.get(opId);
-
-        if (fut == null) {
-            U.warn(log, "DDL operation not found at its client [opId=" + opId + ", nodeId=" + ctx.localNodeId() + ']');
-
-            return;
-        }
-
-        fut.onDone(null, err);
-    }
-
-    /**
-     * Perform preliminary actions and checks for {@code INIT} stage of DDL statement execution <b>on a peer node</b>.
-     *
-     * @param msg {@code INIT} message.
-     */
-    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    private void onInit(DdlInitDiscoveryMessage msg) {
-        try {
-            // Let's tell everyone that we're participating if our init is successful...
-            if (doInit(msg.operation()))
-                msg.nodeState().put(ctx.localNodeId(), null);
-        }
-        catch (Throwable e) {
-            // Or tell everyone about the error that occurred
-            msg.nodeState().put(ctx.localNodeId(), wrapThrowableIfNeeded(e));
-        }
-    }
-
-    /**
-     * Perform actual INIT actions.
-     * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
-     *
-     * @param args Operation arguments.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings("unchecked")
-    boolean doInit(DdlAbstractOperation args) throws IgniteCheckedException {
-        if (args instanceof DdlCreateIndexOperation) {
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Optionally wrap a {@link Throwable} into an {@link IgniteCheckedException}.
-     *
-     * @param e Throwable to wrap.
-     * @return {@code e} if it's an {@link IgniteCheckedException} or an {@link IgniteCheckedException} wrapping it.
-     */
-    private static IgniteCheckedException wrapThrowableIfNeeded(Throwable e) {
-        if (e instanceof IgniteCheckedException)
-            return (IgniteCheckedException) e;
-        else
-            return new IgniteCheckedException(e);
-    }
-
-    /**
-     * Do cleanup.
-     */
-    public void stop() throws IgniteCheckedException {
-        if (!isStopped.compareAndSet(false, true))
-            throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped already"));
-
-        DdlWorker worker0 = worker;
-
-        if (worker0 != null) {
-            worker0.cancel();
-
-            worker = null;
-        }
-
-        for (Map.Entry<IgniteUuid, GridFutureAdapter> e : operations.entrySet())
-            e.getValue().onDone(new IgniteCheckedException("Operation has been cancelled [opId=" + e.getKey() +']'));
     }
 
     /**
      * Execute DDL statement.
      *
+     * @param cacheName Cache name.
      * @param stmt H2 statement to parse and execute.
      */
     @SuppressWarnings("unchecked")
-    public QueryCursor<List<?>> runDdlStatement(PreparedStatement stmt)
+    public QueryCursor<List<?>> runDdlStatement(String cacheName, PreparedStatement stmt)
         throws IgniteCheckedException {
-        if (isStopped.get())
-            throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped"));
-
         assert stmt instanceof JdbcPreparedStatement;
 
-        GridSqlStatement gridStmt = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt));
-
-        DdlAbstractOperation op;
-
-        if (gridStmt instanceof GridSqlCreateIndex) {
-            GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt;
-
-            op = new DdlCreateIndexOperation(IgniteUuid.randomUuid(), ctx.localNodeId(), createIdx.index(),
-                createIdx.schemaName(), createIdx.tableName(), createIdx.ifNotExists());
-        }
-        else if (gridStmt instanceof GridSqlDropIndex)
-            throw new UnsupportedOperationException("DROP INDEX");
-        else
-            throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
-                IgniteQueryErrorCode.UNEXPECTED_OPERATION);
-
-        GridFutureAdapter opFut = new GridFutureAdapter();
-
-        operations.put(op.operationId(), opFut);
+        IgniteInternalFuture fut;
 
         try {
-            ctx.discovery().sendCustomEvent(new DdlInitDiscoveryMessage(op));
+            GridSqlStatement gridStmt = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt));
 
-            opFut.get();
-        }
-        finally {
-            operations.remove(op.operationId());
-        }
+            if (gridStmt instanceof GridSqlCreateIndex) {
+                GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt;
 
-        QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-            (Collections.singletonList(0L)), null, false);
-
-        resCur.fieldsMeta(UPDATE_RESULT_META);
-
-        return resCur;
-    }
-
-    /**
-     * Serialize exception or at least its message to bytes.
-     *
-     * @param ex Exception.
-     * @return Serialized exception.
-     */
-    private byte[] exceptionToBytes(IgniteCheckedException ex) {
-        if (ex == null)
-            return null;
-
-        try {
-            return U.marshal(ctx, ex);
-        }
-        catch (IgniteCheckedException e) {
-            IgniteCheckedException resEx;
-
-            // Let's try to serialize at least the message
-            try {
-                resEx = new IgniteCheckedException("Failed to serialize exception " +
-                    "[msg=" + ex.getMessage() + ']');
-            }
-            catch (Throwable ignored) {
-                resEx = new IgniteCheckedException("Failed to serialize exception");
+                // TODO: How to handle schema name properly?
+                fut = ctx.cache().dynamicIndexCreate(
+                    cacheName, createIdx.tableName(), createIdx.index(), createIdx.ifNotExists());
             }
+            else if (gridStmt instanceof GridSqlDropIndex)
+                throw new UnsupportedOperationException("DROP INDEX");
+            else
+                throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
+                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
 
-            try {
-                return U.marshal(ctx, resEx);
-            }
-            catch (IgniteCheckedException exx) {
-                // Why would it fail? We've sanitized it...
-                throw new AssertionError(exx);
-            }
-        }
-    }
+            fut.get();
 
-    /**
-     * Deserialize exception from bytes.
-     *
-     * @param ex Exception.
-     * @return Serialized exception.
-     */
-    private IgniteCheckedException bytesToException(byte[] ex) {
-        if (ex == null)
-            return null;
+            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+                (Collections.singletonList(0L)), null, false);
 
-        try {
-            return U.unmarshal(ctx, ex, U.resolveClassLoader(ctx.config()));
+            resCur.fieldsMeta(UPDATE_RESULT_META);
+
+            return resCur;
         }
-        catch (Throwable e) {
-            return new IgniteCheckedException("Failed to deserialize exception", e);
+        catch (Exception e) {
+            // TODO: Proper error handling.
+            throw new IgniteSQLException("DLL operation failed.", e);
         }
     }
 
@@ -476,53 +115,4 @@ public class DdlStatementsProcessor {
     public static boolean isDdlStatement(Prepared cmd) {
         return cmd instanceof CreateIndex || cmd instanceof DropIndex;
     }
-
-    /**
-     * DDL worker.
-     */
-    private class DdlWorker extends GridWorker {
-        /** Worker queue. */
-        private final BlockingQueue<DdlTask> queue = new LinkedBlockingDeque<>();
-
-        /**
-         * Constructor.
-         *
-         * @param gridName Gird name.
-         * @param log Logger.
-         */
-        public DdlWorker(@Nullable String gridName, IgniteLogger log) {
-            super(gridName, "indexing-ddl-worker", log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                DdlTask task = queue.take();
-
-                try {
-                    task.run();
-                }
-                catch (Exception e) {
-                    U.error(log, "Unexpected exception during DDL task processing [task=" + task + ']', e);
-                }
-                catch (Throwable t) {
-                    U.error(log, "Unexpected error during DDL task processing (worker will be stopped) [task=" +
-                        task + ']', t);
-
-                    throw t;
-                }
-            }
-        }
-
-        /**
-         * Submit task.
-         *
-         * @param task Task.
-         */
-        public void submit(DdlTask task) {
-            assert task != null;
-
-            queue.add(task);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
index 753eb0c..4e00d54 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
@@ -61,7 +61,7 @@ public class DdlInitDiscoveryMessage extends DdlAbstractDiscoveryMessage impleme
     /** {@inheritDoc} */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
-        return ((IgniteH2Indexing)ctx.query().getIndexing()).getDdlStatementsProcessor().onInitFinished(this);
+        return null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java
deleted file mode 100644
index d507dff..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.ddl;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class GridDdlProtoTest extends GridCommonAbstractTest {
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void beforeTestsStarted() throws Exception {
-        IgniteH2Indexing.ddlProcCls = DdlProc.class;
-
-        startGridsMultiThreaded(3, true);
-
-        ignite(0).createCache(cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class));
-
-        startGrid(getTestIgniteInstanceName(3), getConfiguration().setClientMode(true));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        DdlProc.testName = null;
-
-        ignite(0).cache("S2P").clear();
-
-        ignite(0).cache("S2P").put("FirstKey", new Person(1, "John", "White"));
-        ignite(0).cache("S2P").put("SecondKey", new Person(2, "Joe", "Black"));
-        ignite(0).cache("S2P").put("k3", new Person(3, "Sylvia", "Green"));
-        ignite(0).cache("S2P").put("f0u4thk3y", new Person(4, "Jane", "Silver"));
-    }
-
-    /** Test behavior in case of INIT failure (cancel via {@link DdlInitDiscoveryMessage#ackMessage}). */
-    public void testInitFailure() {
-        DdlProc.testName = GridTestUtils.getGridTestName();
-
-        assertCreateIndexThrowsWithMessage("DDL operation has been cancelled at INIT stage", false);
-    }
-
-    /**
-     * Test error handling.
-     *
-     * @param msg Expected message.
-     * @param loc Run query locally on single node.
-     */
-    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    private void assertCreateIndexThrowsWithMessage(String msg, final boolean loc) {
-        final Throwable e = GridTestUtils.assertThrows(null, new Callable<Object>() {
-            /** {@inheritDoc} */
-            @Override public Object call() throws Exception {
-                ignite(3).cache("S2P").query(new SqlFieldsQuery("create index idx on Person(id desc)").setLocal(loc));
-                return null;
-            }
-        }, IgniteSQLException.class, "Failed to execute DDL statement");
-
-        GridTestUtils.assertThrows(null, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                throw (Exception) e.getCause();
-            }
-        }, IgniteCheckedException.class, msg);
-    }
-
-    /**
-     * @param name Cache name.
-     * @param partitioned Partition or replicated cache.
-     * @param escapeSql whether identifiers should be quoted - see {@link CacheConfiguration#setSqlEscapeAll}
-     * @return Cache configuration.
-     */
-    protected static CacheConfiguration cacheConfig(String name, boolean partitioned, boolean escapeSql) {
-        return new CacheConfiguration()
-            .setName(name)
-            .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
-            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
-            .setBackups(1)
-            .setSqlEscapeAll(escapeSql);
-    }
-    
-    /**
-     *
-     */
-    static class Person implements Serializable {
-        /** */
-        public Person(int id, String name, String secondName) {
-            this.id = id;
-            this.name = name;
-            this.secondName = secondName;
-        }
-
-        /** */
-        @QuerySqlField
-        protected int id;
-
-        /** */
-        @QuerySqlField(name = "firstName")
-        protected final String name;
-
-        /** */
-        @QuerySqlField
-        final String secondName;
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            Person person = (Person) o;
-
-            return id == person.id && name.equals(person.name) && secondName.equals(person.secondName);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = id;
-            res = 31 * res + name.hashCode();
-            res = 31 * res + secondName.hashCode();
-            return res;
-        }
-    }
-
-    /**
-     * Custom implementation to test behavior on failure during various stages.
-     */
-    public final static class DdlProc extends DdlStatementsProcessor {
-        /** Name of current test. */
-        private static volatile String testName;
-
-        /** {@inheritDoc} */
-        @Override boolean doInit(DdlAbstractOperation args) {
-            // Let's throw an exception on a single node in the ring
-            if ("InitFailure".equals(testName) && ctx.igniteInstanceName().endsWith("2"))
-                throw new RuntimeException("Hello from DdlProc Init");
-            else
-                try {
-                    return super.doInit(args);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-        }
-
-        /** {@inheritDoc}
-         * @param args*/
-        @Override void doAck(DdlAbstractOperation args) {
-            if ("AckFailure".equals(testName) && ctx.igniteInstanceName().endsWith("1"))
-                throw new RuntimeException("Hello from DdlProc Ack");
-            else
-                try {
-                    super.doInit(args);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-        }
-    }
-}