You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/20 12:49:07 UTC
[2/2] ignite git commit: Added operation handler.
Added operation handler.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2a3ded
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2a3ded
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2a3ded
Branch: refs/heads/ignite-4565-ddl
Commit: 1b2a3dedb43f4950e3ac67204b717bc44460d28c
Parents: ee37450
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 15:48:56 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 15:48:56 2017 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryProcessor.java | 11 ++++
.../ddl/IndexAbstractDiscoveryMessage.java | 17 -------
.../ddl/IndexOperationCancellationToken.java | 53 ++++++++++++++++++++
.../query/ddl/IndexOperationHandler.java | 37 +++++++-------
4 files changed, 84 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7fe83ab..517a30d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken;
import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask;
@@ -461,6 +462,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Process index operation.
+ *
+ * @param op Operation.
+ * @param cancelToken Cancel token.
+ */
+ public void processIndexOperation(AbstractIndexOperation op, IndexOperationCancellationToken cancelToken) {
+ // TODO: Not implemented yet - currently a no-op that ignores both arguments.
+ }
+
+ /**
* Register cache in indexing SPI.
*
* @param space Space.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index 11d8f93..3de525b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -41,9 +41,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
/** Whether request must be propagated to exchange worker for final processing. */
private transient boolean exchange;
- /** Local cache index state at the moment of message receive. */
- private transient QueryIndexStates idxStates;
-
/**
* Constructor.
*
@@ -66,20 +63,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
}
/**
- * @return Index states.
- */
- @Nullable public QueryIndexStates indexStates() {
- return idxStates;
- }
-
- /**
- * @param idxStates Index states.
- */
- public void indexStates(QueryIndexStates idxStates) {
- this.idxStates = idxStates;
- }
-
- /**
* @return Whether request must be propagated to exchange worker for final processing.
*/
public boolean exchange() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
new file mode 100644
index 0000000..e8b2c2b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Index operation cancellation token: a one-way cancel flag backed by an {@link AtomicBoolean}.
+ */
+public class IndexOperationCancellationToken {
+ /** Cancel flag; starts as {@code false} and this class only ever switches it to {@code true}. */
+ private final AtomicBoolean flag = new AtomicBoolean();
+
+ /**
+ * Get current cancel state.
+ *
+ * @return {@code True} if cancelled.
+ */
+ public boolean isCancelled() {
+ return flag.get();
+ }
+
+ /**
+ * Do cancel by atomically switching the flag from {@code false} to {@code true}.
+ *
+ * @return {@code True} if cancel flag was set by this call, {@code false} if it was already set.
+ */
+ public boolean cancel() {
+ return flag.compareAndSet(false, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexOperationCancellationToken.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
index 6932724..116b613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -51,12 +51,12 @@ public class IndexOperationHandler {
/** Mutex for concurrent access. */
private final Object mux = new Object();
+ /** Cancellation token. */
+ private final IndexOperationCancellationToken cancelToken = new IndexOperationCancellationToken();
+
/** Init flag. */
private boolean init;
- /** Cancel flag. */
- private boolean cancel;
-
/** Worker. */
private IndexWorker worker;
@@ -84,7 +84,7 @@ public class IndexOperationHandler {
if (!init) {
init = true;
- if (!cancel) {
+ if (!cancelToken.isCancelled()) {
worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
new IgniteThread(worker).start();
@@ -96,29 +96,25 @@ public class IndexOperationHandler {
}
/**
- * @return Worker name.
- */
- private String workerName() {
- return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
- }
-
- /**
* Cancel operation.
*/
public void cancel() {
synchronized (mux) {
- if (!cancel) {
- cancel = true;
-
+ if (cancelToken.cancel()) { // cancel() is true only for the call that flips the flag: stop the worker exactly then.
if (worker != null)
worker.cancel();
}
-
- // TODO
}
}
/**
+ * @return Worker name built from the operation's space, table name and index name.
+ */
+ private String workerName() {
+ return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName(); // NOTE(review): no separator after the "index-op-worker" prefix - confirm intended.
+ }
+
+ /**
* Single-shot index worker responsible for operation execution.
*/
private class IndexWorker extends GridWorker {
@@ -140,7 +136,14 @@ public class IndexOperationHandler {
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
startLatch.countDown();
- // TODO: Do actual create/drop.
+ try {
+ qryProc.processIndexOperation(op, cancelToken);
+
+ opFut.onDone();
+ }
+ catch (Exception e) { // Any exception from processing is handed to opFut rather than thrown from body().
+ opFut.onDone(e);
+ }
}
/**