You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/03 10:55:51 UTC
[2/2] ignite git commit: IGNITE-4633 Introduced component type for
DDL statements processor.
IGNITE-4633 Introduced component type for DDL statements processor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c05be524
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c05be524
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c05be524
Branch: refs/heads/ignite-4565-ddl
Commit: c05be524a9b73356d26474aa9962434155d111f3
Parents: 796c933
Author: Alexander Paschenko <al...@gmail.com>
Authored: Fri Mar 3 13:55:41 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 3 13:55:41 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/ContextAware.java | 30 ++
.../communication/GridIoMessageFactory.java | 14 +-
.../discovery/CustomMessageWrapper.java | 10 +-
.../processors/query/GridQueryIndexing.java | 4 +-
.../processors/query/GridQueryProcessor.java | 24 +-
.../query/ddl/DdlOperationNodeResult.java | 136 ++++++
.../query/ddl/DdlOperationResult.java | 134 ++++++
.../ignite/spi/discovery/tcp/ServerImpl.java | 5 +
.../ignite/testframework/GridTestUtils.java | 57 +++
.../query/h2/DdlStatementsProcessor.java | 67 ---
.../query/h2/DmlStatementsProcessor.java | 27 +-
.../processors/query/h2/IgniteH2Indexing.java | 66 ++-
.../query/h2/ddl/DdlAbstractOperation.java | 62 +++
.../query/h2/ddl/DdlCreateIndexOperation.java | 99 +++++
.../query/h2/ddl/DdlStatementsProcessor.java | 424 +++++++++++++++++++
.../h2/ddl/msg/DdlAbstractDiscoveryMessage.java | 63 +++
.../h2/ddl/msg/DdlAckDiscoveryMessage.java | 70 +++
.../h2/ddl/msg/DdlInitDiscoveryMessage.java | 95 +++++
.../query/h2/sql/GridCreateIndex.java | 121 ------
.../processors/query/h2/sql/GridDropIndex.java | 82 ----
.../query/h2/sql/GridSqlCreateIndex.java | 121 ++++++
.../query/h2/sql/GridSqlDropIndex.java | 82 ++++
.../query/h2/sql/GridSqlQueryParser.java | 16 +-
.../query/h2/ddl/GridDdlProtoTest.java | 188 ++++++++
.../query/h2/sql/GridQueryParsingTest.java | 30 +-
25 files changed, 1694 insertions(+), 333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java b/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java
new file mode 100644
index 0000000..41b3193
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ContextAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Context aware entity: an object that can be handed a {@link GridKernalContext}.
+ */
+public interface ContextAware {
+ /**
+ * Provide this entity with a kernal context.
+ *
+ * @param ctx Kernal context.
+ */
+ public void context(GridKernalContext ctx);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..3a12303 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -118,11 +118,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -133,6 +133,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -838,6 +840,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case -45:
+ msg = new DdlOperationResult();
+
+ break;
+
+ case -46:
+ msg = new DdlOperationNodeResult();
+
+ break;
+
// [-3..119] [124..127] [-36..-44]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 8f56248..0a12c44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -17,13 +17,15 @@
package org.apache.ignite.internal.managers.discovery;
+import org.apache.ignite.internal.ContextAware;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-class CustomMessageWrapper implements DiscoverySpiCustomMessage {
+class CustomMessageWrapper implements DiscoverySpiCustomMessage, ContextAware {
/** */
private static final long serialVersionUID = 0L;
@@ -57,6 +59,12 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage {
}
/** {@inheritDoc} */
+ @Override public void context(GridKernalContext ctx) {
+ if (delegate instanceof ContextAware)
+ ((ContextAware) delegate).context(ctx);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return delegate.toString();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..276a603 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -234,11 +234,11 @@ public interface GridQueryIndexing {
/**
* Prepare native statement to retrieve JDBC metadata from.
*
- * @param schema Schema.
+ * @param space Space name.
* @param sql Query.
* @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
*/
- public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException;
+ public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException;
/**
* Collect queries that already running more than specified duration.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e19fbcc..42e7c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.processors.query;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.TimeUnit;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -40,6 +39,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
@@ -215,6 +215,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @return Indexing.
+ * @throws IgniteException If module is not enabled.
+ */
+ public GridQueryIndexing getIndexing() throws IgniteException {
+ checkxEnabled();
+
+ return idx;
+ }
+
+ /**
* @param cctx Cache context.
* @throws IgniteCheckedException If failed.
*/
@@ -966,14 +976,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
*
- * @param schema Schema.
+ * @param space Space name.
* @param sql Query.
* @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
*/
- public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
+ public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException {
checkxEnabled();
- return idx.prepareNativeStatement(schema, sql);
+ return idx.prepareNativeStatement(space, sql);
}
/**
@@ -1802,7 +1812,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw (IgniteCheckedException)err;
}
- catch (CacheException e) {
+ catch (CacheException | IgniteException e) {
err = e;
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java
new file mode 100644
index 0000000..a699530
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationNodeResult.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message sent from <b>peer node</b> to <b>coordinator</b> when local portion of work is done.
+ */
+public class DdlOperationNodeResult implements Message {
+ /** ID of the whole DDL operation this node result belongs to. */
+ private IgniteUuid opId;
+
+ /** Error bytes, if any (presumably a marshalled exception - TODO confirm against the sender). */
+ private byte[] err;
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+ // Fields are written in the same order readFrom() reads them: err, then opId.
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("err", err))
+ return false;
+
+ writer.incrementState();
+ // No break: fall through and write the next field.
+ case 1:
+ if (!writer.writeIgniteUuid("opId", opId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+ // Fields are read in the same order writeTo() writes them: err, then opId.
+ switch (reader.state()) {
+ case 0:
+ err = reader.readByteArray("err");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ // No break: fall through and read the next field.
+ case 1:
+ opId = reader.readIgniteUuid("opId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(DdlOperationNodeResult.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -46; // Same code GridIoMessageFactory maps to DdlOperationNodeResult.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2; // err and opId.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /**
+ * @return Whole DDL operation ID.
+ */
+ public IgniteUuid getOperationId() {
+ return opId;
+ }
+
+ /**
+ * @param operationId Whole DDL operation ID.
+ */
+ public void setOperationId(IgniteUuid operationId) {
+ this.opId = operationId;
+ }
+
+ /**
+ * @return Error bytes, if any; the internal array is returned without copying.
+ */
+ public byte[] getError() {
+ return err;
+ }
+
+ /**
+ * @param err Error bytes, if any; the array is stored without copying.
+ */
+ public void setError(byte[] err) {
+ this.err = err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java
new file mode 100644
index 0000000..a6904a9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/DdlOperationResult.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message sent from <b>coordinator</b> to <b>client</b> when operation is ultimately finished.
+ */
+public class DdlOperationResult implements Message {
+ /** ID of the whole DDL operation this result belongs to. */
+ private IgniteUuid opId;
+
+ /** Error bytes, if any (presumably a marshalled exception - TODO confirm against the sender). */
+ private byte[] err;
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+ // Fields are written in the same order readFrom() reads them: err, then opId.
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("err", err))
+ return false;
+
+ writer.incrementState();
+ // No break: fall through and write the next field.
+ case 1:
+ if (!writer.writeIgniteUuid("opId", opId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+ // Fields are read in the same order writeTo() writes them: err, then opId.
+ switch (reader.state()) {
+ case 0:
+ err = reader.readByteArray("err");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ // No break: fall through and read the next field.
+ case 1:
+ opId = reader.readIgniteUuid("opId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(DdlOperationResult.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -45; // Same code GridIoMessageFactory maps to DdlOperationResult.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2; // err and opId.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /**
+ * @return Whole DDL operation ID.
+ */
+ public IgniteUuid getOperationId() {
+ return opId;
+ }
+
+ /**
+ * @param operationId Whole DDL operation ID.
+ */
+ public void setOperationId(IgniteUuid operationId) {
+ this.opId = operationId;
+ }
+
+ /**
+ * @return Error bytes, if any; the internal array is returned without copying.
+ */
+ public byte[] getError() {
+ return err;
+ }
+
+ /**
+ * @param err Error bytes, if any; the array is stored without copying.
+ */
+ public void setError(byte[] err) {
+ this.err = err;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 562b4c3..8adf062 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -63,11 +63,13 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.ContextAware;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
@@ -5139,6 +5141,9 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msgObj != null) {
+ if (msgObj instanceof ContextAware)
+ ((ContextAware)msgObj).context(((IgniteEx)spi.ignite()).context());
+
DiscoverySpiCustomMessage nextMsg = msgObj.ackMessage();
if (nextMsg != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 0ae6575..90b63ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -58,6 +58,7 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import junit.framework.Test;
+import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -99,6 +100,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.ssl.SslContextFactory;
import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -853,6 +855,61 @@ public final class GridTestUtils {
}
/**
+ * @return Name of current test method without the {@code test} prefix, based on stack trace. Works when called from JUnit thread and from test method; throws {@link RuntimeException} otherwise.
+ */
+ public static String getGridTestName() {
+ List<StackTraceElement> trace = Arrays.asList(Thread.currentThread().getStackTrace());
+ // getStackTrace() lists the most recent call first; reverse so the scan runs from the oldest frame towards the caller.
+ Collections.reverse(trace);
+
+ int i = 0;
+ // Locate the JUnit TestCase.runTest frame; the test method is expected right after it (past reflection frames).
+ for (; i < trace.size(); i++) {
+ StackTraceElement e = trace.get(i);
+
+ if (e.getClassName().equals(TestCase.class.getName()) && e.getMethodName().equals("runTest"))
+ break;
+ }
+
+ if (++i >= trace.size())
+ throw new RuntimeException("JUnit TestCase.runTest not found on stack trace");
+
+ for (; i < trace.size() - 1; i++) {
+ StackTraceElement e = trace.get(i);
+
+ String clsName = e.getClassName();
+
+ // Skip reflection related stuff (sun.reflect.* up to Java 8, jdk.internal.reflect.* since Java 9).
+ if (!clsName.startsWith("java.lang.") && !clsName.startsWith("sun.reflect.") && !clsName.startsWith("jdk.internal.reflect."))
+ break;
+ }
+ // The loop above stops one frame short of the end, so exhaustion leaves i at size() - 1, never at size().
+ if (i >= trace.size() - 1)
+ throw new RuntimeException("Non JDK related method not found on stack trace");
+
+ StackTraceElement e = trace.get(i);
+
+ Class<?> cls;
+
+ try {
+ cls = Class.forName(e.getClassName());
+ }
+ catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ if (!GridAbstractTest.class.isAssignableFrom(cls))
+ throw new RuntimeException("Grid test not found on stack trace");
+
+ String mtdName = e.getMethodName();
+
+ if (!mtdName.startsWith("test"))
+ throw new RuntimeException("GridTestUtils.getGridTestName must be called from test method");
+
+ return mtdName.substring(4);
+ }
+
+ /**
* @param <T> Type.
* @param cls Class.
* @param annCls Annotation class.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java
deleted file mode 100644
index 462ab01..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DdlStatementsProcessor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.sql.GridCreateIndex;
-import org.apache.ignite.internal.processors.query.h2.sql.GridDropIndex;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
-import org.h2.command.Prepared;
-
-/**
- * Logic to execute DDL statements.
- */
-class DdlStatementsProcessor {
- /**
- * Indexing.
- */
- private final IgniteH2Indexing indexing;
-
- /** */
- DdlStatementsProcessor(IgniteH2Indexing indexing) {
- this.indexing = indexing;
- }
-
- /**
- * Execute DDL statement.
- *
- * @param cctx Cache context.
- * @param stmt H2 statement to parse and execute.
- */
- QueryCursor<List<?>> runDdlStatement(GridCacheContext<?, ?> cctx, Prepared stmt) throws IgniteCheckedException {
- GridSqlStatement gridStmt = new GridSqlQueryParser().parse(stmt);
-
- if (gridStmt instanceof GridCreateIndex) {
- QueryIndex newIdx = ((GridCreateIndex) gridStmt).index();
-
- throw new UnsupportedOperationException("CREATE INDEX");
- }
- else if (gridStmt instanceof GridDropIndex)
- throw new UnsupportedOperationException("DROP INDEX");
- else
- throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
- IgniteQueryErrorCode.UNEXPECTED_OPERATION);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 799ce39..352b013 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
@@ -94,7 +93,7 @@ public class DmlStatementsProcessor {
private final static int DFLT_DML_RERUN_ATTEMPTS = 4;
/** Indexing. */
- private final IgniteH2Indexing indexing;
+ private IgniteH2Indexing idx;
/** Set of binary type ids for which warning about missing identity in configuration has been printed. */
private final static Set<Integer> WARNED_TYPES =
@@ -106,15 +105,11 @@ public class DmlStatementsProcessor {
/** Update plans cache. */
private final ConcurrentMap<String, ConcurrentMap<String, UpdatePlan>> planCache = new ConcurrentHashMap<>();
- /** Dummy metadata for update result. */
- private final static List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
- singletonList(new IgniteH2Indexing.SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
-
/**
- * @param indexing indexing.
+ * @param idx indexing.
*/
- DmlStatementsProcessor(IgniteH2Indexing indexing) {
- this.indexing = indexing;
+ public void start(IgniteH2Indexing idx) {
+ this.idx = idx;
}
/**
@@ -208,7 +203,7 @@ public class DmlStatementsProcessor {
SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
- return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
+ return new GridQueryFieldsResultAdapter(IgniteH2Indexing.UPDATE_RESULT_META,
new IgniteSingletonIterator(Collections.singletonList(res)));
}
@@ -259,10 +254,10 @@ public class DmlStatementsProcessor {
.setPageSize(fieldsQry.getPageSize())
.setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
- cur = (QueryCursorImpl<List<?>>) indexing.queryTwoStep(cctx, newFieldsQry, cancel);
+ cur = (QueryCursorImpl<List<?>>) idx.queryTwoStep(cctx, newFieldsQry, cancel);
}
else {
- final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
+ final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -399,7 +394,7 @@ public class DmlStatementsProcessor {
while (it.hasNext()) {
List<?> e = it.next();
if (e.size() != 2) {
- U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
+ U.warn(idx.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
continue;
}
@@ -900,7 +895,7 @@ public class DmlStatementsProcessor {
private BinaryObject updateHashCodeIfNeeded(GridCacheContext cctx, BinaryObject binObj) {
if (U.isHashCodeEmpty(binObj)) {
if (WARNED_TYPES.add(binObj.type().typeId()))
- U.warn(indexing.getLogger(), "Binary object's type does not have identity resolver explicitly set, therefore " +
+ U.warn(idx.getLogger(), "Binary object's type does not have identity resolver explicitly set, therefore " +
"BinaryArrayIdentityResolver is used to generate hash codes for its instances, and therefore " +
"hash code of this binary object will most likely not match that of its non serialized form. " +
"For finer control over identity of this type, please update your BinaryConfiguration accordingly." +
@@ -1011,11 +1006,11 @@ public class DmlStatementsProcessor {
* @return Resulting Iterable.
*/
@SuppressWarnings("unchecked")
- private static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
+ public static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
QueryCursorImpl<List<?>> res =
new QueryCursorImpl(Collections.singletonList(Collections.singletonList(itemsCnt)), null, false);
- res.fieldsMeta(UPDATE_RESULT_META);
+ res.fieldsMeta(IgniteH2Indexing.UPDATE_RESULT_META);
return res;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index bb04dcc..ad731ff 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -82,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -91,7 +90,9 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -212,6 +213,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
";ROW_FACTORY=\"" + GridH2RowFactory.class.getName() + "\"" +
";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
+ /** Dummy metadata for update result. */
+ static final List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
+ singletonList(new SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
+
/** */
private static final int PREPARED_STMT_CACHE_SIZE = 256;
@@ -262,6 +267,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
+ /** For tests. */
+ public static Class<? extends DdlStatementsProcessor> ddlProcCls;
+
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -345,10 +353,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private volatile GridKernalContext ctx;
/** */
- private final DmlStatementsProcessor dmlProc = new DmlStatementsProcessor(this);
+ private final DmlStatementsProcessor dmlProc = new DmlStatementsProcessor();
/** */
- private final DdlStatementsProcessor ddlProc = new DdlStatementsProcessor(this);
+ private DdlStatementsProcessor ddlProc;
/** */
private final ConcurrentMap<String, GridH2Table> dataTables = new ConcurrentHashMap8<>();
@@ -380,6 +388,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @return DDL statements processor.
+ */
+ public DdlStatementsProcessor getDdlStatementsProcessor() {
+ return ddlProc;
+ }
+
+ /**
* @param space Space.
* @return Connection.
*/
@@ -395,7 +410,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* @return Logger.
*/
- IgniteLogger getLogger() {
+ public IgniteLogger getLogger() {
return log;
}
@@ -442,8 +457,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
- return prepareStatement(connectionForSpace(schema), sql, false);
+ @Override public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException {
+ return prepareStatement(connectionForSpace(space), sql, true);
}
/**
@@ -817,8 +832,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@SuppressWarnings("unchecked")
@Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
@Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
- final int timeout, final GridQueryCancel cancel)
- throws IgniteCheckedException {
+ final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException {
final Connection conn = connectionForSpace(spaceName);
setupConnection(conn, false, enforceJoinOrder);
@@ -827,7 +841,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
- if (!p.isQuery()) {
+ if (DmlStatementsProcessor.isDmlStatement(p)) {
SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
if (params != null)
@@ -836,8 +850,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setEnforceJoinOrder(enforceJoinOrder);
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
- return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+ try {
+ return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qry + ", params=" +
+ Arrays.deepToString(fldsQry.getArgs()) + "]", e);
+ }
}
+ else if (DdlStatementsProcessor.isDdlStatement(p))
+ throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
List<GridQueryFieldMetadata> meta;
@@ -1296,7 +1319,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
if (!prepared.isQuery()) {
- if (dmlProc.isDmlStatement(prepared)) {
+ if (DmlStatementsProcessor.isDmlStatement(prepared)) {
try {
return dmlProc.updateSqlFieldsTwoStep(cctx.namexx(), stmt, qry, cancel);
}
@@ -1305,13 +1328,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Arrays.deepToString(qry.getArgs()) + "]", e);
}
}
- else {
+
+ if (DdlStatementsProcessor.isDdlStatement(prepared)) {
try {
- return ddlProc.runDdlStatement(cctx, prepared);
+ return ddlProc.runDdlStatement(stmt);
}
catch (IgniteCheckedException e) {
- throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ", params=" +
- Arrays.deepToString(qry.getArgs()) + "]", e);
+ throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']', e);
}
}
}
@@ -1855,6 +1878,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cleanupStatementCache();
}
}, CLEANUP_STMT_CACHE_PERIOD, CLEANUP_STMT_CACHE_PERIOD);
+
+ try {
+ ddlProc = ddlProcCls == null ? new DdlStatementsProcessor() : ddlProcCls.newInstance();
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException("Failed to initialize DDL statements processor", e);
+ }
+
+ dmlProc.start(this);
+ ddlProc.start(ctx, this);
}
// TODO https://issues.apache.org/jira/browse/IGNITE-2139
@@ -2002,6 +2035,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
+ if (ddlProc != null)
+ ddlProc.stop();
+
for (Schema schema : schemas.values())
schema.onDrop();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java
new file mode 100644
index 0000000..ec73e7e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlAbstractOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+import org.apache.ignite.lang.IgniteUuid;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Base class for DDL operations: holds the operation ID and the ID of the node that initiated the operation.
+ */
+public abstract class DdlAbstractOperation implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Operation ID. */
+ private final IgniteUuid opId;
+
+ /** ID of node that initiated this operation. */
+ private final UUID cliNodeId;
+
+ /**
+ * Constructor.
+ *
+ * @param opId Operation ID.
+ * @param cliNodeId ID of the node that initiated this operation.
+ */
+ public DdlAbstractOperation(IgniteUuid opId, UUID cliNodeId) {
+ this.opId = opId;
+ this.cliNodeId = cliNodeId;
+ }
+
+ /**
+ * @return Operation ID.
+ */
+ public IgniteUuid operationId() {
+ return opId;
+ }
+
+ /**
+ * @return ID of the node that initiated this operation.
+ */
+ public UUID clientNodeId() {
+ return cliNodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java
new file mode 100644
index 0000000..88c980d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlCreateIndexOperation.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+import java.util.UUID;
+
+/**
+ * Arguments for {@code CREATE INDEX}: index params, target schema and table, and the "if not exists" flag.
+ */
+public class DdlCreateIndexOperation extends DdlAbstractOperation {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Index params. */
+ @GridToStringInclude
+ private final QueryIndex idx;
+
+ /** Schema name. */
+ private final String schemaName;
+
+ /** Table name. */
+ private final String tblName;
+
+ /** Ignore operation if index exists. */
+ private final boolean ifNotExists;
+
+ /**
+ * Constructor.
+ *
+ * @param opId Operation ID.
+ * @param cliNodeId ID of node that initiated this operation.
+ * @param idx Index params.
+ * @param schemaName Schema name.
+ * @param tblName Table name.
+ * @param ifNotExists Ignore operation if index exists.
+ */
+ DdlCreateIndexOperation(IgniteUuid opId, UUID cliNodeId, QueryIndex idx, String schemaName, String tblName,
+ boolean ifNotExists) {
+ super(opId, cliNodeId);
+
+ this.idx = idx;
+ this.schemaName = schemaName;
+ this.tblName = tblName;
+ this.ifNotExists = ifNotExists;
+ }
+
+ /**
+ * @return Index params.
+ */
+ public QueryIndex index() {
+ return idx;
+ }
+
+ /**
+ * @return Schema name.
+ */
+ public String schemaName() {
+ return schemaName;
+ }
+
+ /**
+ * @return Table name.
+ */
+ public String tableName() {
+ return tblName;
+ }
+
+ /**
+ * @return Whether to ignore the operation if the index exists.
+ */
+ public boolean ifNotExists() {
+ return ifNotExists;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DdlCreateIndexOperation.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
new file mode 100644
index 0000000..1c4eeaa
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+import java.sql.PreparedStatement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
+import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
+import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlAckDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.h2.command.Prepared;
+import org.h2.command.ddl.CreateIndex;
+import org.h2.command.ddl.DropIndex;
+import org.h2.jdbc.JdbcPreparedStatement;
+
+/**
+ * DDL statements processor.<p>
+ * Contains higher level logic to handle operations as a whole and communicate with the client.
+ */
+public class DdlStatementsProcessor {
+ /** Kernal context. Assigned in {@code start}. */
+ GridKernalContext ctx;
+
+ /** Indexing engine. Assigned in {@code start}. */
+ private IgniteH2Indexing idx;
+
+ /** State flag: becomes {@code true} once the processor has been stopped. */
+ private final AtomicBoolean isStopped = new AtomicBoolean();
+
+ /** Running operations originating at this node as a client. */
+ private final Map<IgniteUuid, GridFutureAdapter> operations = new ConcurrentHashMap<>();
+
+ /**
+ * Store the kernal context and indexing references and register the discovery and IO listeners for DDL messages.
+ *
+ * @param ctx Kernal context.
+ * @param idx Indexing.
+ */
+ public void start(final GridKernalContext ctx, IgniteH2Indexing idx) {
+ this.ctx = ctx;
+ this.idx = idx;
+
+ ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class, new CustomEventListener<DdlInitDiscoveryMessage>() {
+ /** Passes every {@code INIT} discovery message to {@code onInit}. */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DdlInitDiscoveryMessage msg) {
+ onInit(msg);
+ }
+ });
+
+ ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class, new CustomEventListener<DdlAckDiscoveryMessage>() {
+ /** Passes every {@code ACK} discovery message, together with its sender, to {@code onAck}. */
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, DdlAckDiscoveryMessage msg) {
+ onAck(snd, msg);
+ }
+ });
+
+ ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+ /** Dispatches DDL result messages by type; messages of other types are ignored here. */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ if (msg instanceof DdlOperationResult) {
+ DdlOperationResult res = (DdlOperationResult) msg;
+
+ onResult(res.getOperationId(), bytesToException(res.getError()));
+ }
+
+ if (msg instanceof DdlOperationNodeResult) {
+ DdlOperationNodeResult res = (DdlOperationNodeResult) msg;
+
+ onNodeResult(res.getOperationId(), bytesToException(res.getError()));
+ }
+ }
+ });
+ }
+
+ /**
+  * Handle {@code ACK} message on a <b>peer node</b> - do local portion of actual DDL job and notify
+  * the sender of the message (the <b>coordinator</b>) about success or failure.
+  *
+  * @param snd Sender of the {@code ACK} message; the local result is sent back to this node.
+  * @param msg Message.
+  */
+ @SuppressWarnings({"ThrowableInstanceNeverThrown", "unchecked"})
+ private void onAck(ClusterNode snd, DdlAckDiscoveryMessage msg) {
+     // Don't do anything if we didn't choose to participate.
+     if (!msg.nodeIds().contains(ctx.localNodeId()))
+         return;
+
+     IgniteCheckedException ex = null;
+
+     DdlAbstractOperation args = msg.operation();
+
+     try {
+         doAck(args);
+     }
+     catch (Throwable e) {
+         ex = wrapThrowableIfNeeded(e);
+     }
+
+     try {
+         DdlOperationNodeResult res = new DdlOperationNodeResult();
+
+         res.setOperationId(args.operationId());
+         res.setError(exceptionToBytes(ex));
+
+         ctx.io().send(snd, GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
+     }
+     catch (Throwable e) {
+         idx.getLogger().error("Failed to notify coordinator about local DDL operation completion [opId=" +
+             args.operationId() + ", coordinatorNodeId=" + snd.id() + ']', e);
+     }
+ }
+
+ /**
+ * Perform local portion of DDL operation; currently nothing is done for any operation type.
+ * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
+ *
+ * @param args Operation arguments.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("unchecked")
+ void doAck(DdlAbstractOperation args) throws IgniteCheckedException {
+ if (args instanceof DdlCreateIndexOperation) {
+ // No-op: no local work is performed for CREATE INDEX yet.
+ }
+ }
+
+ /**
+ * Handle local DDL operation result from <b>a peer node</b> on <b>the coordinator</b>.
+ *
+ * @param opId DDL operation ID.
+ * @param err Exception that occurred on the <b>peer</b>, or null if the local operation has been successful.
+ */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "SynchronizationOnLocalVariableOrMethodParameter", "ForLoopReplaceableByForEach"})
+ private void onNodeResult(IgniteUuid opId, IgniteCheckedException err) {
+ // No-op: per-node results are currently ignored.
+ }
+
+ /**
+ * Process result of executing {@link DdlInitDiscoveryMessage} and react accordingly.
+ * Called from {@link DdlInitDiscoveryMessage#ackMessage()}.
+ *
+ * @param msg {@link DdlInitDiscoveryMessage} message.
+ * @return {@code ACK} message for the nodes present in the node state map, or {@code null} if any node reported an error.
+ */
+ @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "UnnecessaryInitCause"})
+ public DiscoveryCustomMessage onInitFinished(DdlInitDiscoveryMessage msg) {
+ Map<UUID, IgniteCheckedException> nodesState = msg.nodeState();
+
+ assert nodesState != null;
+
+ Map<UUID, IgniteCheckedException> errors = new HashMap<>();
+
+ for (Map.Entry<UUID, IgniteCheckedException> e : nodesState.entrySet())
+ if (e.getValue() != null)
+ errors.put(e.getKey(), e.getValue());
+
+ if (!errors.isEmpty()) {
+ IgniteCheckedException resEx = new IgniteCheckedException("DDL operation has been cancelled at INIT stage");
+
+ if (errors.size() > 1) {
+ for (IgniteCheckedException e : errors.values())
+ resEx.addSuppressed(e);
+ }
+ else
+ resEx.initCause(errors.values().iterator().next());
+
+ sendResult(msg.operation(), resEx);
+
+ return null;
+ }
+ else
+ return new DdlAckDiscoveryMessage(msg.operation(), msg.nodeState().keySet());
+ }
+
+ /**
+ * Send the operation result to the node that initiated the operation; a send failure is logged, not rethrown.
+ *
+ * @param args Operation arguments.
+ * @param err Error, if any.
+ */
+ private void sendResult(DdlAbstractOperation args, IgniteCheckedException err) {
+ assert args != null;
+
+ DdlOperationResult res = new DdlOperationResult();
+
+ res.setError(exceptionToBytes(err));
+ res.setOperationId(args.operationId());
+
+ try {
+ ctx.io().send(args.clientNodeId(), GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ idx.getLogger().error("Failed to notify client node about DDL operation failure " +
+ "[opId=" + args.operationId() + ", clientNodeId=" + args.clientNodeId() + ']', e);
+ }
+ }
+
+ /**
+  * Completes, <b>on the client</b>, the future of the DDL operation whose overall result has arrived.
+  *
+  * @param opId DDL operation ID.
+  * @param err Error, if any.
+  */
+ @SuppressWarnings("unchecked")
+ private void onResult(IgniteUuid opId, IgniteCheckedException err) {
+     GridFutureAdapter opFut = operations.get(opId);
+
+     if (opFut != null) {
+         opFut.onDone(null, err);
+
+         return;
+     }
+
+     idx.getLogger().warning("DDL operation not found at its client [opId=" + opId + ", nodeId=" +
+         ctx.localNodeId() + ']');
+ }
+
+ /**
+ * Perform preliminary actions and checks for {@code INIT} stage of DDL statement execution <b>on a peer node</b>.
+ *
+ * @param msg {@code INIT} message; this node's outcome is recorded in its node state map.
+ */
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+ private void onInit(DdlInitDiscoveryMessage msg) {
+ try {
+ // A null entry for the local node means "participating, no error"; no entry means "not participating".
+ if (doInit(msg.operation()))
+ msg.nodeState().put(ctx.localNodeId(), null);
+ }
+ catch (Throwable e) {
+ // Otherwise record the error that occurred under the local node ID.
+ msg.nodeState().put(ctx.localNodeId(), wrapThrowableIfNeeded(e));
+ }
+ }
+
+ /**
+  * Perform actual INIT actions.
+  * Kept as a separate method so that tests can override it and check behavior in case of errors.
+  *
+  * @param args Operation arguments.
+  * @return Whether this node participates in this operation, or not.
+  * @throws IgniteCheckedException if failed.
+  */
+ @SuppressWarnings("unchecked")
+ boolean doInit(DdlAbstractOperation args) throws IgniteCheckedException {
+     // CREATE INDEX is the only operation type this node takes part in;
+     // for every other operation type the node does not participate.
+     boolean participates = args instanceof DdlCreateIndexOperation;
+
+     return participates;
+ }
+
+ /**
+  * Convert a {@link Throwable} to an {@link IgniteCheckedException}, wrapping it only when necessary.
+  *
+  * @param e Throwable to wrap.
+  * @return {@code e} if it's an {@link IgniteCheckedException} or an {@link IgniteCheckedException} wrapping it.
+  */
+ private static IgniteCheckedException wrapThrowableIfNeeded(Throwable e) {
+     // Reuse the instance when it already has the target type.
+     return e instanceof IgniteCheckedException
+         ? (IgniteCheckedException) e
+         : new IgniteCheckedException(e);
+ }
+
+ /**
+ * Mark the processor as stopped and fail all pending operation futures; throws if it has been stopped already.
+ */
+ public void stop() throws IgniteCheckedException {
+ if (!isStopped.compareAndSet(false, true))
+ throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped already"));
+
+ for (Map.Entry<IgniteUuid, GridFutureAdapter> e : operations.entrySet())
+ e.getValue().onDone(new IgniteCheckedException("Operation has been cancelled [opId=" + e.getKey() +']'));
+ }
+
+ /**
+  * Execute DDL statement: announce the operation via discovery and wait until its result arrives.
+  *
+  * @param stmt H2 statement to parse and execute.
+  * @return Cursor holding a single zero update counter.
+  * @throws IgniteCheckedException If the processor has been stopped or the operation has failed.
+  */
+ public QueryCursor<List<?>> runDdlStatement(PreparedStatement stmt) throws IgniteCheckedException {
+     if (isStopped.get())
+         throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped"));
+
+     assert stmt instanceof JdbcPreparedStatement;
+
+     GridSqlStatement gridStmt = new GridSqlQueryParser().parse(GridSqlQueryParser
+         .prepared((JdbcPreparedStatement) stmt));
+
+     DdlAbstractOperation op;
+
+     if (gridStmt instanceof GridSqlCreateIndex) {
+         GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt;
+
+         op = new DdlCreateIndexOperation(IgniteUuid.randomUuid(), ctx.localNodeId(), createIdx.index(),
+             createIdx.schemaName(), createIdx.tableName(), createIdx.ifNotExists());
+     }
+     else if (gridStmt instanceof GridSqlDropIndex)
+         throw new IgniteSQLException("DROP INDEX is not supported", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+     else
+         throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
+             IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+
+     GridFutureAdapter opFut = new GridFutureAdapter();
+     operations.put(op.operationId(), opFut);
+
+     try {
+         ctx.discovery().sendCustomEvent(new DdlInitDiscoveryMessage(op));
+
+         opFut.get();
+     }
+     finally {
+         operations.remove(op.operationId());
+     }
+
+     return DmlStatementsProcessor.cursorForUpdateResult(0L);
+ }
+
+ /**
+ * Serialize exception or at least its message to bytes.
+ *
+ * @param ex Exception, may be {@code null}.
+ * @return Serialized exception, or {@code null} if {@code ex} is {@code null}.
+ */
+ private byte[] exceptionToBytes(IgniteCheckedException ex) {
+ if (ex == null)
+ return null;
+
+ try {
+ return U.marshal(ctx, ex);
+ }
+ catch (IgniteCheckedException e) {
+ IgniteCheckedException resEx;
+
+ // Fall back to a fresh exception that carries only the original message.
+ try {
+ resEx = new IgniteCheckedException("Failed to serialize exception " +
+ "[msg=" + ex.getMessage() + ']');
+ }
+ catch (Throwable ignored) {
+ resEx = new IgniteCheckedException("Failed to serialize exception");
+ }
+
+ try {
+ return U.marshal(ctx, resEx);
+ }
+ catch (IgniteCheckedException exx) {
+ // Why would it fail? We've sanitized it...
+ throw new AssertionError(exx);
+ }
+ }
+ }
+
+ /**
+ * Deserialize exception from bytes.
+ *
+ * @param ex Serialized exception, may be {@code null}.
+ * @return Deserialized exception, {@code null} for {@code null} input, or a new exception wrapping the unmarshalling failure.
+ */
+ private IgniteCheckedException bytesToException(byte[] ex) {
+ if (ex == null)
+ return null;
+
+ try {
+ return U.unmarshal(ctx, ex, U.resolveClassLoader(ctx.config()));
+ }
+ catch (Throwable e) {
+ return new IgniteCheckedException("Failed to deserialize exception", e);
+ }
+ }
+
+ /**
+ * @param cmd Statement.
+ * @return Whether {@code cmd} is a DDL statement we're able to handle, i.e. an H2 {@code CreateIndex} or {@code DropIndex}.
+ */
+ public static boolean isDdlStatement(Prepared cmd) {
+ return cmd instanceof CreateIndex || cmd instanceof DropIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java
new file mode 100644
index 0000000..f9fd641
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAbstractDiscoveryMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl.msg;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Base class for DDL discovery messages: holds a randomly generated message ID and the DDL operation.
+ */
+public abstract class DdlAbstractDiscoveryMessage implements DiscoveryCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message ID, generated when the message is created. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Operation. */
+ private final DdlAbstractOperation op;
+
+ /**
+ * Constructor.
+ *
+ * @param op Operation.
+ */
+ protected DdlAbstractDiscoveryMessage(DdlAbstractOperation op) {
+ this.op = op;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /**
+ * @return Operation.
+ */
+ public DdlAbstractOperation operation() {
+ return op;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DdlAbstractDiscoveryMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java
new file mode 100644
index 0000000..f33cece
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlAckDiscoveryMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl.msg;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * {@code ACK} message - triggers actual execution of local portion of DDL operation.
+ */
+public class DdlAckDiscoveryMessage extends DdlAbstractDiscoveryMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Ids of participating nodes; the set handed to the constructor is kept by reference. */
+ private final Set<UUID> nodeIds;
+
+ /**
+ * Creates an {@code ACK} message for the given operation and set of nodes.
+ *
+ * @param op Operation.
+ * @param nodeIds Ids of participating nodes; stored as-is, without a defensive copy.
+ */
+ public DdlAckDiscoveryMessage(DdlAbstractOperation op, Set<UUID> nodeIds) {
+ super(op);
+ this.nodeIds = nodeIds;
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code null}. */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code false}. */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /**
+ * @return Ids of participating nodes (the same set instance that was passed to the constructor).
+ */
+ public Set<UUID> nodeIds() {
+ return nodeIds;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DdlAckDiscoveryMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
new file mode 100644
index 0000000..753eb0c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl.msg;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.ContextAware;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.ddl.DdlAbstractOperation;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@code INIT} part of a distributed DDL operation.
+ */
+public class DdlInitDiscoveryMessage extends DdlAbstractDiscoveryMessage implements ContextAware {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Kernal context; transient, assigned in {@link #context(GridKernalContext)} and read by {@link #ackMessage()}. */
+ @GridToStringExclude
+ private transient GridKernalContext ctx;
+
+ /**
+ * Map {@code node id} -> {@code init exception, if any}; starts empty and is replaced via {@link #nodeState(Map)}.
+ * If this field is null, then this message is being processed by coordinator (NOTE(review): confirm with callers).
+ * Note that this map not only helps to track errors but also contains the node ids eligible for the operation,
+ * as filtered at the coordinator, so its key set is important.
+ */
+ private Map<UUID, IgniteCheckedException> nodesState = new HashMap<>();
+
+ /**
+ * Creates an {@code INIT} message for the given operation.
+ *
+ * @param op Operation.
+ */
+ public DdlInitDiscoveryMessage(DdlAbstractOperation op) {
+ super(op);
+ }
+
+ /** {@inheritDoc} Delegates to {@code onInitFinished(this)} of the DDL statements processor; dereferences ctx. */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return ((IgniteH2Indexing)ctx.query().getIndexing()).getDdlStatementsProcessor().onInitFinished(this);
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code true}. */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /**
+ * @return Nodes state map currently held by this message (the map itself, not a copy).
+ */
+ public Map<UUID, IgniteCheckedException> nodeState() {
+ return nodesState;
+ }
+
+ /**
+ * @param nodesState Nodes state map to hold from now on; replaces the current one and is not copied.
+ */
+ public void nodeState(Map<UUID, IgniteCheckedException> nodesState) {
+ this.nodesState = nodesState;
+ }
+
+ /** {@inheritDoc} Stores the context for later use by {@link #ackMessage()}. */
+ @Override public void context(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DdlInitDiscoveryMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java
deleted file mode 100644
index 56c7dd3..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridCreateIndex.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.sql;
-
-import java.util.Map;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.QueryIndexType;
-import org.h2.command.Parser;
-
-/**
- * CREATE INDEX statement.
- */
-public class GridCreateIndex extends GridSqlStatement {
- /** Schema name. */
- private String schemaName;
-
- /** Table name. */
- private String tblName;
-
- /** Attempt to create the index only if it does not exist. */
- private boolean ifNotExists;
-
- /** Index to create. */
- private QueryIndex idx;
-
- /**
- * @return Schema name for new index.
- */
- public String schemaName() {
- return schemaName;
- }
-
- /**
- * @param schemaName Schema name for new index.
- */
- public void schemaName(String schemaName) {
- this.schemaName = schemaName;
- }
-
- /**
- * @return Table name.
- */
- public String tableName() {
- return tblName;
- }
-
- /**
- * @param tblName Table name.
- */
- public void tableName(String tblName) {
- this.tblName = tblName;
- }
-
- /**
- * @return whether attempt to create the index should be made only if it does not exist.
- */
- public boolean ifNotExists() {
- return ifNotExists;
- }
-
- /**
- * @param ifNotExists whether attempt to create the index should be made only if it does not exist.
- */
- public void ifNotExists(boolean ifNotExists) {
- this.ifNotExists = ifNotExists;
- }
-
- /**
- * @return Index to create.
- */
- public QueryIndex index() {
- return idx;
- }
-
- /**
- * @param idx Index to create.
- */
- public void index(QueryIndex idx) {
- this.idx = idx;
- }
-
- /** {@inheritDoc} */
- @Override public String getSQL() {
- StringBuilder sb = new StringBuilder("CREATE ")
- .append(idx.getIndexType() == QueryIndexType.GEOSPATIAL ? "SPATIAL " : "")
- .append("INDEX ").append(ifNotExists ? "IF NOT EXISTS " : "")
- .append(Parser.quoteIdentifier(schemaName)).append('.')
- .append(Parser.quoteIdentifier(idx.getName())).append(" ON ")
- .append(Parser.quoteIdentifier(tblName)).append(" (");
-
- boolean first = true;
-
- for (Map.Entry<String, Boolean> e : idx.getFields().entrySet()) {
- if (first)
- first = false;
- else
- sb.append(", ");
-
- sb.append(Parser.quoteIdentifier(e.getKey())).append(e.getValue() ? " ASC" : " DESC");
- }
-
- sb.append(')');
-
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java
deleted file mode 100644
index 07c39df..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridDropIndex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.sql;
-
-import org.h2.command.Parser;
-
-/**
- * DROP INDEX statement.
- */
-public class GridDropIndex extends GridSqlStatement {
- /** Index name. */
- private String name;
-
- /** Schema name. */
- private String schemaName;
-
- /** Attempt to drop the index only if it exists. */
- private boolean ifExists;
-
- /**
- * @return Index name.
- */
- public String name() {
- return name;
- }
-
- /**
- * @param name Index name.
- */
- public void name(String name) {
- this.name = name;
- }
-
- /**
- * @return Schema name.
- */
- public String schemaName() {
- return schemaName;
- }
-
- /**
- * @param schemaName Schema name.
- */
- public void schemaName(String schemaName) {
- this.schemaName = schemaName;
- }
-
- /**
- * @return whether attempt to drop the index should be made only if it exists.
- */
- public boolean ifExists() {
- return ifExists;
- }
-
- /**
- * @param ifExists whether attempt to drop the index should be made only if it exists.
- */
- public void ifExists(boolean ifExists) {
- this.ifExists = ifExists;
- }
-
- /** {@inheritDoc} */
- @Override public String getSQL() {
- return "DROP INDEX " + (ifExists ? "IF EXISTS " : "") + Parser.quoteIdentifier(schemaName) + '.' +
- Parser.quoteIdentifier(name);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c05be524/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java
new file mode 100644
index 0000000..50d455c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlCreateIndex.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.sql;
+
+import java.util.Map;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.h2.command.Parser;
+
+/**
+ * CREATE INDEX statement: schema, table, index definition and IF NOT EXISTS flag, rendered by {@link #getSQL()}.
+ */
+public class GridSqlCreateIndex extends GridSqlStatement {
+ /** Schema name. */
+ private String schemaName;
+
+ /** Table name. */
+ private String tblName;
+
+ /** Attempt to create the index only if it does not exist. */
+ private boolean ifNotExists;
+
+ /** Index to create; its name, type and fields are read by {@link #getSQL()}. */
+ private QueryIndex idx;
+
+ /**
+ * @return Schema name for new index.
+ */
+ public String schemaName() {
+ return schemaName;
+ }
+
+ /**
+ * @param schemaName Schema name for new index.
+ */
+ public void schemaName(String schemaName) {
+ this.schemaName = schemaName;
+ }
+
+ /**
+ * @return Table name.
+ */
+ public String tableName() {
+ return tblName;
+ }
+
+ /**
+ * @param tblName Table name.
+ */
+ public void tableName(String tblName) {
+ this.tblName = tblName;
+ }
+
+ /**
+ * @return Whether attempt to create the index should be made only if it does not exist.
+ */
+ public boolean ifNotExists() {
+ return ifNotExists;
+ }
+
+ /**
+ * @param ifNotExists Whether attempt to create the index should be made only if it does not exist.
+ */
+ public void ifNotExists(boolean ifNotExists) {
+ this.ifNotExists = ifNotExists;
+ }
+
+ /**
+ * @return Index to create.
+ */
+ public QueryIndex index() {
+ return idx;
+ }
+
+ /**
+ * @param idx Index to create.
+ */
+ public void index(QueryIndex idx) {
+ this.idx = idx;
+ }
+
+ /** {@inheritDoc} Renders CREATE [SPATIAL] INDEX text; fields mapped to {@code true} become ASC, others DESC. */
+ @Override public String getSQL() {
+ StringBuilder sb = new StringBuilder("CREATE ")
+ .append(idx.getIndexType() == QueryIndexType.GEOSPATIAL ? "SPATIAL " : "")
+ .append("INDEX ").append(ifNotExists ? "IF NOT EXISTS " : "")
+ .append(Parser.quoteIdentifier(schemaName)).append('.')
+ .append(Parser.quoteIdentifier(idx.getName())).append(" ON ")
+ .append(Parser.quoteIdentifier(tblName)).append(" (");
+
+ boolean first = true;
+
+ for (Map.Entry<String, Boolean> e : idx.getFields().entrySet()) {
+ if (first)
+ first = false;
+ else
+ sb.append(", ");
+
+ sb.append(Parser.quoteIdentifier(e.getKey())).append(e.getValue() ? " ASC" : " DESC");
+ }
+
+ sb.append(')');
+
+ return sb.toString();
+ }
+}