You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/20 12:49:06 UTC
[1/2] ignite git commit: WIP on operation handler.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl 2b7c1a2c9 -> 1b2a3dedb
WIP on operation handler.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee37450a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee37450a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee37450a
Branch: refs/heads/ignite-4565-ddl
Commit: ee37450a69634f29586b5f7e897c6a41322a602f
Parents: 2b7c1a2
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 15:34:35 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 15:34:35 2017 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryProcessor.java | 76 +++++----
.../query/ddl/IndexOperationHandler.java | 161 +++++++++++++++++++
2 files changed, 197 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1bb3c1c..7fe83ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -222,8 +222,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (initIdxStates != null) {
Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations();
- for (QueryTypeCandidate cand : cands)
- applyReadyDynamicOperations(cand.descriptor(), readyIdxStates);
+ for (QueryTypeCandidate cand : cands) {
+ QueryTypeDescriptorImpl desc = cand.descriptor();
+
+ for (Map.Entry<String, QueryIndexState> entry : readyIdxStates.entrySet()) {
+ String idxName = entry.getKey();
+ QueryIndexState idxState = entry.getValue();
+
+ if (F.eq(desc.tableName(), idxState.tableName()))
+ QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
+ }
+ }
}
// Ready to register at this point.
@@ -245,19 +254,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Apply ready dynamic index states to not-yet-registered descriptor.
+ * Find current coordinator.
*
- * @param desc Descriptor.
- * @param idxStates Index states.
+ * @return Alive server node with the lowest order, or {@code null} if there are no alive server nodes.
*/
- private void applyReadyDynamicOperations(QueryTypeDescriptorImpl desc, Map<String, QueryIndexState> idxStates)
- throws IgniteCheckedException {
- for (Map.Entry<String, QueryIndexState> entry : idxStates.entrySet()) {
- String idxName = entry.getKey();
- QueryIndexState idxState = entry.getValue();
+ private ClusterNode findCoordinator() {
+ ClusterNode res = null;
- QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
+ for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
+ if (res == null || res.order() > node.order())
+ res = node;
}
+
+ return res;
}
/** {@inheritDoc} */
@@ -308,6 +317,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker.
+ * When called for the first time, we initialize topology thus understanding whether current node is coordinator
+ * or not.
+ *
* @param cctx Cache context.
* @param idxStates Index states.
* @throws IgniteCheckedException If failed.
@@ -353,33 +366,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*/
public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
idxWorker.onAccept(msg);
- }
-
- /**
- * Handle index accept message.
- *
- * @param msg Message.
- */
- public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
- idxWorker.onFinish(msg);
- }
-
- /**
- * Handle node leave.
- *
- * @param node Node.
- */
- public void onNodeLeave(ClusterNode node) {
- // TODO.
- }
- /**
- * Handle index init discovery message.
- *
- * @param space Space.
- * @param op Operation.
- */
- public void onIndexAccept(String space, AbstractIndexOperation op) {
idxLock.writeLock().lock();
// TODO
@@ -456,12 +443,21 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Handle index ack discovery message.
+ * Handle index finish message.
*
* @param msg Message.
*/
- private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) {
- // TODO
+ public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+ idxWorker.onFinish(msg);
+ }
+
+ /**
+ * Handle node leave.
+ *
+ * @param node Node.
+ */
+ public void onNodeLeave(ClusterNode node) {
+ // TODO.
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
new file mode 100644
index 0000000..6932724
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Index change handler.
+ */
+public class IndexOperationHandler {
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** Query processor */
+ private final GridQueryProcessor qryProc;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Target operation. */
+ private final AbstractIndexOperation op;
+
+ /** Operation future. */
+ private final GridFutureAdapter opFut;
+
+ /** Mutex for concurrent access. */
+ private final Object mux = new Object();
+
+ /** Init flag. */
+ private boolean init;
+
+ /** Cancel flag. */
+ private boolean cancel;
+
+ /** Worker. */
+ private IndexWorker worker;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ * @param qryProc Query processor.
+ * @param op Target operation.
+ */
+ public IndexOperationHandler(GridKernalContext ctx, GridQueryProcessor qryProc, AbstractIndexOperation op) {
+ this.ctx = ctx;
+ this.qryProc = qryProc;
+ this.op = op;
+
+ log = ctx.log(IndexOperationHandler.class);
+ opFut = new GridFutureAdapter();
+ }
+
+ /**
+ * Perform initialization routine.
+ */
+ public void init() {
+ synchronized (mux) {
+ if (!init) {
+ init = true;
+
+ if (!cancel) {
+ worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
+
+ new IgniteThread(worker).start();
+
+ worker.awaitStart();
+ }
+ }
+ }
+ }
+
+ /**
+ * @return Worker name.
+ */
+ private String workerName() {
+ return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
+ }
+
+ /**
+ * Cancel operation.
+ */
+ public void cancel() {
+ synchronized (mux) {
+ if (!cancel) {
+ cancel = true;
+
+ if (worker != null)
+ worker.cancel();
+ }
+
+ // TODO
+ }
+ }
+
+ /**
+ * Single-shot index worker responsible for operation execution.
+ */
+ private class IndexWorker extends GridWorker {
+ /** Worker start latch. */
+ private final CountDownLatch startLatch = new CountDownLatch(1);
+
+ /**
+ * Constructor.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param name Worker name.
+ * @param log Logger.
+ */
+ public IndexWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) {
+ super(igniteInstanceName, name, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ startLatch.countDown();
+
+ // TODO: Do actual create/drop.
+ }
+
+ /**
+ * Await start.
+ */
+ private void awaitStart() {
+ try {
+ startLatch.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException("Interrupted while waiting index operation worker start: " +
+ name(), e);
+ }
+ }
+ }
+}
[2/2] ignite git commit: Added operation handler.
Posted by vo...@apache.org.
Added operation handler.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2a3ded
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2a3ded
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2a3ded
Branch: refs/heads/ignite-4565-ddl
Commit: 1b2a3dedb43f4950e3ac67204b717bc44460d28c
Parents: ee37450
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 15:48:56 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 15:48:56 2017 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryProcessor.java | 11 ++++
.../ddl/IndexAbstractDiscoveryMessage.java | 17 -------
.../ddl/IndexOperationCancellationToken.java | 53 ++++++++++++++++++++
.../query/ddl/IndexOperationHandler.java | 37 +++++++-------
4 files changed, 84 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7fe83ab..517a30d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken;
import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask;
@@ -461,6 +462,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Process index operation.
+ *
+ * @param op Operation.
+ * @param cancelToken Cancel token.
+ */
+ public void processIndexOperation(AbstractIndexOperation op, IndexOperationCancellationToken cancelToken) {
+ // TODO.
+ }
+
+ /**
* Register cache in indexing SPI.
*
* @param space Space.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index 11d8f93..3de525b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -41,9 +41,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
/** Whether request must be propagated to exchange worker for final processing. */
private transient boolean exchange;
- /** Local cache index state at the moment of message receive. */
- private transient QueryIndexStates idxStates;
-
/**
* Constructor.
*
@@ -66,20 +63,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
}
/**
- * @return Index states.
- */
- @Nullable public QueryIndexStates indexStates() {
- return idxStates;
- }
-
- /**
- * @param idxStates Index states.
- */
- public void indexStates(QueryIndexStates idxStates) {
- this.idxStates = idxStates;
- }
-
- /**
* @return Whether request must be propagated to exchange worker for final processing.
*/
public boolean exchange() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
new file mode 100644
index 0000000..e8b2c2b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Index operation cancellation token.
+ */
+public class IndexOperationCancellationToken {
+ /** Cancel flag. */
+ private final AtomicBoolean flag = new AtomicBoolean();
+
+ /**
+ * Get cancel state.
+ *
+ * @return {@code True} if cancelled.
+ */
+ public boolean isCancelled() {
+ return flag.get();
+ }
+
+ /**
+ * Do cancel.
+ *
+ * @return {@code True} if cancel flag was set by this call.
+ */
+ public boolean cancel() {
+ return flag.compareAndSet(false, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexOperationCancellationToken.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
index 6932724..116b613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -51,12 +51,12 @@ public class IndexOperationHandler {
/** Mutex for concurrent access. */
private final Object mux = new Object();
+ /** Cancellation token. */
+ private final IndexOperationCancellationToken cancelToken = new IndexOperationCancellationToken();
+
/** Init flag. */
private boolean init;
- /** Cancel flag. */
- private boolean cancel;
-
/** Worker. */
private IndexWorker worker;
@@ -84,7 +84,7 @@ public class IndexOperationHandler {
if (!init) {
init = true;
- if (!cancel) {
+ if (!cancelToken.isCancelled()) {
worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
new IgniteThread(worker).start();
@@ -96,29 +96,25 @@ public class IndexOperationHandler {
}
/**
- * @return Worker name.
- */
- private String workerName() {
- return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
- }
-
- /**
* Cancel operation.
*/
public void cancel() {
synchronized (mux) {
- if (!cancel) {
- cancel = true;
-
+ if (cancelToken.cancel()) { // True only for the call that actually set the flag: stop the worker once.
if (worker != null)
worker.cancel();
}
-
- // TODO
}
}
/**
+ * @return Worker name.
+ */
+ private String workerName() {
+ return "index-op-worker-" + op.space() + "-" + op.tableName() + "-" + op.indexName(); // Dash-separated.
+ }
+
+ /**
* Single-shot index worker responsible for operation execution.
*/
private class IndexWorker extends GridWorker {
@@ -140,7 +136,14 @@ public class IndexOperationHandler {
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
startLatch.countDown();
- // TODO: Do actual create/drop.
+ try {
+ qryProc.processIndexOperation(op, cancelToken);
+
+ opFut.onDone();
+ }
+ catch (Exception e) {
+ opFut.onDone(e);
+ }
}
/**