You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/20 10:12:36 UTC
ignite git commit: WIP on wiring everything up.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl b6f49269e -> ae2ad1925
WIP on wiring everything up.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae2ad192
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae2ad192
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae2ad192
Branch: refs/heads/ignite-4565-ddl
Commit: ae2ad192585c1a1b41a605e56f4d52de82b3fb4b
Parents: b6f4926
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 13:02:33 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 13:02:33 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 22 +----
.../processors/query/GridQueryProcessor.java | 98 ++++++++++++++------
.../query/ddl/task/IndexingAcceptTask.java | 51 ++++++++++
.../query/ddl/task/IndexingFinishTask.java | 52 +++++++++++
4 files changed, 175 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 089b399..89629ea 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -406,9 +406,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IndexAbstractDiscoveryMessage msg = ((IndexExchangeWorkerTask)task).message();
if (msg instanceof IndexAcceptDiscoveryMessage)
- onIndexAcceptMessageExchange((IndexAcceptDiscoveryMessage)msg);
+ ctx.query().onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg);
else if (msg instanceof IndexFinishDiscoveryMessage)
- onIndexFinishMessageExchange((IndexFinishDiscoveryMessage)msg);
+ ctx.query().onIndexFinishMessage((IndexFinishDiscoveryMessage)msg);
else
U.warn(log, "Unsupported index discovery message: " + msg);
}
@@ -2803,15 +2803,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Handle index accept message in exchange thread.
- *
- * @param msg Message.
- */
- private void onIndexAcceptMessageExchange(IndexAcceptDiscoveryMessage msg) {
- // TODO
- }
-
- /**
* Handle cache index ack discovery message.
*
* @param msg Message.
@@ -2828,15 +2819,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Handle index finish message in exchange thread.
- *
- * @param msg Message.
- */
- private void onIndexFinishMessageExchange(IndexFinishDiscoveryMessage msg) {
- // TODO
- }
-
- /**
* @param batch Change request batch.
* @param topVer Current topology version.
* @return {@code True} if minor topology version should be increased.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 6b9d9af..bdc892f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -64,9 +64,12 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStopTask;
+import org.apache.ignite.internal.processors.query.ddl.task.IndexingFinishTask;
import org.apache.ignite.internal.processors.query.ddl.task.IndexingTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -344,6 +347,24 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Handle index accept message.
+ *
+ * @param msg Message.
+ */
+ public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
+ idxWorker.onAccept(msg);
+ }
+
+ /**
+ * Handle index finish message.
+ *
+ * @param msg Message.
+ */
+ public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+ idxWorker.onFinish(msg);
+ }
+
+ /**
* Handle index init discovery message.
*
* @param space Space.
@@ -1100,6 +1121,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param ldr Class loader to undeploy.
* @throws IgniteCheckedException If undeploy failed.
*/
+ // TODO: Can we remove this method? Handle undeploy for indexing otherwise.
public void onUndeploy(@Nullable String space, ClassLoader ldr) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Undeploy [space=" + space + "]");
@@ -1293,10 +1315,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
IndexingTask task = tasks.take();
if (task != null) {
- if (task instanceof IndexingCacheStartTask)
- handleCacheStart((IndexingCacheStartTask)task);
+ if (task instanceof IndexingCacheStartTask) {
+ IndexingCacheStartTask task0 = (IndexingCacheStartTask)task;
+
+ handleCacheStart(task0.space(), task0.initialIndexStates());
+ }
else if (task instanceof IndexingCacheStopTask)
handleCacheStop((IndexingCacheStopTask)task);
+ else if (task instanceof IndexingAcceptTask)
+ handleAccept(((IndexingAcceptTask)task).message());
+ else if (task instanceof IndexingFinishTask)
+ handleFinish(((IndexingFinishTask)task).message());
else
U.warn(log, "Unsupported task: " + task);
}
@@ -1325,9 +1354,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
* Handle cache start task.
*
- * @param task Task.
+ * @param space Space.
+ * @param initIdxStates Initial index states.
*/
- private void handleCacheStart(IndexingCacheStartTask task) {
+ private void handleCacheStart(String space, QueryIndexStates initIdxStates) {
// TODO: Start active operations.
}
@@ -1353,6 +1383,42 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Index accept callback.
+ *
+ * @param msg Message.
+ */
+ public void onAccept(IndexAcceptDiscoveryMessage msg) {
+ submit(new IndexingAcceptTask(msg));
+ }
+
+ /**
+ * Handle index accept.
+ *
+ * @param msg Message.
+ */
+ private void handleAccept(IndexAcceptDiscoveryMessage msg) {
+ // TODO
+ }
+
+ /**
+ * Index finish callback.
+ *
+ * @param msg Message.
+ */
+ public void onFinish(IndexFinishDiscoveryMessage msg) {
+ submit(new IndexingFinishTask(msg));
+ }
+
+ /**
+ * Handle index finish.
+ *
+ * @param msg Message.
+ */
+ private void handleFinish(IndexFinishDiscoveryMessage msg) {
+ // TODO
+ }
+
+ /**
* Update topology in response to node leave event.
*/
private void updateTopology() {
@@ -1405,30 +1471,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * Change index task.
- */
- private static class ChangeIndexingTask implements IndexingTask {
- /** Operation. */
- private final AbstractIndexOperation op;
-
- /**
- * Constructor.
- *
- * @param op Operation.
- */
- public ChangeIndexingTask(AbstractIndexOperation op) {
- this.op = op;
- }
-
- /**
- * @return Operation.
- */
- public AbstractIndexOperation operation() {
- return op;
- }
- }
-
- /**
* Node leave task.
*/
private static class NodeLeaveTask implements IndexingTask {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java
new file mode 100644
index 0000000..dd73501
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingAcceptTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl.task;
+
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Indexing accept task.
+ */
+public class IndexingAcceptTask implements IndexingTask {
+ /** Message. */
+ private final IndexAcceptDiscoveryMessage msg;
+
+ /**
+ * Constructor.
+ *
+ * @param msg Message.
+ */
+ public IndexingAcceptTask(IndexAcceptDiscoveryMessage msg) {
+ this.msg = msg;
+ }
+
+ /**
+ * @return Message.
+ */
+ public IndexAcceptDiscoveryMessage message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexingAcceptTask.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2ad192/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java
new file mode 100644
index 0000000..2149b13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/task/IndexingFinishTask.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl.task;
+
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Indexing finish task.
+ */
+public class IndexingFinishTask implements IndexingTask {
+ /** Message. */
+ private final IndexFinishDiscoveryMessage msg;
+
+ /**
+ * Constructor.
+ *
+ * @param msg Message.
+ */
+ public IndexingFinishTask(IndexFinishDiscoveryMessage msg) {
+ this.msg = msg;
+ }
+
+ /**
+ * @return Message.
+ */
+ public IndexFinishDiscoveryMessage message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexingFinishTask.class, this);
+ }
+}