You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/28 09:28:31 UTC
ignite git commit: ignite-80 Fixed dynamic cache start future
completion
Repository: ignite
Updated Branches:
refs/heads/ignite-80-2 [created] 5a110a1d7
ignite-80 Fixed dynamic cache start future completion
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a110a1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a110a1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a110a1d
Branch: refs/heads/ignite-80-2
Commit: 5a110a1d7e57952d82089022d78842a8fcb692b2
Parents: dc379fe
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 28 10:17:31 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 28 10:17:31 2015 +0300
----------------------------------------------------------------------
.../managers/discovery/CustomEventListener.java | 4 +++-
.../discovery/GridDiscoveryManager.java | 2 +-
.../cache/DynamicCacheChangeRequest.java | 19 +++++++++++++++++++
.../cache/DynamicCacheDescriptor.java | 19 +++++++++++++++++++
.../GridCachePartitionExchangeManager.java | 20 +++++++++++++++++---
.../processors/cache/GridCacheProcessor.java | 15 +++++++++++----
.../continuous/GridContinuousProcessor.java | 17 +++++++++++++----
.../datastructures/DataStructuresProcessor.java | 2 ++
...omicOffheapQueueCreateMultiNodeSelfTest.java | 5 -----
...ionedAtomicQueueCreateMultiNodeSelfTest.java | 5 -----
...PartitionedQueueCreateMultiNodeSelfTest.java | 16 ++++++++++------
11 files changed, 95 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
index ab143fb..8db4e67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.managers.discovery;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
* Listener interface.
@@ -26,6 +27,7 @@ public interface CustomEventListener<T extends DiscoveryCustomMessage> {
/**
* @param snd Sender.
* @param msg Message.
+ * @param topVer Current topology version.
*/
- public void onCustomEvent(ClusterNode snd, T msg);
+ public void onCustomEvent(ClusterNode snd, T msg, AffinityTopologyVersion topVer);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 3a09b2c..785613d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -527,7 +527,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (list != null) {
for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
try {
- lsnr.onCustomEvent(node, customMsg);
+ lsnr.onCustomEvent(node, customMsg, nextTopVer);
}
catch (Exception e) {
U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 583e346..b23be41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -21,9 +21,11 @@ import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
/**
* Cache start/stop request.
@@ -69,6 +71,9 @@ public class DynamicCacheChangeRequest implements Serializable {
/** */
private transient boolean exchangeNeeded;
+ /** */
+ private transient AffinityTopologyVersion cacheFutTopVer;
+
/**
* Constructor creates cache stop request.
*
@@ -88,6 +93,20 @@ public class DynamicCacheChangeRequest implements Serializable {
}
/**
+ * @param cacheFutTopVer Ready topology version when dynamic cache future should be completed.
+ */
+ public void cacheFutureTopologyVersion(AffinityTopologyVersion cacheFutTopVer) {
+ this.cacheFutTopVer = cacheFutTopVer;
+ }
+
+ /**
+ * @return Ready topology version when dynamic cache future should be completed.
+ */
+ @Nullable public AffinityTopologyVersion cacheFutureTopologyVersion() {
+ return cacheFutTopVer;
+ }
+
+ /**
* @param exchangeNeeded {@code True} if request should trigger partition exchange.
*/
public void exchangeNeeded(boolean exchangeNeeded) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 3cfc34e..24df7e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -22,11 +22,13 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
/**
* Cache start descriptor.
@@ -63,6 +65,9 @@ public class DynamicCacheDescriptor {
/** */
private boolean updatesAllowed = true;
+ /** */
+ private AffinityTopologyVersion startTopVer;
+
/**
* @param ctx Context.
* @param cacheCfg Cache configuration.
@@ -84,6 +89,20 @@ public class DynamicCacheDescriptor {
}
/**
+ * @return Start topology version.
+ */
+ @Nullable public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ /**
+ * @param startTopVer Start topology version.
+ */
+ public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+ this.startTopVer = startTopVer;
+ }
+
+ /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index eb76233..3e77e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -199,11 +199,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
// Validate requests to check if event should trigger partition exchange.
- for (DynamicCacheChangeRequest req : batch.requests()) {
+ for (final DynamicCacheChangeRequest req : batch.requests()) {
if (req.exchangeNeeded())
valid.add(req);
- else
- cctx.cache().completeStartFuture(req);
+ else {
+ IgniteInternalFuture<?> fut = null;
+
+ if (req.cacheFutureTopologyVersion() != null)
+ fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
+
+ if (fut == null || fut.isDone())
+ cctx.cache().completeStartFuture(req);
+ else {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ cctx.cache().completeStartFuture(req);
+ }
+ });
+ }
+ }
}
if (!F.isEmpty(valid)) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 74124bf..c86dfd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -115,7 +115,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-import org.apache.ignite.internal.portable.api.PortableMarshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.jetbrains.annotations.Nullable;
@@ -615,8 +614,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
new CustomEventListener<DynamicCacheChangeBatch>() {
- @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) {
- onCacheChangeRequested(msg);
+ @Override public void onCustomEvent(ClusterNode snd,
+ DynamicCacheChangeBatch msg,
+ AffinityTopologyVersion topVer) {
+ onCacheChangeRequested(msg, topVer);
}
});
@@ -2363,8 +2364,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Callback invoked from discovery thread when cache deployment request is received.
*
* @param batch Change request batch.
+ * @param topVer Current topology version.
*/
- private void onCacheChangeRequested(DynamicCacheChangeBatch batch) {
+ private void onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
for (DynamicCacheChangeRequest req : batch.requests()) {
if (req.template()) {
CacheConfiguration ccfg = req.startCacheConfiguration();
@@ -2421,6 +2423,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor startDesc =
new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
+ startDesc.startTopologyVersion(topVer);
+
DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
assert old == null :
@@ -2469,6 +2473,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
}
+
+ if (!needExchange && desc != null)
+ req.cacheFutureTopologyVersion(desc.startTopologyVersion());
}
else {
assert req.stop() ^ req.close() : req;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index e29bdd4..d1cb3a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -186,7 +187,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class,
new CustomEventListener<StartRoutineDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) {
+ @Override public void onCustomEvent(ClusterNode snd,
+ StartRoutineDiscoveryMessage msg,
+ AffinityTopologyVersion topVer) {
if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
processStartRequest(snd, msg);
}
@@ -194,7 +197,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd, StartRoutineAckDiscoveryMessage msg) {
+ @Override public void onCustomEvent(ClusterNode snd,
+ StartRoutineAckDiscoveryMessage msg,
+ AffinityTopologyVersion topVer) {
StartFuture fut = startFuts.remove(msg.routineId());
if (fut != null) {
@@ -213,7 +218,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class,
new CustomEventListener<StopRoutineDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+ @Override public void onCustomEvent(ClusterNode snd,
+ StopRoutineDiscoveryMessage msg,
+ AffinityTopologyVersion topVer) {
if (!snd.id().equals(ctx.localNodeId())) {
UUID routineId = msg.routineId();
@@ -231,7 +238,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class,
new CustomEventListener<StopRoutineAckDiscoveryMessage>() {
- @Override public void onCustomEvent(ClusterNode snd, StopRoutineAckDiscoveryMessage msg) {
+ @Override public void onCustomEvent(ClusterNode snd,
+ StopRoutineAckDiscoveryMessage msg,
+ AffinityTopologyVersion topVer) {
StopFuture fut = stopFuts.remove(msg.routineId());
if (fut != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index a5561e9..17f5f8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -897,6 +897,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
if (ctx.cache().cache(cacheName) == null)
ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false, true).get();
+ assert ctx.cache().cache(cacheName) != null : cacheName;
+
return cacheName;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest.java
index 7a31363..49d5092 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest.java
@@ -26,11 +26,6 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
public class GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest
extends GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-80");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMemoryMode collectionMemoryMode() {
return OFFHEAP_TIERED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.java
index e334ad6..caeb9b6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.java
@@ -29,11 +29,6 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
public class GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest
extends GridCachePartitionedQueueCreateMultiNodeSelfTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-80");
- }
-
- /** {@inheritDoc} */
@Override protected CacheMemoryMode collectionMemoryMode() {
return ONHEAP_TIERED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a110a1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
index 181682f..2146fc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -48,11 +49,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*/
public class GridCachePartitionedQueueCreateMultiNodeSelfTest extends IgniteCollectionAbstractTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-80");
- }
-
- /** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
@@ -127,7 +123,7 @@ public class GridCachePartitionedQueueCreateMultiNodeSelfTest extends IgniteColl
Thread.currentThread().setName("createQueue-" + idx0);
- Ignite ignite = startGrid(idx0);
+ final Ignite ignite = startGrid(idx0);
UUID locNodeId = ignite.cluster().localNode().id();
@@ -135,6 +131,14 @@ public class GridCachePartitionedQueueCreateMultiNodeSelfTest extends IgniteColl
info("Creating queue: " + locNodeId);
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ignite.queue("queue", 1, config(true));
+
+ return null;
+ }
+ }, 10, "create-queue-" + ignite.name());
+
IgniteQueue<String> q = ignite.queue("queue", 1, config(true));
assert q != null;