You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/14 10:30:19 UTC

[09/50] ignite git commit: WIP.

WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/34fa8a08
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/34fa8a08
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/34fa8a08

Branch: refs/heads/ignite-5991-6019
Commit: 34fa8a08a8e25db6e6cb0a41a515517c3a062fba
Parents: 0b88846
Author: devozerov <vo...@gridgain.com>
Authored: Thu Aug 10 15:53:52 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 10 15:53:52 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoPolicy.java    |   3 +
 .../ignite/thread/IgniteThreadFactory.java      |   6 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   8 ++
 .../h2/twostep/lazy/MapQueryLazyExecutor.java   | 106 +++++++++++++++++++
 .../twostep/lazy/MapQueryLazyIgniteThread.java  |  65 ++++++++++++
 .../twostep/lazy/MapQueryLazyWorkerFactory.java |  47 ++++++++
 6 files changed, 232 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 3f31f92..6900a6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -61,6 +61,9 @@ public class GridIoPolicy {
     /** Schema pool.  */
     public static final byte SCHEMA_POOL = 12;
 
+    /** Pool for lazy query execution. */
+    public static final byte QUERY_LAZY_POOL = 13;
+
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 062c973..18f6763 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -30,16 +30,16 @@ import org.jetbrains.annotations.NotNull;
  */
 public class IgniteThreadFactory implements ThreadFactory {
     /** Ignite instance name. */
-    private final String igniteInstanceName;
+    protected final String igniteInstanceName;
 
     /** Thread name. */
-    private final String threadName;
+    protected final String threadName;
 
     /** Index generator for threads. */
     private final AtomicInteger idxGen = new AtomicInteger();
 
     /** */
-    private final byte plc;
+    protected final byte plc;
 
     /**
      * Constructs new thread factory for given grid. All threads will belong

http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b6227f8..b0d40cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyIgniteThread;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -162,6 +163,13 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @return {@code True} if running in lazy mode.
+     */
+    private static boolean lazyThread() {
+        return Thread.currentThread() instanceof MapQueryLazyIgniteThread;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Message.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java
new file mode 100644
index 0000000..d85e455
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyExecutor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.lazy;
+
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
+
+/**
+ * Pool of workers for lazy query execution.
+ */
+public class MapQueryLazyExecutor {
+    /** Executor responsible for shared task dispatch. */
+    private final IgniteThreadPoolExecutor exec;
+
+    /** Threads. */
+    private final ConcurrentHashMap<Long, MapQueryLazyIgniteThread> threads = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param poolSize Thread pool size.
+     */
+    public MapQueryLazyExecutor(String igniteInstanceName, int poolSize) {
+        assert poolSize > 0;
+
+        exec = new IgniteThreadPoolExecutor(
+            "query-lazy",
+            igniteInstanceName,
+            poolSize,
+            poolSize,
+            DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<Runnable>(),
+            GridIoPolicy.SYSTEM_POOL);
+    }
+
+    /**
+     * Submit a command to a dedicated worker, or enqueue in case no workers are available.
+     *
+     * @param cmd Shared command to be executed.
+     */
+    public void submitShared(final Runnable cmd) {
+        Runnable r = new Runnable() {
+            @Override public void run() {
+                // Register thread in a shared map.
+                MapQueryLazyIgniteThread thread = (MapQueryLazyIgniteThread)Thread.currentThread();
+
+                long threadKey = thread.index();
+
+                threads.put(threadKey, thread);
+
+                try {
+                    cmd.run();
+                }
+                finally {
+                    threads.remove(threadKey);
+                }
+            }
+        };
+
+        exec.submit(r);
+    }
+
+    /**
+     * Submit a task for specific thread.
+     *
+     * @param idx Index.
+     * @param cmd Command to be executed.
+     */
+    public void submit(long idx, Runnable cmd) {
+        MapQueryLazyIgniteThread thread = threads.get(idx);
+
+        if (thread != null)
+            thread.submit(cmd);
+    }
+
+    /**
+     * Shutdown the cluster.
+     */
+    public void shutdown() {
+        exec.shutdown();
+
+        for (MapQueryLazyIgniteThread thread : threads.values())
+            thread.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java
new file mode 100644
index 0000000..a207ef0
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyIgniteThread.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.lazy;
+
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Ignite thread for lazy map query execution.
+ */
+public class MapQueryLazyIgniteThread extends IgniteThread {
+    /** Thread index. */
+    private final long idx;
+
+    /**
+     * Constructor.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param threadName Thread name.
+     * @param r Runnable.
+     * @param idx Index.
+     */
+    public MapQueryLazyIgniteThread(String igniteInstanceName, String threadName, Runnable r, long idx) {
+        super(igniteInstanceName, threadName, r);
+
+        this.idx = idx;
+    }
+
+    /**
+     * @return Thread index.
+     */
+    public long index() {
+        return idx;
+    }
+
+    /**
+     * Submit new runnable.
+     *
+     * @param r Runnable to be executed.
+     */
+    public void submit(Runnable r) {
+        // TODO
+    }
+
+    /**
+     * Shutdown the thread.
+     */
+    public void shutdown() {
+        // TODO
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/34fa8a08/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java
new file mode 100644
index 0000000..693853b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorkerFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.lazy;
+
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.thread.IgniteThreadFactory;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Factory for lazy mapper workers.
+ */
+public class MapQueryLazyWorkerFactory extends IgniteThreadFactory {
+    /** Thread ID generator. */
+    private static final AtomicLong THREAD_ID_GEN = new AtomicLong();
+
+    /**
+     * Constructor.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param threadName Thread name.
+     */
+    public MapQueryLazyWorkerFactory(String igniteInstanceName, String threadName) {
+        super(igniteInstanceName, threadName, GridIoPolicy.QUERY_POOL);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Thread newThread(@NotNull Runnable r) {
+        return new MapQueryLazyIgniteThread(igniteInstanceName, threadName, r, THREAD_ID_GEN.incrementAndGet());
+    }
+}