You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/22 11:15:19 UTC
[1/2] ignite git commit: IGNITE-4761: Fix ServiceProcessor hanging on
node stop. This closes #1602.
Repository: ignite
Updated Branches:
refs/heads/ignite-3477-master 16d653158 -> aaf23fd19
IGNITE-4761: Fix ServiceProcessor hanging on node stop. This closes #1602.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6cc1403f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6cc1403f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6cc1403f
Branch: refs/heads/ignite-3477-master
Commit: 6cc1403f9b2188b6566de13c0b83e99806b6b4db
Parents: 16d6531
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Tue Mar 14 15:50:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Mar 22 12:25:11 2017 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 21 +++---
.../GridServiceProcessorStopSelfTest.java | 75 ++++++++++++++++++++
2 files changed, 87 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6cc1403f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 94cf6e0..cd433f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -311,8 +311,12 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
if (ctx.isDaemon())
return;
- if (!ctx.clientNode())
- ctx.event().removeDiscoveryEventListener(topLsnr);
+ busyLock.block();
+
+ U.shutdownNow(GridServiceProcessor.class, depExe, log);
+
+ if (!ctx.clientNode())
+ ctx.event().removeDiscoveryEventListener(topLsnr);
Collection<ServiceContextImpl> ctxs = new ArrayList<>();
@@ -348,8 +352,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
}
}
- U.shutdownNow(GridServiceProcessor.class, depExe, log);
-
Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
cancelFutures(depFuts, err);
@@ -1417,7 +1419,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
return;
try {
- depExe.execute(new BusyRunnable() {
+ depExe.execute(new DepRunnable() {
@Override public void run0() {
onSystemCacheUpdated(deps);
}
@@ -1614,7 +1616,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
else
topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
- depExe.execute(new BusyRunnable() {
+ depExe.execute(new DepRunnable() {
@Override public void run0() {
// In case the cache instance isn't tracked by DiscoveryManager anymore.
discoCache.updateAlives(ctx.discovery());
@@ -1819,12 +1821,15 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
/**
*
*/
- private abstract class BusyRunnable implements Runnable {
+ private abstract class DepRunnable implements Runnable {
/** {@inheritDoc} */
@Override public void run() {
if (!busyLock.enterBusy())
return;
+ // Leave the busy lock immediately so that this task won't block the ServiceProcessor stopping process.
+ busyLock.leaveBusy();
+
svcName.set(null);
try {
@@ -1837,8 +1842,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
throw t;
}
finally {
- busyLock.leaveBusy();
-
svcName.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6cc1403f/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index 08595f7..03b00f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.service;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
@@ -91,6 +97,75 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testStopDuringHangedDeployment() throws Exception {
+ final CountDownLatch depLatch = new CountDownLatch(1);
+
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+
+ final IgniteEx node0 = startGrid(0);
+ final IgniteEx node1 = startGrid(1);
+ final IgniteEx node2 = startGrid(2);
+
+ final IgniteCache<Object, Object> cache = node2.getOrCreateCache(new CacheConfiguration<Object, Object>("def")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ node0.services().deployNodeSingleton("myService", new TestServiceImpl());
+
+ // Guarantee that the lock owner will never leave the topology unexpectedly.
+ final Integer lockKey = keyForNode(node2.affinity("def"), new AtomicInteger(1),
+ node2.cluster().localNode());
+
+ // Lock used to keep the topology version change from completing.
+ final Lock lock = cache.lock(lockKey);
+
+ // Try to change topology once service has deployed.
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ depLatch.await();
+
+ node1.close();
+
+ return null;
+ }
+ }, "top-change-thread");
+
+ // Stop node on unstable topology.
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ depLatch.await();
+
+ Thread.sleep(1000);
+
+ node0.close();
+
+ finishLatch.countDown();
+
+ return null;
+ }
+ }, "stopping-node-thread");
+
+ assertNotNull(node0.services().service("myService"));
+
+ // Freeze topology changes.
+ lock.lock();
+
+ depLatch.countDown();
+
+ boolean wait = finishLatch.await(15, TimeUnit.SECONDS);
+
+ if (!wait)
+ U.dumpThreads(log);
+
+ assertTrue("Deploy future isn't completed", wait);
+
+ fut.get();
+
+ Ignition.stopAll(true);
+ }
+
+ /**
* Simple map service.
*/
public interface TestService {
[2/2] ignite git commit: IGNITE-3477 - Fixing query parallelism
Posted by ag...@apache.org.
IGNITE-3477 - Fixing query parallelism
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aaf23fd1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aaf23fd1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aaf23fd1
Branch: refs/heads/ignite-3477-master
Commit: aaf23fd19b13ab630e01fcd3cbb26d625083db6f
Parents: 6cc1403
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Mar 22 12:25:33 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Mar 22 12:25:33 2017 +0300
----------------------------------------------------------------------
.../processors/query/GridQueryProcessor.java | 2 +-
.../processors/database/BPlusTreeSelfTest.java | 4 +-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../query/h2/database/H2TreeIndex.java | 104 +++++++++++++------
.../query/h2/opt/GridH2TreeIndex.java | 2 +-
5 files changed, 80 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 3607a5b..3fdea50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1761,7 +1761,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param valCls value class
* @param pathStr property name
* @param resType property type
- * @return
+ * @return Property initialization exception message.
*/
public static String propertyInitializationExceptionMessage(Class<?> keyCls, Class<?> valCls, String pathStr,
Class<?> resType) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 22bc17a..9a90c1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -1203,7 +1203,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
final Map<Long,Long> map = new ConcurrentHashMap8<>();
- final int loops = reuseList == null ? 200_000 : 400_000;
+ final int loops = reuseList == null ? 100_000 : 200_000;
final GridStripedLock lock = new GridStripedLock(256);
@@ -1295,7 +1295,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
return null;
}
- }, 16, "put-remove");
+ }, Runtime.getRuntime().availableProcessors(), "put-remove");
final AtomicBoolean stop = new AtomicBoolean();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index fa0437f..59f5c21 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2978,7 +2978,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isInfoEnabled())
log.info("Creating cache index [cacheId=" + cctx.cacheId() + ", idxName=" + name + ']');
- return new H2TreeIndex(cctx, tbl, name, pk, cols, inlineSize);
+ final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();
+
+ return new H2TreeIndex(cctx, tbl, name, pk, cols, inlineSize, segments);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 0087fab..0185fa5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -50,12 +50,13 @@ import org.jetbrains.annotations.Nullable;
/**
* H2 Index over {@link BPlusTree}.
*/
+@SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
public class H2TreeIndex extends GridH2IndexBase {
/** Default value for {@code IGNITE_MAX_INDEX_PAYLOAD_SIZE} */
public static final int IGNITE_MAX_INDEX_PAYLOAD_SIZE_DEFAULT = 10;
/** */
- private final H2Tree tree;
+ private final H2Tree[] segments;
/** */
private final List<InlineIndexHelper> inlineIdxs;
@@ -78,8 +79,11 @@ public class H2TreeIndex extends GridH2IndexBase {
String name,
boolean pk,
List<IndexColumn> colsList,
- int inlineSize
+ int inlineSize,
+ int segmentsCnt
) throws IgniteCheckedException {
+ assert segmentsCnt > 0 : segmentsCnt;
+
this.cctx = cctx;
IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
@@ -95,21 +99,35 @@ public class H2TreeIndex extends GridH2IndexBase {
if (cctx.affinityNode()) {
IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database();
- RootPage page = getMetaPage(name);
-
inlineIdxs = getAvailableInlineColumns(cols);
- tree = new H2Tree(name, cctx.offheap().reuseListForIndex(name), cctx.cacheId(),
- dbMgr.pageMemory(), cctx.shared().wal(), cctx.offheap().globalRemoveId(),
- tbl.rowFactory(), page.pageId().pageId(), page.isAllocated(), cols, inlineIdxs, computeInlineSize(inlineIdxs, inlineSize)) {
- @Override public int compareValues(Value v1, Value v2) {
- return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
- }
- };
+ segments = new H2Tree[segmentsCnt];
+
+ for (int i = 0; i < segments.length; i++) {
+ RootPage page = getMetaPage(name, i);
+
+ segments[i] = new H2Tree(
+ name,
+ cctx.offheap().reuseListForIndex(name),
+ cctx.cacheId(),
+ dbMgr.pageMemory(),
+ cctx.shared().wal(),
+ cctx.offheap().globalRemoveId(),
+ tbl.rowFactory(),
+ page.pageId().pageId(),
+ page.isAllocated(),
+ cols,
+ inlineIdxs,
+ computeInlineSize(inlineIdxs, inlineSize)) {
+ @Override public int compareValues(Value v1, Value v2) {
+ return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
+ }
+ };
+ }
}
else {
// We need indexes on the client node, but index will not contain any data.
- tree = null;
+ segments = null;
inlineIdxs = null;
}
@@ -123,9 +141,7 @@ public class H2TreeIndex extends GridH2IndexBase {
private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
List<InlineIndexHelper> res = new ArrayList<>();
- for (int i = 0; i < cols.length; i++) {
- IndexColumn col = cols[i];
-
+ for (IndexColumn col : cols) {
if (!InlineIndexHelper.AVAILABLE_TYPES.contains(col.column.getType()))
break;
@@ -154,6 +170,10 @@ public class H2TreeIndex extends GridH2IndexBase {
p = f.forSpace(spaceName);
}
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
return new H2Cursor(tree.find(lower, upper), p);
}
catch (IgniteCheckedException e) {
@@ -164,6 +184,10 @@ public class H2TreeIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Override public GridH2Row findOne(GridH2Row row) {
try {
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
return tree.findOne(row);
}
catch (IgniteCheckedException e) {
@@ -176,6 +200,10 @@ public class H2TreeIndex extends GridH2IndexBase {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
return tree.put(row);
}
catch (IgniteCheckedException e) {
@@ -191,6 +219,10 @@ public class H2TreeIndex extends GridH2IndexBase {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
return tree.putx(row);
}
catch (IgniteCheckedException e) {
@@ -205,6 +237,11 @@ public class H2TreeIndex extends GridH2IndexBase {
@Override public GridH2Row remove(SearchRow row) {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
return tree.remove(row);
}
catch (IgniteCheckedException e) {
@@ -219,6 +256,11 @@ public class H2TreeIndex extends GridH2IndexBase {
@Override public void removex(SearchRow row) {
try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
tree.removex(row);
}
catch (IgniteCheckedException e) {
@@ -271,9 +313,11 @@ public class H2TreeIndex extends GridH2IndexBase {
@Override public void destroy() {
try {
if (cctx.affinityNode()) {
- tree.destroy();
+ for (H2Tree tree : segments) {
+ tree.destroy();
- cctx.offheap().dropRootPageForIndex(tree.getName());
+ cctx.offheap().dropRootPageForIndex(tree.getName());
+ }
}
}
catch (IgniteCheckedException e) {
@@ -286,17 +330,14 @@ public class H2TreeIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Nullable @Override protected IgniteTree<SearchRow, GridH2Row> doTakeSnapshot() {
- return tree;
- }
+ int seg = threadLocalSegment();
- /** {@inheritDoc} */
- protected IgniteTree<SearchRow, GridH2Row> treeForRead() {
- return tree;
+ return treeForRead(seg);
}
/** {@inheritDoc} */
- @Override protected <K, V> IgniteTree<K, V> treeForRead(int segment) {
- return (IgniteTree<K, V>)tree;
+ @Override protected H2Tree treeForRead(int segment) {
+ return segments[segment];
}
/** {@inheritDoc} */
@@ -306,9 +347,11 @@ public class H2TreeIndex extends GridH2IndexBase {
boolean includeFirst,
@Nullable SearchRow last,
IndexingQueryFilter filter) {
- includeFirst &= first != null;
-
try {
+ int seg = threadLocalSegment();
+
+ H2Tree tree = treeForRead(seg);
+
GridCursor<GridH2Row> range = tree.find(first, last);
if (range == null)
@@ -344,8 +387,7 @@ public class H2TreeIndex extends GridH2IndexBase {
int size = 0;
- for (int i = 0; i < inlineIdxs.size(); i++) {
- InlineIndexHelper idxHelper = inlineIdxs.get(i);
+ for (InlineIndexHelper idxHelper : inlineIdxs) {
if (idxHelper.size() <= 0) {
size = propSize;
break;
@@ -363,9 +405,9 @@ public class H2TreeIndex extends GridH2IndexBase {
/**
* @param name Name.
* @return RootPage for meta page.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
- private RootPage getMetaPage(String name) throws IgniteCheckedException {
- return cctx.offheap().rootPageForIndex(name);
+ private RootPage getMetaPage(String name, int segIdx) throws IgniteCheckedException {
+ return cctx.offheap().rootPageForIndex(name + "%" + segIdx);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf23fd1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index ea0b64b..aaf8310 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -517,7 +517,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
while(cursor.next()) {
GridH2Row row = cursor.get();
- // Check for interruptions every 1000 iterations.
+ // Check for interruptions every 1024 iterations.
if ((++j & 1024) == 0 && thread.isInterrupted())
throw new InterruptedException();