You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/20 12:49:06 UTC

[1/2] ignite git commit: WIP on operation handler.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 2b7c1a2c9 -> 1b2a3dedb


WIP on operation handler.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee37450a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee37450a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee37450a

Branch: refs/heads/ignite-4565-ddl
Commit: ee37450a69634f29586b5f7e897c6a41322a602f
Parents: 2b7c1a2
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 15:34:35 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 15:34:35 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    |  76 +++++----
 .../query/ddl/IndexOperationHandler.java        | 161 +++++++++++++++++++
 2 files changed, 197 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1bb3c1c..7fe83ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -222,8 +222,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (initIdxStates != null) {
             Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations();
 
-            for (QueryTypeCandidate cand : cands)
-                applyReadyDynamicOperations(cand.descriptor(), readyIdxStates);
+            for (QueryTypeCandidate cand : cands) {
+                QueryTypeDescriptorImpl desc = cand.descriptor();
+
+                for (Map.Entry<String, QueryIndexState> entry : readyIdxStates.entrySet()) {
+                    String idxName = entry.getKey();
+                    QueryIndexState idxState = entry.getValue();
+
+                    if (F.eq(desc.tableName(), idxState.tableName()))
+                        QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
+                }
+            }
         }
 
         // Ready to register at this point.
@@ -245,19 +254,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Apply ready dynamic index states to not-yet-registered descriptor.
+     * Find current coordinator.
      *
-     * @param desc Descriptor.
-     * @param idxStates Index states.
+     * @return {@code True} if node is coordinator.
      */
-    private void applyReadyDynamicOperations(QueryTypeDescriptorImpl desc, Map<String, QueryIndexState> idxStates)
-        throws IgniteCheckedException {
-        for (Map.Entry<String, QueryIndexState> entry : idxStates.entrySet()) {
-            String idxName = entry.getKey();
-            QueryIndexState idxState = entry.getValue();
+    private ClusterNode findCoordinator() {
+        ClusterNode res = null;
 
-            QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
+        for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
+            if (res == null || res.order() > node.order())
+                res = node;
         }
+
+        return res;
     }
 
     /** {@inheritDoc} */
@@ -308,6 +317,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker.
+     * When called for the first time, we initialize topology thus understanding whether current node is coordinator
+     * or not.
+     *
      * @param cctx Cache context.
      * @param idxStates Index states.
      * @throws IgniteCheckedException If failed.
@@ -353,33 +366,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      */
     public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
         idxWorker.onAccept(msg);
-    }
-
-    /**
-     * Handle index accept message.
-     *
-     * @param msg Message.
-     */
-    public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
-        idxWorker.onFinish(msg);
-    }
-
-    /**
-     * Handle node leave.
-     *
-     * @param node Node.
-     */
-    public void onNodeLeave(ClusterNode node) {
-        // TODO.
-    }
 
-    /**
-     * Handle index init discovery message.
-     *
-     * @param space Space.
-     * @param op Operation.
-     */
-    public void onIndexAccept(String space, AbstractIndexOperation op) {
         idxLock.writeLock().lock();
 
         // TODO
@@ -456,12 +443,21 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Handle index ack discovery message.
+     * Handle index accept message.
      *
      * @param msg Message.
      */
-    private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) {
-        // TODO
+    public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+        idxWorker.onFinish(msg);
+    }
+
+    /**
+     * Handle node leave.
+     *
+     * @param node Node.
+     */
+    public void onNodeLeave(ClusterNode node) {
+        // TODO.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
new file mode 100644
index 0000000..6932724
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Index change handler.
+ */
+public class IndexOperationHandler {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Query processor */
+    private final GridQueryProcessor qryProc;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Target operation. */
+    private final AbstractIndexOperation op;
+
+    /** Operation future. */
+    private final GridFutureAdapter opFut;
+
+    /** Mutex for concurrent access. */
+    private final Object mux = new Object();
+
+    /** Init flag. */
+    private boolean init;
+
+    /** Cancel flag. */
+    private boolean cancel;
+
+    /** Worker. */
+    private IndexWorker worker;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param qryProc Query processor.
+     * @param op Target operation.
+     */
+    public IndexOperationHandler(GridKernalContext ctx, GridQueryProcessor qryProc, AbstractIndexOperation op) {
+        this.ctx = ctx;
+        this.qryProc = qryProc;
+        this.op = op;
+
+        log = ctx.log(IndexOperationHandler.class);
+        opFut = new GridFutureAdapter();
+    }
+
+    /**
+     * Perform initialization routine.
+     */
+    public void init() {
+        synchronized (mux) {
+            if (!init) {
+                init = true;
+
+                if (!cancel) {
+                    worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
+
+                    new IgniteThread(worker).start();
+
+                    worker.awaitStart();
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Worker name.
+     */
+    private String workerName() {
+        return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
+    }
+
+    /**
+     * Cancel operation.
+     */
+    public void cancel() {
+        synchronized (mux) {
+            if (!cancel) {
+                cancel = true;
+
+                if (worker != null)
+                    worker.cancel();
+            }
+
+            // TODO
+        }
+    }
+
+    /**
+     * Single-shot index worker responsible for operation execution.
+     */
+    private class IndexWorker extends GridWorker {
+        /** Worker start latch. */
+        private final CountDownLatch startLatch = new CountDownLatch(1);
+
+        /**
+         * Constructor.
+         *
+         * @param igniteInstanceName Ignite instance name.
+         * @param name Worker name.
+         * @param log Logger.
+         */
+        public IndexWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) {
+            super(igniteInstanceName, name, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            startLatch.countDown();
+
+            // TODO: Do actual create/drop.
+        }
+
+        /**
+         * Await start.
+         */
+        private void awaitStart() {
+            try {
+                startLatch.await();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedException("Interrupted while waiting index operation worker start: " +
+                    name(), e);
+            }
+        }
+    }
+}


[2/2] ignite git commit: Added operation handler.

Posted by vo...@apache.org.
Added operation handler.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2a3ded
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2a3ded
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2a3ded

Branch: refs/heads/ignite-4565-ddl
Commit: 1b2a3dedb43f4950e3ac67204b717bc44460d28c
Parents: ee37450
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 15:48:56 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 15:48:56 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    | 11 ++++
 .../ddl/IndexAbstractDiscoveryMessage.java      | 17 -------
 .../ddl/IndexOperationCancellationToken.java    | 53 ++++++++++++++++++++
 .../query/ddl/IndexOperationHandler.java        | 37 +++++++-------
 4 files changed, 84 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7fe83ab..517a30d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken;
 import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask;
@@ -461,6 +462,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Process index operation.
+     *
+     * @param op Operation.
+     * @param cancelToken Cancel token.
+     */
+    public void processIndexOperation(AbstractIndexOperation op, IndexOperationCancellationToken cancelToken) {
+        // TODO.
+    }
+
+    /**
      * Register cache in indexing SPI.
      *
      * @param space Space.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index 11d8f93..3de525b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -41,9 +41,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
     /** Whether request must be propagated to exchange worker for final processing. */
     private transient boolean exchange;
 
-    /** Local cache index state at the moment of message receive. */
-    private transient QueryIndexStates idxStates;
-
     /**
      * Constructor.
      *
@@ -66,20 +63,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
     }
 
     /**
-     * @return Index states.
-     */
-    @Nullable public QueryIndexStates indexStates() {
-        return idxStates;
-    }
-
-    /**
-     * @param idxStates Index states.
-     */
-    public void indexStates(QueryIndexStates idxStates) {
-        this.idxStates = idxStates;
-    }
-
-    /**
      * @return Whether request must be propagated to exchange worker for final processing.
      */
     public boolean exchange() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
new file mode 100644
index 0000000..e8b2c2b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Index operation cancellation token.
+ */
+public class IndexOperationCancellationToken {
+    /** Cancel flag. */
+    private final AtomicBoolean flag = new AtomicBoolean();
+
+    /**
+     * Get cancel state.
+     *
+     * @return {@code True} if cancelled.
+     */
+    public boolean isCancelled() {
+        return flag.get();
+    }
+
+    /**
+     * Do cancel.
+     *
+     * @return {@code True} if cancel flag was set by this call.
+     */
+    public boolean cancel() {
+        return flag.compareAndSet(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexOperationCancellationToken.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
index 6932724..116b613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -51,12 +51,12 @@ public class IndexOperationHandler {
     /** Mutex for concurrent access. */
     private final Object mux = new Object();
 
+    /** Cancellation token. */
+    private final IndexOperationCancellationToken cancelToken = new IndexOperationCancellationToken();
+
     /** Init flag. */
     private boolean init;
 
-    /** Cancel flag. */
-    private boolean cancel;
-
     /** Worker. */
     private IndexWorker worker;
 
@@ -84,7 +84,7 @@ public class IndexOperationHandler {
             if (!init) {
                 init = true;
 
-                if (!cancel) {
+                if (!cancelToken.isCancelled()) {
                     worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
 
                     new IgniteThread(worker).start();
@@ -96,29 +96,25 @@ public class IndexOperationHandler {
     }
 
     /**
-     * @return Worker name.
-     */
-    private String workerName() {
-        return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
-    }
-
-    /**
      * Cancel operation.
      */
     public void cancel() {
         synchronized (mux) {
-            if (!cancel) {
-                cancel = true;
-
+            if (!cancelToken.cancel()) {
                 if (worker != null)
                     worker.cancel();
             }
-
-            // TODO
         }
     }
 
     /**
+     * @return Worker name.
+     */
+    private String workerName() {
+        return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
+    }
+
+    /**
      * Single-shot index worker responsible for operation execution.
      */
     private class IndexWorker extends GridWorker {
@@ -140,7 +136,14 @@ public class IndexOperationHandler {
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             startLatch.countDown();
 
-            // TODO: Do actual create/drop.
+            try {
+                qryProc.processIndexOperation(op, cancelToken);
+
+                opFut.onDone();
+            }
+            catch (Exception e) {
+                opFut.onDone(e);
+            }
         }
 
         /**