You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/22 11:13:08 UTC
[22/23] ignite git commit: Cleaning up indexing module.
Cleaning up indexing module.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f39de03c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f39de03c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f39de03c
Branch: refs/heads/ignite-4565-ddl
Commit: f39de03c6d12ae01a77a162f83a6c95742cb8f27
Parents: b35568e
Author: devozerov <vo...@gridgain.com>
Authored: Wed Mar 22 13:04:18 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Mar 22 13:04:18 2017 +0300
----------------------------------------------------------------------
.../processors/query/h2/IgniteH2Indexing.java | 6 +-
.../query/h2/ddl/DdlStatementsProcessor.java | 466 ++-----------------
.../h2/ddl/msg/DdlInitDiscoveryMessage.java | 2 +-
.../query/h2/ddl/GridDdlProtoTest.java | 188 --------
4 files changed, 30 insertions(+), 632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 07c0650..e61bfd6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1498,7 +1498,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (DdlStatementsProcessor.isDdlStatement(prepared)) {
try {
- return ddlProc.runDdlStatement(stmt);
+ return ddlProc.runDdlStatement(cctx.name(), stmt);
}
catch (IgniteCheckedException e) {
throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']',
@@ -2200,10 +2200,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
log.debug("Stopping cache query index...");
// unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
-
- if (ddlProc != null)
- ddlProc.stop();
-
for (Schema schema : schemas.values())
schema.onDrop();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index 8bb831b..32e6da5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -17,50 +17,27 @@
package org.apache.ignite.internal.processors.query.h2.ddl;
-import java.sql.PreparedStatement;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.CustomEventListener;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult;
-import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlAckDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThread;
import org.h2.command.Prepared;
import org.h2.command.ddl.CreateIndex;
import org.h2.command.ddl.DropIndex;
import org.h2.jdbc.JdbcPreparedStatement;
-import org.jetbrains.annotations.Nullable;
+
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.List;
import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
@@ -75,15 +52,6 @@ public class DdlStatementsProcessor {
/** Logger. */
private IgniteLogger log;
- /** State flag. */
- private AtomicBoolean isStopped = new AtomicBoolean();
-
- /** Running operations originating at this node as a client. */
- private Map<IgniteUuid, GridFutureAdapter> operations = new ConcurrentHashMap<>();
-
- /** Worker. */
- private volatile DdlWorker worker;
-
/**
* Initialize message handlers and this' fields needed for further operation.
*
@@ -94,378 +62,49 @@ public class DdlStatementsProcessor {
this.ctx = ctx;
log = ctx.log(DdlStatementsProcessor.class);
-
- worker = new DdlWorker(ctx.igniteInstanceName(), log);
-
- IgniteThread workerThread = new IgniteThread(worker);
-
- workerThread.setDaemon(true);
-
- workerThread.start();
-
- ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class,
- new CustomEventListener<DdlInitDiscoveryMessage>() {
- /** {@inheritDoc} */
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
- @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
- DdlInitDiscoveryMessage msg) {
- onInit(msg);
- }
- });
-
- ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class,
- new CustomEventListener<DdlAckDiscoveryMessage>() {
- /** {@inheritDoc} */
- @Override public void onCustomEvent(AffinityTopologyVersion topVer, final ClusterNode snd,
- final DdlAckDiscoveryMessage msg) {
- submitTask(new DdlTask() {
- @Override public void run() {
- onAck(snd, msg);
- }
- });
- }
- });
-
- ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- /** {@inheritDoc} */
- @Override public void onMessage(UUID nodeId, Object msg) {
- if (msg instanceof DdlOperationResult) {
- DdlOperationResult res = (DdlOperationResult) msg;
-
- onResult(res.getOperationId(), bytesToException(res.getError()));
- }
-
- if (msg instanceof DdlOperationNodeResult) {
- DdlOperationNodeResult res = (DdlOperationNodeResult) msg;
-
- onNodeResult(res.getOperationId(), bytesToException(res.getError()));
- }
- }
- });
- }
-
- /**
- * Submit a task to {@link #worker} for async execution.
- *
- * @param task Task.
- */
- private void submitTask(DdlTask task) {
- DdlWorker worker0 = worker;
-
- if (worker0 != null)
- worker0.submit(task);
- else
- log.debug("Cannot submit DDL task because worker is null (node is stopping): " + task);
- }
-
- /**
- * Handle {@code ACK} message on a <b>peer node</b> - do local portion of actual DDL job and notify
- * <b>coordinator</b> about success or failure.
- *
- * @param snd Sender.
- * @param msg Message.
- */
- @SuppressWarnings({"ThrowableInstanceNeverThrown", "unchecked"})
- private void onAck(ClusterNode snd, DdlAckDiscoveryMessage msg) {
- // Don't do anything if we didn't choose to participate.
- if (!msg.nodeIds().contains(ctx.localNodeId()))
- return;
-
- IgniteCheckedException ex = null;
-
- DdlAbstractOperation args = msg.operation();
-
- try {
- doAck(args);
- }
- catch (Throwable e) {
- ex = wrapThrowableIfNeeded(e);
- }
-
- try {
- DdlOperationNodeResult res = new DdlOperationNodeResult();
-
- res.setOperationId(msg.operation().operationId());
- res.setError(exceptionToBytes(ex));
-
- ctx.io().sendToGridTopic(snd, GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
- }
- catch (Throwable e) {
- U.error(log, "Failed to notify coordinator about local DLL operation completion [opId=" +
- msg.operation().operationId() + ", clientNodeId=" + snd.id() + ']', e);
- }
- }
-
- /**
- * Perform local portion of DDL operation.
- * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
- *
- * @param args Operation arguments.
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- void doAck(DdlAbstractOperation args) throws IgniteCheckedException {
- if (args instanceof DdlCreateIndexOperation) {
- // No-op.
- }
- }
-
- /**
- * Handle local DDL operation result from <b>a peer node</b> on <b>the coordinator</b>.
- *
- * @param opId DDL operation ID.
- * @param err Exception that occurred on the <b>peer</b>, or null if the local operation has been successful.
- */
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "SynchronizationOnLocalVariableOrMethodParameter", "ForLoopReplaceableByForEach"})
- private void onNodeResult(IgniteUuid opId, IgniteCheckedException err) {
- // No-op.
- }
-
- /**
- * Process result of executing {@link DdlInitDiscoveryMessage} and react accordingly.
- * Called from {@link DdlInitDiscoveryMessage#ackMessage()}.
- *
- * @param msg {@link DdlInitDiscoveryMessage} message.
- * @return {@link DiscoveryCustomMessage} to return from {@link DdlInitDiscoveryMessage#ackMessage()}.
- */
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "UnnecessaryInitCause"})
- public DiscoveryCustomMessage onInitFinished(DdlInitDiscoveryMessage msg) {
- Map<UUID, IgniteCheckedException> nodesState = msg.nodeState();
-
- assert nodesState != null;
-
- Map<UUID, IgniteCheckedException> errors = new HashMap<>();
-
- for (Map.Entry<UUID, IgniteCheckedException> e : nodesState.entrySet())
- if (e.getValue() != null)
- errors.put(e.getKey(), e.getValue());
-
- if (!errors.isEmpty()) {
- IgniteCheckedException resEx = new IgniteCheckedException("DDL operation has been cancelled at INIT stage");
-
- if (errors.size() > 1) {
- for (IgniteCheckedException e : errors.values())
- resEx.addSuppressed(e);
- }
- else
- resEx.initCause(errors.values().iterator().next());
-
- sendResult(msg.operation(), resEx);
-
- return null;
- }
- else
- return new DdlAckDiscoveryMessage(msg.operation(), msg.nodeState().keySet());
- }
-
- /**
- * Notify client about result.
- *
- * @param args Operation arguments.
- * @param err Error, if any.
- */
- private void sendResult(DdlAbstractOperation args, IgniteCheckedException err) {
- assert args != null;
-
- DdlOperationResult res = new DdlOperationResult();
-
- res.setError(exceptionToBytes(err));
- res.setOperationId(args.operationId());
-
- try {
- ctx.io().sendToGridTopic(args.clientNodeId(), GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to notify client node about DDL operation failure " +
- "[opId=" + args.operationId() + ", clientNodeId=" + args.clientNodeId() + ']', e);
- }
- }
-
- /**
- * Callback handling whole DDL operation result <b>on the client</b>.
- *
- * @param opId DDL operation ID.
- * @param err Error, if any.
- */
- @SuppressWarnings("unchecked")
- private void onResult(IgniteUuid opId, IgniteCheckedException err) {
- GridFutureAdapter fut = operations.get(opId);
-
- if (fut == null) {
- U.warn(log, "DDL operation not found at its client [opId=" + opId + ", nodeId=" + ctx.localNodeId() + ']');
-
- return;
- }
-
- fut.onDone(null, err);
- }
-
- /**
- * Perform preliminary actions and checks for {@code INIT} stage of DDL statement execution <b>on a peer node</b>.
- *
- * @param msg {@code INIT} message.
- */
- @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- private void onInit(DdlInitDiscoveryMessage msg) {
- try {
- // Let's tell everyone that we're participating if our init is successful...
- if (doInit(msg.operation()))
- msg.nodeState().put(ctx.localNodeId(), null);
- }
- catch (Throwable e) {
- // Or tell everyone about the error that occurred
- msg.nodeState().put(ctx.localNodeId(), wrapThrowableIfNeeded(e));
- }
- }
-
- /**
- * Perform actual INIT actions.
- * Exists as a separate method to allow overriding it in tests to check behavior in case of errors.
- *
- * @param args Operation arguments.
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- boolean doInit(DdlAbstractOperation args) throws IgniteCheckedException {
- if (args instanceof DdlCreateIndexOperation) {
- return true;
- }
-
- return false;
- }
-
- /**
- * Optionally wrap a {@link Throwable} into an {@link IgniteCheckedException}.
- *
- * @param e Throwable to wrap.
- * @return {@code e} if it's an {@link IgniteCheckedException} or an {@link IgniteCheckedException} wrapping it.
- */
- private static IgniteCheckedException wrapThrowableIfNeeded(Throwable e) {
- if (e instanceof IgniteCheckedException)
- return (IgniteCheckedException) e;
- else
- return new IgniteCheckedException(e);
- }
-
- /**
- * Do cleanup.
- */
- public void stop() throws IgniteCheckedException {
- if (!isStopped.compareAndSet(false, true))
- throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped already"));
-
- DdlWorker worker0 = worker;
-
- if (worker0 != null) {
- worker0.cancel();
-
- worker = null;
- }
-
- for (Map.Entry<IgniteUuid, GridFutureAdapter> e : operations.entrySet())
- e.getValue().onDone(new IgniteCheckedException("Operation has been cancelled [opId=" + e.getKey() +']'));
}
/**
* Execute DDL statement.
*
+ * @param cacheName Cache name.
* @param stmt H2 statement to parse and execute.
*/
@SuppressWarnings("unchecked")
- public QueryCursor<List<?>> runDdlStatement(PreparedStatement stmt)
+ public QueryCursor<List<?>> runDdlStatement(String cacheName, PreparedStatement stmt)
throws IgniteCheckedException {
- if (isStopped.get())
- throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped"));
-
assert stmt instanceof JdbcPreparedStatement;
- GridSqlStatement gridStmt = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt));
-
- DdlAbstractOperation op;
-
- if (gridStmt instanceof GridSqlCreateIndex) {
- GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt;
-
- op = new DdlCreateIndexOperation(IgniteUuid.randomUuid(), ctx.localNodeId(), createIdx.index(),
- createIdx.schemaName(), createIdx.tableName(), createIdx.ifNotExists());
- }
- else if (gridStmt instanceof GridSqlDropIndex)
- throw new UnsupportedOperationException("DROP INDEX");
- else
- throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
- IgniteQueryErrorCode.UNEXPECTED_OPERATION);
-
- GridFutureAdapter opFut = new GridFutureAdapter();
-
- operations.put(op.operationId(), opFut);
+ IgniteInternalFuture fut;
try {
- ctx.discovery().sendCustomEvent(new DdlInitDiscoveryMessage(op));
+ GridSqlStatement gridStmt = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt));
- opFut.get();
- }
- finally {
- operations.remove(op.operationId());
- }
+ if (gridStmt instanceof GridSqlCreateIndex) {
+ GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt;
- QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
- (Collections.singletonList(0L)), null, false);
-
- resCur.fieldsMeta(UPDATE_RESULT_META);
-
- return resCur;
- }
-
- /**
- * Serialize exception or at least its message to bytes.
- *
- * @param ex Exception.
- * @return Serialized exception.
- */
- private byte[] exceptionToBytes(IgniteCheckedException ex) {
- if (ex == null)
- return null;
-
- try {
- return U.marshal(ctx, ex);
- }
- catch (IgniteCheckedException e) {
- IgniteCheckedException resEx;
-
- // Let's try to serialize at least the message
- try {
- resEx = new IgniteCheckedException("Failed to serialize exception " +
- "[msg=" + ex.getMessage() + ']');
- }
- catch (Throwable ignored) {
- resEx = new IgniteCheckedException("Failed to serialize exception");
+ // TODO: How to handle schema name properly?
+ fut = ctx.cache().dynamicIndexCreate(
+ cacheName, createIdx.tableName(), createIdx.index(), createIdx.ifNotExists());
}
+ else if (gridStmt instanceof GridSqlDropIndex)
+ throw new UnsupportedOperationException("DROP INDEX");
+ else
+ throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']',
+ IgniteQueryErrorCode.UNEXPECTED_OPERATION);
- try {
- return U.marshal(ctx, resEx);
- }
- catch (IgniteCheckedException exx) {
- // Why would it fail? We've sanitized it...
- throw new AssertionError(exx);
- }
- }
- }
+ fut.get();
- /**
- * Deserialize exception from bytes.
- *
- * @param ex Exception.
- * @return Serialized exception.
- */
- private IgniteCheckedException bytesToException(byte[] ex) {
- if (ex == null)
- return null;
+ QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+ (Collections.singletonList(0L)), null, false);
- try {
- return U.unmarshal(ctx, ex, U.resolveClassLoader(ctx.config()));
+ resCur.fieldsMeta(UPDATE_RESULT_META);
+
+ return resCur;
}
- catch (Throwable e) {
- return new IgniteCheckedException("Failed to deserialize exception", e);
+ catch (Exception e) {
+ // TODO: Proper error handling.
+ throw new IgniteSQLException("DLL operation failed.", e);
}
}
@@ -476,53 +115,4 @@ public class DdlStatementsProcessor {
public static boolean isDdlStatement(Prepared cmd) {
return cmd instanceof CreateIndex || cmd instanceof DropIndex;
}
-
- /**
- * DDL worker.
- */
- private class DdlWorker extends GridWorker {
- /** Worker queue. */
- private final BlockingQueue<DdlTask> queue = new LinkedBlockingDeque<>();
-
- /**
- * Constructor.
- *
- * @param gridName Gird name.
- * @param log Logger.
- */
- public DdlWorker(@Nullable String gridName, IgniteLogger log) {
- super(gridName, "indexing-ddl-worker", log);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- DdlTask task = queue.take();
-
- try {
- task.run();
- }
- catch (Exception e) {
- U.error(log, "Unexpected exception during DDL task processing [task=" + task + ']', e);
- }
- catch (Throwable t) {
- U.error(log, "Unexpected error during DDL task processing (worker will be stopped) [task=" +
- task + ']', t);
-
- throw t;
- }
- }
- }
-
- /**
- * Submit task.
- *
- * @param task Task.
- */
- public void submit(DdlTask task) {
- assert task != null;
-
- queue.add(task);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
index 753eb0c..4e00d54 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java
@@ -61,7 +61,7 @@ public class DdlInitDiscoveryMessage extends DdlAbstractDiscoveryMessage impleme
/** {@inheritDoc} */
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return ((IgniteH2Indexing)ctx.query().getIndexing()).getDdlStatementsProcessor().onInitFinished(this);
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java
deleted file mode 100644
index d507dff..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.ddl;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class GridDdlProtoTest extends GridCommonAbstractTest {
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override protected void beforeTestsStarted() throws Exception {
- IgniteH2Indexing.ddlProcCls = DdlProc.class;
-
- startGridsMultiThreaded(3, true);
-
- ignite(0).createCache(cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class));
-
- startGrid(getTestIgniteInstanceName(3), getConfiguration().setClientMode(true));
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- DdlProc.testName = null;
-
- ignite(0).cache("S2P").clear();
-
- ignite(0).cache("S2P").put("FirstKey", new Person(1, "John", "White"));
- ignite(0).cache("S2P").put("SecondKey", new Person(2, "Joe", "Black"));
- ignite(0).cache("S2P").put("k3", new Person(3, "Sylvia", "Green"));
- ignite(0).cache("S2P").put("f0u4thk3y", new Person(4, "Jane", "Silver"));
- }
-
- /** Test behavior in case of INIT failure (cancel via {@link DdlInitDiscoveryMessage#ackMessage}). */
- public void testInitFailure() {
- DdlProc.testName = GridTestUtils.getGridTestName();
-
- assertCreateIndexThrowsWithMessage("DDL operation has been cancelled at INIT stage", false);
- }
-
- /**
- * Test error handling.
- *
- * @param msg Expected message.
- * @param loc Run query locally on single node.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- private void assertCreateIndexThrowsWithMessage(String msg, final boolean loc) {
- final Throwable e = GridTestUtils.assertThrows(null, new Callable<Object>() {
- /** {@inheritDoc} */
- @Override public Object call() throws Exception {
- ignite(3).cache("S2P").query(new SqlFieldsQuery("create index idx on Person(id desc)").setLocal(loc));
- return null;
- }
- }, IgniteSQLException.class, "Failed to execute DDL statement");
-
- GridTestUtils.assertThrows(null, new Callable<Object>() {
- @Override public Object call() throws Exception {
- throw (Exception) e.getCause();
- }
- }, IgniteCheckedException.class, msg);
- }
-
- /**
- * @param name Cache name.
- * @param partitioned Partition or replicated cache.
- * @param escapeSql whether identifiers should be quoted - see {@link CacheConfiguration#setSqlEscapeAll}
- * @return Cache configuration.
- */
- protected static CacheConfiguration cacheConfig(String name, boolean partitioned, boolean escapeSql) {
- return new CacheConfiguration()
- .setName(name)
- .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
- .setAtomicityMode(CacheAtomicityMode.ATOMIC)
- .setBackups(1)
- .setSqlEscapeAll(escapeSql);
- }
-
- /**
- *
- */
- static class Person implements Serializable {
- /** */
- public Person(int id, String name, String secondName) {
- this.id = id;
- this.name = name;
- this.secondName = secondName;
- }
-
- /** */
- @QuerySqlField
- protected int id;
-
- /** */
- @QuerySqlField(name = "firstName")
- protected final String name;
-
- /** */
- @QuerySqlField
- final String secondName;
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Person person = (Person) o;
-
- return id == person.id && name.equals(person.name) && secondName.equals(person.secondName);
-
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = id;
- res = 31 * res + name.hashCode();
- res = 31 * res + secondName.hashCode();
- return res;
- }
- }
-
- /**
- * Custom implementation to test behavior on failure during various stages.
- */
- public final static class DdlProc extends DdlStatementsProcessor {
- /** Name of current test. */
- private static volatile String testName;
-
- /** {@inheritDoc} */
- @Override boolean doInit(DdlAbstractOperation args) {
- // Let's throw an exception on a single node in the ring
- if ("InitFailure".equals(testName) && ctx.igniteInstanceName().endsWith("2"))
- throw new RuntimeException("Hello from DdlProc Init");
- else
- try {
- return super.doInit(args);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** {@inheritDoc}
- * @param args*/
- @Override void doAck(DdlAbstractOperation args) {
- if ("AckFailure".equals(testName) && ctx.igniteInstanceName().endsWith("1"))
- throw new RuntimeException("Hello from DdlProc Ack");
- else
- try {
- super.doInit(args);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-}