You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 15:27:07 UTC
ignite git commit: Wired up exchange with handlers.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl 4538526d8 -> 137054969
Wired up exchange with handlers.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13705496
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13705496
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13705496
Branch: refs/heads/ignite-4565-ddl
Commit: 137054969c75156242e51be24fa14ecf38f32a89
Parents: 4538526
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 18:25:48 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 18:25:48 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 53 +++++++++++++++---
.../query/ddl/IndexExchangeWorkerTask.java | 57 ++++++++++++++++++++
2 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/13705496/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0b17e5a..089b399 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
@@ -385,6 +386,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Task or {@code null} if message doesn't require any special processing.
*/
public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+ if (msg instanceof IndexAbstractDiscoveryMessage) {
+ IndexAbstractDiscoveryMessage msg0 = (IndexAbstractDiscoveryMessage)msg;
+
+ if (msg0.exchange()) // Only index messages reporting exchange() == true get a task.
+ return new IndexExchangeWorkerTask(msg0);
+ }
+
return null; // Message doesn't require any special processing.
}
@@ -394,7 +402,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param task Task.
*/
public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
- // No-op.
+ if (task instanceof IndexExchangeWorkerTask) {
+ IndexAbstractDiscoveryMessage msg = ((IndexExchangeWorkerTask)task).message();
+
+ if (msg instanceof IndexAcceptDiscoveryMessage) // Dispatch on the concrete type of the wrapped message.
+ onIndexAcceptMessageExchange((IndexAcceptDiscoveryMessage)msg);
+ else if (msg instanceof IndexFinishDiscoveryMessage)
+ onIndexFinishMessageExchange((IndexFinishDiscoveryMessage)msg);
+ else // Any other index message type is only logged, not processed.
+ U.warn(log, "Unsupported index discovery message: " + msg);
+ }
+ else // Tasks of any other type are only logged, not processed.
+ U.warn(log, "Unsupported custom exchange task: " + task);
}
/**
@@ -2720,11 +2739,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
if (msg instanceof IndexProposeDiscoveryMessage)
- onIndexProposeMessage((IndexProposeDiscoveryMessage)msg);
+ onIndexProposeMessageDiscovery((IndexProposeDiscoveryMessage)msg);
else if (msg instanceof IndexAcceptDiscoveryMessage)
- onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
+ onIndexAcceptMessageDiscovery((IndexAcceptDiscoveryMessage)msg);
else if (msg instanceof IndexFinishDiscoveryMessage)
- onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
+ onIndexFinishMessageDiscovery((IndexFinishDiscoveryMessage)msg);
else
U.warn(log, "Unsupported index discovery message type (will ignore): " + msg);
@@ -2742,7 +2761,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @param msg Message.
*/
- private void onIndexProposeMessage(IndexProposeDiscoveryMessage msg) {
+ private void onIndexProposeMessageDiscovery(IndexProposeDiscoveryMessage msg) {
UUID locNodeId = ctx.localNodeId();
AbstractIndexOperation op = msg.operation();
@@ -2768,11 +2787,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Handle cache index ack discovery message.
+ * Handle cache index accept discovery message.
*
* @param msg Message.
*/
- private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
+ private void onIndexAcceptMessageDiscovery(IndexAcceptDiscoveryMessage msg) {
AbstractIndexOperation op = msg.operation();
DynamicCacheDescriptor desc = cacheDescriptor(op.space());
@@ -2784,11 +2803,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Handle index accept message in exchange thread.
+ *
+ * @param msg Message.
+ */
+ private void onIndexAcceptMessageExchange(IndexAcceptDiscoveryMessage msg) {
+ // TODO: Not implemented yet; the accept message is currently ignored here.
+ }
+
+ /**
* Handle cache index finish discovery message.
*
* @param msg Message.
*/
- private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+ private void onIndexFinishMessageDiscovery(IndexFinishDiscoveryMessage msg) {
AbstractIndexOperation op = msg.operation();
DynamicCacheDescriptor desc = cacheDescriptor(op.space());
@@ -2800,6 +2828,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Handle index finish message in exchange thread.
+ *
+ * @param msg Message.
+ */
+ private void onIndexFinishMessageExchange(IndexFinishDiscoveryMessage msg) {
+ // TODO: Not implemented yet; the finish message is currently ignored here.
+ }
+
+ /**
* @param batch Change request batch.
* @param topVer Current topology version.
* @return {@code True} if minor topology version should be increased.
http://git-wip-us.apache.org/repos/asf/ignite/blob/13705496/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java
new file mode 100644
index 0000000..4cf83fe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexExchangeWorkerTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Exchange worker task that wraps an index discovery message so it can be handled as a custom exchange task.
+ */
+public class IndexExchangeWorkerTask implements CachePartitionExchangeWorkerTask {
+ /** Index discovery message wrapped by this task. */
+ private final IndexAbstractDiscoveryMessage msg;
+
+ /**
+ * Creates a task for the given index discovery message.
+ *
+ * @param msg Index discovery message; must not be {@code null} (checked by assertion only).
+ */
+ public IndexExchangeWorkerTask(IndexAbstractDiscoveryMessage msg) {
+ assert msg != null;
+
+ this.msg = msg;
+ }
+
+ /**
+ * @return Index discovery message wrapped by this task.
+ */
+ public IndexAbstractDiscoveryMessage message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isExchange() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexExchangeWorkerTask.class, this);
+ }
+}