You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/20 10:12:36 UTC

ignite git commit: WIP on wiring everything up.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl b6f49269e -> ae2ad1925


WIP on wiring everything up.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae2ad192
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae2ad192
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae2ad192

Branch: refs/heads/ignite-4565-ddl
Commit: ae2ad192585c1a1b41a605e56f4d52de82b3fb4b
Parents: b6f4926
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 13:02:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 13:02:33 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 22 +----
 .../processors/query/GridQueryProcessor.java    | 98 ++++++++++++++------
 .../query/ddl/task/IndexingAcceptTask.java      | 51 ++++++++++
 .../query/ddl/task/IndexingFinishTask.java      | 52 +++++++++++
 4 files changed, 175 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 089b399..89629ea 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -406,9 +406,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             IndexAbstractDiscoveryMessage msg = ((IndexExchangeWorkerTask)task).message();
 
             if (msg instanceof IndexAcceptDiscoveryMessage)
-                onIndexAcceptMessageExchange((IndexAcceptDiscoveryMessage)msg);
+                ctx.query().onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
             else if (msg instanceof IndexFinishDiscoveryMessage)
-                onIndexFinishMessageExchange((IndexFinishDiscoveryMessage)msg);
+                ctx.query().onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
             else
                 U.warn(log, "Unsupported index discovery message: " + msg);
         }
@@ -2803,15 +2803,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Handle index accept message in exchange thread.
-     *
-     * @param msg Message.
-     */
-    private void onIndexAcceptMessageExchange(IndexAcceptDiscoveryMessage msg) {
-        // TODO
-    }
-
-    /**
      * Handle cache index ack discovery message.
      *
      * @param msg Message.
@@ -2828,15 +2819,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Handle index finish message in exchange thread.
-     *
-     * @param msg Message.
-     */
-    private void onIndexFinishMessageExchange(IndexFinishDiscoveryMessage msg) {
-        // TODO
-    }
-
-    /**
      * @param batch Change request batch.
      * @param topVer Current topology version.
      * @return {@code True} if minor topology version should be increased.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 6b9d9af..bdc892f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -64,9 +64,12 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStopTask;
+import org.apache.ignite.internal.processors.query.ddl.task.IndexingFinishTask;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -344,6 +347,24 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle index accept message.
+     *
+     * @param msg Message.
+     */
+    public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
+        idxWorker.onAccept(msg);
+    }
+
+    /**
+     * Handle index finish message.
+     *
+     * @param msg Message.
+     */
+    public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+        idxWorker.onFinish(msg);
+    }
+
+    /**
      * Handle index init discovery message.
      *
      * @param space Space.
@@ -1100,6 +1121,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param ldr Class loader to undeploy.
      * @throws IgniteCheckedException If undeploy failed.
      */
+    // TODO: Can we remove this method? Handle undeploy for indexing otherwise.
     public void onUndeploy(@Nullable String space, ClassLoader ldr) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Undeploy [space=" + space + "]");
@@ -1293,10 +1315,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 IndexingTask task = tasks.take();
 
                 if (task != null) {
-                    if (task instanceof IndexingCacheStartTask)
-                        handleCacheStart((IndexingCacheStartTask)task);
+                    if (task instanceof IndexingCacheStartTask) {
+                        IndexingCacheStartTask task0 = (IndexingCacheStartTask)task;
+
+                        handleCacheStart(task0.space(), task0.initialIndexStates());
+                    }
                     else if (task instanceof IndexingCacheStopTask)
                         handleCacheStop((IndexingCacheStopTask)task);
+                    else if (task instanceof IndexingAcceptTask)
+                        handleAccept(((IndexingAcceptTask)task).message());
+                    else if (task instanceof IndexingFinishTask)
+                        handleFinish(((IndexingFinishTask)task).message());
                     else
                         U.warn(log, "Unsupported task: " + task);
                 }
@@ -1325,9 +1354,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         /**
          * Handle cache start task.
          *
-         * @param task Task.
+         * @param space Space.
+         * @param initIdxStates Initial index states.
          */
-        private void handleCacheStart(IndexingCacheStartTask task) {
+        private void handleCacheStart(String space, QueryIndexStates initIdxStates) {
             // TODO: Start active operations.
         }
 
@@ -1353,6 +1383,42 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         /**
+         * Index accept callback.
+         *
+         * @param msg Message.
+         */
+        public void onAccept(IndexAcceptDiscoveryMessage msg) {
+            submit(new IndexingAcceptTask(msg));
+        }
+
+        /**
+         * Handle index accept.
+         *
+         * @param msg Message.
+         */
+        private void handleAccept(IndexAcceptDiscoveryMessage msg) {
+            // TODO
+        }
+
+        /**
+         * Index finish callback.
+         *
+         * @param msg Message.
+         */
+        public void onFinish(IndexFinishDiscoveryMessage msg) {
+            submit(new IndexingFinishTask(msg));
+        }
+
+        /**
+         * Handle index finish.
+         *
+         * @param msg Message.
+         */
+        private void handleFinish(IndexFinishDiscoveryMessage msg) {
+            // TODO
+        }
+
+        /**
          * Update topology in response to node leave event.
          */
         private void updateTopology() {
@@ -1405,30 +1471,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Change index task.
-     */
-    private static class ChangeIndexingTask implements IndexingTask {
-        /** Operation. */
-        private final AbstractIndexOperation op;
-
-        /**
-         * Constructor.
-         *
-         * @param op Operation.
-         */
-        public ChangeIndexingTask(AbstractIndexOperation op) {
-            this.op = op;
-        }
-
-        /**
-         * @return Operation.
-         */
-        public AbstractIndexOperation operation() {
-            return op;
-        }
-    }
-
-    /**
      * Node leave task.
      */
     private static class NodeLeaveTask implements IndexingTask {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java
new file mode 100644
index 0000000..dd73501
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl.task;
+
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Indexing accept task.
+ */
+public class IndexingAcceptTask implements IndexingTask {
+    /** Message. */
+    private final IndexAcceptDiscoveryMessage msg;
+
+    /**
+     * Constructor.
+     *
+     * @param msg Message.
+     */
+    public IndexingAcceptTask(IndexAcceptDiscoveryMessage msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public IndexAcceptDiscoveryMessage message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexingAcceptTask.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java
new file mode 100644
index 0000000..2149b13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl.task;
+
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Indexing finish task.
+ */
+public class IndexingFinishTask implements IndexingTask {
+    /** Message. */
+    private final IndexFinishDiscoveryMessage msg;
+
+    /**
+     * Constructor.
+     *
+     * @param msg Message.
+     */
+    public IndexingFinishTask(IndexFinishDiscoveryMessage msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public IndexFinishDiscoveryMessage message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexingFinishTask.class, this);
+    }
+}