You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/14 10:30:19 UTC
[09/50] ignite git commit: WIP.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/34fa8a08
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/34fa8a08
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/34fa8a08
Branch: refs/heads/ignite-5991-6019
Commit: 34fa8a08a8e25db6e6cb0a41a515517c3a062fba
Parents: 0b88846
Author: devozerov <vo...@gridgain.com>
Authored: Thu Aug 10 15:53:52 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 10 15:53:52 2017 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoPolicy.java | 3 +
.../ignite/thread/IgniteThreadFactory.java | 6 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 8 ++
.../h2/twostep/lazy/MapQueryLazyExecutor.java | 106 +++++++++++++++++++
.../twostep/lazy/MapQueryLazyIgniteThread.java | 65 ++++++++++++
.../twostep/lazy/MapQueryLazyWorkerFactory.java | 47 ++++++++
6 files changed, 232 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 3f31f92..6900a6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -61,6 +61,9 @@ public class GridIoPolicy {
/** Schema pool. */
public static final byte SCHEMA_POOL = 12;
+ /** Pool for lazy query execution. */
+ public static final byte QUERY_LAZY_POOL = 13;
+
/**
* Defines the range of reserved pools that are not available for plugins.
* @param key The key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 062c973..18f6763 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -30,16 +30,16 @@ import org.jetbrains.annotations.NotNull;
*/
public class IgniteThreadFactory implements ThreadFactory {
/** Ignite instance name. */
- private final String igniteInstanceName;
+ protected final String igniteInstanceName;
/** Thread name. */
- private final String threadName;
+ protected final String threadName;
/** Index generator for threads. */
private final AtomicInteger idxGen = new AtomicInteger();
/** */
- private final byte plc;
+ protected final byte plc;
/**
* Constructs new thread factory for given grid. All threads will belong
http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b6227f8..b0d40cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyIgniteThread;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -162,6 +163,13 @@ public class GridMapQueryExecutor {
}
/**
+ * @return {@code True} if running in lazy mode, i.e. the current thread is a {@link MapQueryLazyIgniteThread}.
+ */
+ private static boolean lazyThread() {
+ return Thread.currentThread() instanceof MapQueryLazyIgniteThread;
+ }
+
+ /**
* @param nodeId Node ID.
* @param msg Message.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java
new file mode 100644
index 0000000..d85e455
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.lazy;
+
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
+
+/**
+ * Pool of workers for lazy query execution; tracks workers that are currently running a shared command.
+ */
+public class MapQueryLazyExecutor {
+    /** Executor responsible for shared task dispatch. */
+    private final IgniteThreadPoolExecutor exec;
+
+    /** Workers currently running a shared command, keyed by thread index. */
+    private final ConcurrentHashMap<Long, MapQueryLazyIgniteThread> threads = new ConcurrentHashMap<>();
+
+    /**
+     * Creates the pool and makes it produce {@link MapQueryLazyIgniteThread} workers.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param poolSize Thread pool size, must be positive.
+     */
+    public MapQueryLazyExecutor(String igniteInstanceName, int poolSize) {
+        assert poolSize > 0;
+
+        exec = new IgniteThreadPoolExecutor(
+            "query-lazy",
+            igniteInstanceName,
+            poolSize,
+            poolSize,
+            DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<Runnable>(),
+            GridIoPolicy.QUERY_LAZY_POOL);
+
+        // submitShared() casts the worker to MapQueryLazyIgniteThread, so the pool must create exactly such threads.
+        exec.setThreadFactory(new MapQueryLazyWorkerFactory(igniteInstanceName, "query-lazy"));
+    }
+
+    /**
+     * Submit a command to a dedicated worker, or enqueue in case no workers are available.
+     *
+     * @param cmd Shared command to be executed.
+     */
+    public void submitShared(final Runnable cmd) {
+        Runnable r = new Runnable() {
+            @Override public void run() {
+                // Register the worker so that submit(long, Runnable) can reach it while the command runs.
+                MapQueryLazyIgniteThread thread = (MapQueryLazyIgniteThread)Thread.currentThread();
+                long threadKey = thread.index();
+
+                threads.put(threadKey, thread);
+
+                try {
+                    cmd.run();
+                }
+                finally {
+                    threads.remove(threadKey);
+                }
+            }
+        };
+
+        exec.submit(r);
+    }
+
+    /**
+     * Submit a task for specific thread. The task is silently dropped if no worker is registered under the index.
+     *
+     * @param idx Index.
+     * @param cmd Command to be executed.
+     */
+    public void submit(long idx, Runnable cmd) {
+        MapQueryLazyIgniteThread thread = threads.get(idx);
+        if (thread != null)
+            thread.submit(cmd);
+    }
+
+    /**
+     * Shutdown the executor and every worker that is currently registered.
+     */
+    public void shutdown() {
+        exec.shutdown();
+        for (MapQueryLazyIgniteThread thread : threads.values())
+            thread.shutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java
new file mode 100644
index 0000000..a207ef0
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.lazy;
+
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Ignite thread for lazy map query execution; carries a numeric index that identifies it.
+ */
+public class MapQueryLazyIgniteThread extends IgniteThread {
+ /** Thread index, assigned once in the constructor. */
+ private final long idx;
+
+ /**
+ * Constructor.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param threadName Thread name.
+ * @param r Runnable passed to the {@link IgniteThread} constructor.
+ * @param idx Index later returned by {@link #index()}.
+ */
+ public MapQueryLazyIgniteThread(String igniteInstanceName, String threadName, Runnable r, long idx) {
+ super(igniteInstanceName, threadName, r);
+
+ this.idx = idx;
+ }
+
+ /**
+ * @return Thread index passed to the constructor.
+ */
+ public long index() {
+ return idx;
+ }
+
+ /**
+ * Submit new runnable. Not implemented yet: the runnable is currently ignored.
+ *
+ * @param r Runnable to be executed.
+ */
+ public void submit(Runnable r) {
+ // TODO: not implemented, r is dropped without being executed.
+ }
+
+ /**
+ * Shutdown the thread. Not implemented yet: the call currently has no effect.
+ */
+ public void shutdown() {
+ // TODO: not implemented.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java
new file mode 100644
index 0000000..693853b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.lazy;
+
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.thread.IgniteThreadFactory;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Factory for lazy mapper workers: every created thread is a {@link MapQueryLazyIgniteThread}.
+ */
+public class MapQueryLazyWorkerFactory extends IgniteThreadFactory {
+    /** Thread ID generator; static, so it is shared by all factory instances. First issued ID is 1. */
+    private static final AtomicLong THREAD_ID_GEN = new AtomicLong();
+
+    /**
+     * Constructor. The factory is registered under the dedicated lazy query pool policy.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param threadName Thread name.
+     */
+    public MapQueryLazyWorkerFactory(String igniteInstanceName, String threadName) {
+        super(igniteInstanceName, threadName, GridIoPolicy.QUERY_LAZY_POOL);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Thread newThread(@NotNull Runnable r) {
+        return new MapQueryLazyIgniteThread(igniteInstanceName, threadName, r, THREAD_ID_GEN.incrementAndGet());
+    }
+}