You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 15:27:07 UTC

ignite git commit: Wired up exchange with handlers.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 4538526d8 -> 137054969


Wired up exchange with handlers.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13705496
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13705496
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13705496

Branch: refs/heads/ignite-4565-ddl
Commit: 137054969c75156242e51be24fa14ecf38f32a89
Parents: 4538526
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 18:25:48 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 18:25:48 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 53 +++++++++++++++---
 .../query/ddl/IndexExchangeWorkerTask.java      | 57 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/13705496/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0b17e5a..089b399 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexExchangeWorkerTask;
 import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
@@ -385,6 +386,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Task or {@code null} if message doesn't require any special processing.
      */
     public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        if (msg instanceof IndexAbstractDiscoveryMessage) {
+            IndexAbstractDiscoveryMessage msg0 = (IndexAbstractDiscoveryMessage)msg;
+
+            if (msg0.exchange())
+                return new IndexExchangeWorkerTask(msg0);
+        }
+
         return null;
     }
 
@@ -394,7 +402,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param task Task.
      */
     public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
-        // No-op.
+        if (task instanceof IndexExchangeWorkerTask) {
+            IndexAbstractDiscoveryMessage msg = ((IndexExchangeWorkerTask)task).message();
+
+            if (msg instanceof IndexAcceptDiscoveryMessage)
+                onIndexAcceptMessageExchange((IndexAcceptDiscoveryMessage)msg);
+            else if (msg instanceof IndexFinishDiscoveryMessage)
+                onIndexFinishMessageExchange((IndexFinishDiscoveryMessage)msg);
+            else
+                U.warn(log, "Unsupported index discovery message: " + msg);
+        }
+        else
+            U.warn(log, "Unsupported custom exchange task: " + task);
     }
 
     /**
@@ -2720,11 +2739,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
 
             if (msg instanceof IndexProposeDiscoveryMessage)
-                onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
+                onIndexProposeMessageDiscovery((IndexProposeDiscoveryMessage)msg);
             else if (msg instanceof IndexAcceptDiscoveryMessage)
-                onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
+                onIndexAcceptMessageDiscovery((IndexAcceptDiscoveryMessage)msg);
             else if (msg instanceof IndexFinishDiscoveryMessage)
-                onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
+                onIndexFinishMessageDiscovery((IndexFinishDiscoveryMessage)msg);
             else
                 U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
 
@@ -2742,7 +2761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param msg Message.
      */
-    private void onIndexProposeMessage(IndexProposeDiscoveryMessage msg) {
+    private void onIndexProposeMessageDiscovery(IndexProposeDiscoveryMessage msg) {
         UUID locNodeId = ctx.localNodeId();
 
         AbstractIndexOperation op = msg.operation();
@@ -2768,11 +2787,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Handle cache index ack discovery message.
+     * Handle cache index accept discovery message.
      *
      * @param msg Message.
      */
-    private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
+    private void onIndexAcceptMessageDiscovery(IndexAcceptDiscoveryMessage msg) {
         AbstractIndexOperation op = msg.operation();
 
         DynamicCacheDescriptor desc = cacheDescriptor(op.space());
@@ -2784,11 +2803,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle index accept message in exchange thread.
+     *
+     * @param msg Message.
+     */
+    private void onIndexAcceptMessageExchange(IndexAcceptDiscoveryMessage msg) {
+        // TODO
+    }
+
+    /**
      * Handle cache index ack discovery message.
      *
      * @param msg Message.
      */
-    private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+    private void onIndexFinishMessageDiscovery(IndexFinishDiscoveryMessage msg) {
         AbstractIndexOperation op = msg.operation();
 
         DynamicCacheDescriptor desc = cacheDescriptor(op.space());
@@ -2800,6 +2828,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle index finish message in exchange thread.
+     *
+     * @param msg Message.
+     */
+    private void onIndexFinishMessageExchange(IndexFinishDiscoveryMessage msg) {
+        // TODO
+    }
+
+    /**
      * @param batch Change request batch.
      * @param topVer Current topology version.
      * @return {@code True} if minor topology version should be increased.

http://git-wip-us.apache.org/repos/asf/ignite/blob/13705496/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java
new file mode 100644
index 0000000..4cf83fe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Cache index exchange worker task.
+ */
+public class IndexExchangeWorkerTask implements CachePartitionExchangeWorkerTask {
+    /** Message. */
+    private final IndexAbstractDiscoveryMessage msg;
+
+    /**
+     * Constructor.
+     *
+     * @param msg Message.
+     */
+    public IndexExchangeWorkerTask(IndexAbstractDiscoveryMessage msg) {
+        assert msg != null;
+
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public IndexAbstractDiscoveryMessage message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isExchange() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexExchangeWorkerTask.class, this);
+    }
+}