You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/22 11:15:19 UTC

[1/2] ignite git commit: IGNITE-4761: Fix ServiceProcessor hanging on node stop. This closes #1602.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3477-master 16d653158 -> aaf23fd19


IGNITE-4761: Fix ServiceProcessor hanging on node stop. This closes #1602.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6cc1403f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6cc1403f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6cc1403f

Branch: refs/heads/ignite-3477-master
Commit: 6cc1403f9b2188b6566de13c0b83e99806b6b4db
Parents: 16d6531
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Mar 14 15:50:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Mar 22 12:25:11 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 21 +++---
 .../GridServiceProcessorStopSelfTest.java       | 75 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6cc1403f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 94cf6e0..cd433f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -311,8 +311,12 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             if (ctx.isDaemon())
                 return;
 
-            if (!ctx.clientNode())
-                ctx.event().removeDiscoveryEventListener(topLsnr);
+        busyLock.block();
+
+        U.shutdownNow(GridServiceProcessor.class, depExe, log);
+
+        if (!ctx.clientNode())
+            ctx.event().removeDiscoveryEventListener(topLsnr);
 
             Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
@@ -348,8 +352,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 }
             }
 
-            U.shutdownNow(GridServiceProcessor.class, depExe, log);
-
             Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
 
             cancelFutures(depFuts, err);
@@ -1417,7 +1419,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 return;
 
             try {
-                depExe.execute(new BusyRunnable() {
+                depExe.execute(new DepRunnable() {
                     @Override public void run0() {
                         onSystemCacheUpdated(deps);
                     }
@@ -1614,7 +1616,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                 else
                     topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
 
-                depExe.execute(new BusyRunnable() {
+                depExe.execute(new DepRunnable() {
                     @Override public void run0() {
                         // In case the cache instance isn't tracked by DiscoveryManager anymore.
                         discoCache.updateAlives(ctx.discovery());
@@ -1819,12 +1821,15 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
     /**
      *
      */
-    private abstract class BusyRunnable implements Runnable {
+    private abstract class DepRunnable implements Runnable {
         /** {@inheritDoc} */
         @Override public void run() {
             if (!busyLock.enterBusy())
                 return;
 
+            // Leave the busy lock right away so that this task won't block the ServiceProcessor stop process.
+            busyLock.leaveBusy();
+
             svcName.set(null);
 
             try {
@@ -1837,8 +1842,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                     throw t;
             }
             finally {
-                busyLock.leaveBusy();
-
                 svcName.set(null);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6cc1403f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index 08595f7..03b00f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.service;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.services.Service;
@@ -91,6 +97,75 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testStopDuringHangedDeployment() throws Exception {
+        final CountDownLatch depLatch = new CountDownLatch(1);
+
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+
+        final IgniteEx node0 = startGrid(0);
+        final IgniteEx node1 = startGrid(1);
+        final IgniteEx node2 = startGrid(2);
+
+        final IgniteCache<Object, Object> cache = node2.getOrCreateCache(new CacheConfiguration<Object, Object>("def")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        node0.services().deployNodeSingleton("myService", new TestServiceImpl());
+
+        // Guarantee that the lock owner never leaves the topology unexpectedly.
+        final Integer lockKey = keyForNode(node2.affinity("def"), new AtomicInteger(1),
+            node2.cluster().localNode());
+
+        // Lock used to keep the topology version change from completing.
+        final Lock lock = cache.lock(lockKey);
+
+        // Try to change the topology once the service has been deployed.
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                depLatch.await();
+
+                node1.close();
+
+                return null;
+            }
+        }, "top-change-thread");
+
+        // Stop node on unstable topology.
+        GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                depLatch.await();
+
+                Thread.sleep(1000);
+
+                node0.close();
+
+                finishLatch.countDown();
+
+                return null;
+            }
+        }, "stopping-node-thread");
+
+        assertNotNull(node0.services().service("myService"));
+
+        // Freeze topology changes.
+        lock.lock();
+
+        depLatch.countDown();
+
+        boolean wait = finishLatch.await(15, TimeUnit.SECONDS);
+
+        if (!wait)
+            U.dumpThreads(log);
+
+        assertTrue("Deploy future isn't completed", wait);
+
+        fut.get();
+
+        Ignition.stopAll(true);
+    }
+
+    /**
      * Simple map service.
      */
     public interface TestService {


[2/2] ignite git commit: IGNITE-3477 - Fixing query parallelism

Posted by ag...@apache.org.
IGNITE-3477 - Fixing query parallelism


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aaf23fd1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aaf23fd1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aaf23fd1

Branch: refs/heads/ignite-3477-master
Commit: aaf23fd19b13ab630e01fcd3cbb26d625083db6f
Parents: 6cc1403
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Mar 22 12:25:33 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Mar 22 12:25:33 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    |   2 +-
 .../processors/database/BPlusTreeSelfTest.java  |   4 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   4 +-
 .../query/h2/database/H2TreeIndex.java          | 104 +++++++++++++------
 .../query/h2/opt/GridH2TreeIndex.java           |   2 +-
 5 files changed, 80 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 3607a5b..3fdea50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1761,7 +1761,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param valCls value class
      * @param pathStr property name
      * @param resType property type
-     * @return
+     * @return Property initialization exception message.
      */
     public static String propertyInitializationExceptionMessage(Class<?> keyCls, Class<?> valCls, String pathStr,
         Class<?> resType) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 22bc17a..9a90c1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -1203,7 +1203,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         final Map<Long,Long> map = new ConcurrentHashMap8<>();
 
-        final int loops = reuseList == null ? 200_000 : 400_000;
+        final int loops = reuseList == null ? 100_000 : 200_000;
 
         final GridStripedLock lock = new GridStripedLock(256);
 
@@ -1295,7 +1295,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
                 return null;
             }
-        }, 16, "put-remove");
+        }, Runtime.getRuntime().availableProcessors(), "put-remove");
 
         final AtomicBoolean stop = new AtomicBoolean();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index fa0437f..59f5c21 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2978,7 +2978,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 if (log.isInfoEnabled())
                     log.info("Creating cache index [cacheId=" + cctx.cacheId() + ", idxName=" + name + ']');
 
-                return new H2TreeIndex(cctx, tbl, name, pk, cols, inlineSize);
+                final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
+
+                return new H2TreeIndex(cctx, tbl, name, pk, cols, inlineSize, segments);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 0087fab..0185fa5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -50,12 +50,13 @@ import org.jetbrains.annotations.Nullable;
 /**
  * H2 Index over {@link BPlusTree}.
  */
+@SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
 public class H2TreeIndex extends GridH2IndexBase {
     /** Default value for {@code IGNITE_MAX_INDEX_PAYLOAD_SIZE} */
     public static final int IGNITE_MAX_INDEX_PAYLOAD_SIZE_DEFAULT = 10;
 
     /** */
-    private final H2Tree tree;
+    private final H2Tree[] segments;
 
     /** */
     private final List<InlineIndexHelper> inlineIdxs;
@@ -78,8 +79,11 @@ public class H2TreeIndex extends GridH2IndexBase {
         String name,
         boolean pk,
         List<IndexColumn> colsList,
-        int inlineSize
+        int inlineSize,
+        int segmentsCnt
     ) throws IgniteCheckedException {
+        assert segmentsCnt > 0 : segmentsCnt;
+
         this.cctx = cctx;
         IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
 
@@ -95,21 +99,35 @@ public class H2TreeIndex extends GridH2IndexBase {
         if (cctx.affinityNode()) {
             IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database();
 
-            RootPage page = getMetaPage(name);
-
             inlineIdxs = getAvailableInlineColumns(cols);
 
-            tree = new H2Tree(name, cctx.offheap().reuseListForIndex(name), cctx.cacheId(),
-                dbMgr.pageMemory(), cctx.shared().wal(), cctx.offheap().globalRemoveId(),
-                tbl.rowFactory(), page.pageId().pageId(), page.isAllocated(), cols, inlineIdxs, computeInlineSize(inlineIdxs, inlineSize)) {
-                @Override public int compareValues(Value v1, Value v2) {
-                    return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
-                }
-            };
+            segments = new H2Tree[segmentsCnt];
+
+            for (int i = 0; i < segments.length; i++) {
+                RootPage page = getMetaPage(name, i);
+
+                segments[i] = new H2Tree(
+                    name,
+                    cctx.offheap().reuseListForIndex(name),
+                    cctx.cacheId(),
+                    dbMgr.pageMemory(),
+                    cctx.shared().wal(),
+                    cctx.offheap().globalRemoveId(),
+                    tbl.rowFactory(),
+                    page.pageId().pageId(),
+                    page.isAllocated(),
+                    cols,
+                    inlineIdxs,
+                    computeInlineSize(inlineIdxs, inlineSize)) {
+                    @Override public int compareValues(Value v1, Value v2) {
+                        return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
+                    }
+                };
+            }
         }
         else {
             // We need indexes on the client node, but index will not contain any data.
-            tree = null;
+            segments = null;
             inlineIdxs = null;
         }
 
@@ -123,9 +141,7 @@ public class H2TreeIndex extends GridH2IndexBase {
     private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
         List<InlineIndexHelper> res = new ArrayList<>();
 
-        for (int i = 0; i < cols.length; i++) {
-            IndexColumn col = cols[i];
-
+        for (IndexColumn col : cols) {
             if (!InlineIndexHelper.AVAILABLE_TYPES.contains(col.column.getType()))
                 break;
 
@@ -154,6 +170,10 @@ public class H2TreeIndex extends GridH2IndexBase {
                 p = f.forSpace(spaceName);
             }
 
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             return new H2Cursor(tree.find(lower, upper), p);
         }
         catch (IgniteCheckedException e) {
@@ -164,6 +184,10 @@ public class H2TreeIndex extends GridH2IndexBase {
     /** {@inheritDoc} */
     @Override public GridH2Row findOne(GridH2Row row) {
         try {
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             return tree.findOne(row);
         }
         catch (IgniteCheckedException e) {
@@ -176,6 +200,10 @@ public class H2TreeIndex extends GridH2IndexBase {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             return tree.put(row);
         }
         catch (IgniteCheckedException e) {
@@ -191,6 +219,10 @@ public class H2TreeIndex extends GridH2IndexBase {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             return tree.putx(row);
         }
         catch (IgniteCheckedException e) {
@@ -205,6 +237,11 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override public GridH2Row remove(SearchRow row) {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             return tree.remove(row);
         }
         catch (IgniteCheckedException e) {
@@ -219,6 +256,11 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override public void removex(SearchRow row) {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             tree.removex(row);
         }
         catch (IgniteCheckedException e) {
@@ -271,9 +313,11 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override public void destroy() {
         try {
             if (cctx.affinityNode()) {
-                tree.destroy();
+                for (H2Tree tree : segments) {
+                    tree.destroy();
 
-                cctx.offheap().dropRootPageForIndex(tree.getName());
+                    cctx.offheap().dropRootPageForIndex(tree.getName());
+                }
             }
         }
         catch (IgniteCheckedException e) {
@@ -286,17 +330,14 @@ public class H2TreeIndex extends GridH2IndexBase {
 
     /** {@inheritDoc} */
     @Nullable @Override protected IgniteTree<SearchRow, GridH2Row> doTakeSnapshot() {
-        return tree;
-    }
+        int seg = threadLocalSegment();
 
-    /** {@inheritDoc} */
-    protected IgniteTree<SearchRow, GridH2Row> treeForRead() {
-        return tree;
+        return treeForRead(seg);
     }
 
     /** {@inheritDoc} */
-    @Override protected <K, V> IgniteTree<K, V> treeForRead(int segment) {
-        return (IgniteTree<K, V>)tree;
+    @Override protected H2Tree treeForRead(int segment) {
+        return segments[segment];
     }
 
     /** {@inheritDoc} */
@@ -306,9 +347,11 @@ public class H2TreeIndex extends GridH2IndexBase {
         boolean includeFirst,
         @Nullable SearchRow last,
         IndexingQueryFilter filter) {
-        includeFirst &= first != null;
-
         try {
+            int seg = threadLocalSegment();
+
+            H2Tree tree = treeForRead(seg);
+
             GridCursor<GridH2Row> range = tree.find(first, last);
 
             if (range == null)
@@ -344,8 +387,7 @@ public class H2TreeIndex extends GridH2IndexBase {
 
             int size = 0;
 
-            for (int i = 0; i < inlineIdxs.size(); i++) {
-                InlineIndexHelper idxHelper = inlineIdxs.get(i);
+            for (InlineIndexHelper idxHelper : inlineIdxs) {
                 if (idxHelper.size() <= 0) {
                     size = propSize;
                     break;
@@ -363,9 +405,9 @@ public class H2TreeIndex extends GridH2IndexBase {
     /**
      * @param name Name.
      * @return RootPage for meta page.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
-    private RootPage getMetaPage(String name) throws IgniteCheckedException {
-        return cctx.offheap().rootPageForIndex(name);
+    private RootPage getMetaPage(String name, int segIdx) throws IgniteCheckedException {
+        return cctx.offheap().rootPageForIndex(name + "%" + segIdx);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index ea0b64b..aaf8310 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -517,7 +517,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                 while(cursor.next()) {
                     GridH2Row row = cursor.get();
 
-                    // Check for interruptions every 1000 iterations.
+                    // Check for interruptions every 1024 iterations.
                     if ((++j & 1024) == 0 && thread.isInterrupted())
                         throw new InterruptedException();