You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/11 14:18:54 UTC
[05/17] ignite git commit: IGNITE-500
CacheLoadingConcurrentGridStartSelfTest fails (DataStreamer data loss at
unstable topology in !allowOverwrite mode fixed)
IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails (DataStreamer data loss at unstable topology in !allowOverwrite mode fixed)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7499828
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7499828
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7499828
Branch: refs/heads/ignite-4154-opt2
Commit: b7499828c928e02e8e554f960f3754e4d08bfbe0
Parents: 8b59f4e
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Nov 10 16:10:21 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Nov 10 16:10:21 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteDataStreamer.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 5 +-
.../processors/cache/GridCacheMvccManager.java | 77 +++
.../GridCachePartitionExchangeManager.java | 5 +
.../cache/GridCacheSharedContext.java | 1 +
.../datastreamer/DataStreamProcessor.java | 104 +++-
.../datastreamer/DataStreamerImpl.java | 603 ++++++++++++++-----
.../ignite/internal/util/GridLogThrottle.java | 29 +-
.../cache/IgniteCacheDynamicStopSelfTest.java | 48 +-
...CacheLoadingConcurrentGridStartSelfTest.java | 251 +++++++-
...ncurrentGridStartSelfTestAllowOverwrite.java | 30 +
.../DataStreamProcessorSelfTest.java | 4 +-
.../datastreamer/DataStreamerImplSelfTest.java | 170 ++++--
.../DataStreamerMultiThreadedSelfTest.java | 2 -
.../datastreamer/DataStreamerTimeoutTest.java | 92 ++-
.../testsuites/IgniteCacheTestSuite2.java | 2 +
16 files changed, 1120 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 484fee9..4e00d66 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -291,7 +291,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
* @throws IllegalStateException If grid has been concurrently stopped or
* {@link #close(boolean)} has already been called on streamer.
*/
- public IgniteFuture<?> removeData(K key) throws CacheException, IgniteInterruptedException, IllegalStateException;
+ public IgniteFuture<?> removeData(K key) throws CacheException, IgniteInterruptedException, IllegalStateException;
/**
* Adds data for streaming on remote node. This method can be called from multiple
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5996672..950153f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3444,11 +3444,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (val == null) {
skipQryNtf = true;
- if (cctx.deferredDelete() && !isInternal()) {
- assert !deletedUnlocked();
-
+ if (cctx.deferredDelete() && !deletedUnlocked() && !isInternal())
deletedUnlocked(true);
- }
}
else if (deletedUnlocked())
deletedUnlocked(false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c4db01e..c57e17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridConcurrentFactory;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -108,6 +109,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
new ConcurrentHashMap8<>();
+ /** Pending data streamer futures. */
+ private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
+
/** */
private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>();
@@ -446,6 +450,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Collection of pending data streamer futures.
+ */
+ public Collection<DataStreamerFuture> dataStreamerFutures() {
+ return dataStreamerFuts;
+ }
+
+ /**
* Gets future by given future ID.
*
* @param futVer Future ID.
@@ -476,6 +487,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param topVer Topology version.
+ */
+ public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
+ final DataStreamerFuture fut = new DataStreamerFuture(topVer);
+
+ boolean add = dataStreamerFuts.add(fut);
+
+ assert add;
+
+ return fut;
+ }
+
+ /**
+
+ /**
* Adds future.
*
* @param fut Future.
@@ -1056,6 +1082,22 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ *
+ * @return Finish update future.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteInternalFuture<?> finishDataStreamerUpdates() {
+ GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>();
+
+ for (IgniteInternalFuture fut : dataStreamerFuts)
+ res.add(fut);
+
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
* @param keys Key for which locks should be released.
* @param cacheId Cache ID.
* @param topVer Topology version.
@@ -1294,4 +1336,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
CachePartialUpdateCheckedException.class.isAssignableFrom(cls);
}
}
+
+ /**
+ *
+ */
+ private class DataStreamerFuture extends GridFutureAdapter<Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Topology version. Instance field for toString method only. */
+ @GridToStringInclude
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * @param topVer Topology version.
+ */
+ DataStreamerFuture(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ dataStreamerFuts.remove(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataStreamerFuture.class, this, super.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a901e2a..00d2d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1309,6 +1309,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (GridCacheFuture<?> fut : mvcc.atomicFutures())
U.warn(log, ">>> " + fut);
+ U.warn(log, "Pending data streamer futures:");
+
+ for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
+ U.warn(log, ">>> " + fut);
+
if (tm != null) {
U.warn(log, "Pending transaction deadlock detection futures:");
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 8f39235..117a5c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -636,6 +636,7 @@ public class GridCacheSharedContext<K, V> {
f.add(mvcc().finishExplicitLocks(topVer));
f.add(tm().finishTxs(topVer));
f.add(mvcc().finishAtomicUpdates(topVer));
+ f.add(mvcc().finishDataStreamerUpdates());
f.markInitialized();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 7663735..32fda87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -29,13 +30,18 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
@@ -288,32 +294,94 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
return;
}
- Collection<DataStreamerEntry> col = req.entries();
+ localUpdate(nodeId, req, updater, topic);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
- DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
- log,
- req.cacheName(),
- col,
- req.ignoreDeploymentOwnership(),
- req.skipStore(),
- req.keepBinary(),
- updater);
+ /**
+ * @param nodeId Node id.
+ * @param req Request.
+ * @param updater Updater.
+ * @param topic Topic.
+ */
+ private void localUpdate(final UUID nodeId,
+ final DataStreamerRequest req,
+ final StreamReceiver<K, V> updater,
+ final Object topic) {
+ final boolean allowOverwrite = !(updater instanceof DataStreamerImpl.IsolatedUpdater);
- Exception err = null;
+ try {
+ GridCacheAdapter cache = ctx.cache().internalCache(req.cacheName());
+
+ if (cache == null)
+ throw new IgniteCheckedException("Cache not created or already destroyed.");
+
+ GridCacheContext cctx = cache.context();
+
+ DataStreamerUpdateJob job = null;
+
+ GridFutureAdapter waitFut = null;
+
+ if (!allowOverwrite)
+ cctx.topology().readLock();
try {
- job.call();
- }
- catch (Exception e) {
- U.error(log, "Failed to finish update job.", e);
+ GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
- err = e;
+ AffinityTopologyVersion topVer = fut.topologyVersion();
+
+ if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
+ Exception err = new IgniteCheckedException(
+ "DataStreamer will retry data transfer at stable topology " +
+ "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]");
+
+ sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+ }
+ else if (allowOverwrite || fut.isDone()) {
+ job = new DataStreamerUpdateJob(ctx,
+ log,
+ req.cacheName(),
+ req.entries(),
+ req.ignoreDeploymentOwnership(),
+ req.skipStore(),
+ req.keepBinary(),
+ updater);
+
+ waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
+ }
+ else {
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+ localUpdate(nodeId, req, updater, topic);
+ }
+ });
+ }
+ }
+ finally {
+ if (!allowOverwrite)
+ cctx.topology().readUnlock();
}
- sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+ if (job != null) {
+ try {
+ job.call();
+
+ sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
+ }
+ finally {
+ if (waitFut != null)
+ waitFut.onDone();
+ }
+ }
}
- finally {
- busyLock.leaveBusy();
+ catch (Throwable e) {
+ sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment());
+
+ if (e instanceof Error)
+ throw (Error)e;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a6065dd..443783b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
@@ -39,15 +40,15 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import javax.cache.expiry.ExpiryPolicy;
-
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteDataStreamerTimeoutException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteDataStreamerTimeoutException;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -72,11 +73,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheGateway;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.dr.GridDrType;
@@ -92,6 +95,8 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
@@ -102,6 +107,7 @@ import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.LongAdder8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -113,12 +119,15 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
*/
@SuppressWarnings("unchecked")
public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
- /** Default policy reoslver. */
+ /** Default policy resolver. */
private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
/** Isolated receiver. */
private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
+ /** Amount of permissions should be available to continue new data processing. */
+ private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE;
+
/** Cache receiver. */
private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
@@ -178,6 +187,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** {@code True} if data loader has been cancelled. */
private volatile boolean cancelled;
+ /** Fail counter. */
+ private final LongAdder8 failCntr = new LongAdder8();
+
/** Active futures of this data loader. */
@GridToStringInclude
private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
@@ -189,6 +201,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
boolean rmv = activeFuts.remove(t);
assert rmv;
+
+ Throwable err = t.error();
+
+ if (err != null && !(err instanceof IgniteClientDisconnectedCheckedException)) {
+ LT.error(log, t.error(), "DataStreamer operation failed.", true);
+
+ failCntr.increment();
+
+ cancelled = true;
+ }
}
};
@@ -231,6 +253,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */
private static boolean isWarningPrinted;
+ /** Allows to pause new data processing while failed data processing in progress. */
+ private final Semaphore remapSem = new Semaphore(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+
+ /** */
+ private final ConcurrentLinkedDeque<Runnable> dataToRemap = new ConcurrentLinkedDeque<>();
+
+ /** */
+ private final AtomicBoolean remapOwning = new AtomicBoolean();
+
/**
* @param ctx Grid kernal context.
* @param cacheName Cache name.
@@ -301,7 +332,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
Buffer buf = bufMappings.get(nodeId);
if (buf != null)
- buf.onResponse(res);
+ buf.onResponse(res, nodeId);
else if (log.isDebugEnabled())
log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
@@ -314,6 +345,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
fut = new DataStreamerFuture(this);
publicFut = new IgniteCacheFutureImpl<>(fut);
+
+ GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
+
+ if (cache == null) { // Possible, cache is not configured on node.
+ assert ccfg != null;
+
+ if (ccfg.getCacheMode() == CacheMode.LOCAL)
+ throw new CacheException("Impossible to load Local cache configured remotely.");
+
+ ctx.grid().getOrCreateCache(ccfg);
+ }
}
/**
@@ -358,6 +400,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
throw new IllegalStateException("Data streamer has been closed.");
}
+ else if (cancelled) {
+ busyLock.leaveBusy();
+
+ throw new IllegalStateException("Data streamer has been closed.");
+ }
}
/**
@@ -633,6 +680,37 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ *
+ */
+ private void acquireRemapSemaphore() throws IgniteInterruptedCheckedException {
+ try {
+ if (remapSem.availablePermits() != REMAP_SEMAPHORE_PERMISSIONS_COUNT) {
+ if (timeout == DFLT_UNLIMIT_TIMEOUT) {
+ // Wait until failed data being processed.
+ remapSem.acquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+
+ remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+ }
+ else {
+ // Wait until failed data being processed.
+ boolean res = remapSem.tryAcquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT, timeout, TimeUnit.MILLISECONDS);
+
+ if (res)
+ remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+ else
+ throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout " +
+ "while was waiting for failed data resending finished.");
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ }
+
+ /**
* @param entries Entries.
* @param resFut Result future.
* @param activeKeys Active keys.
@@ -644,170 +722,266 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Nullable final Collection<KeyCacheObjectWrapper> activeKeys,
final int remaps
) {
- assert entries != null;
+ try {
+ assert entries != null;
- if (!isWarningPrinted) {
- synchronized (this) {
- if (!allowOverwrite() && !isWarningPrinted) {
- U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
- "(to change, set allowOverwrite to true)");
- }
+ final boolean remap = remaps > 0;
- isWarningPrinted = true;
+ if (!remap) { // Failed data should be processed prior to new data.
+ acquireRemapSemaphore();
}
- }
- Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();
+ if (!isWarningPrinted) {
+ synchronized (this) {
+ if (!allowOverwrite() && !isWarningPrinted) {
+ U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
+ "(to change, set allowOverwrite to true)");
+ }
- boolean initPda = ctx.deploy().enabled() && jobPda == null;
+ isWarningPrinted = true;
+ }
+ }
- AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion();
+ Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();
- for (DataStreamerEntry entry : entries) {
- List<ClusterNode> nodes;
+ boolean initPda = ctx.deploy().enabled() && jobPda == null;
- try {
- KeyCacheObject key = entry.getKey();
+ GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
- assert key != null;
+ if (cache == null)
+ throw new IgniteCheckedException("Cache not created or already destroyed.");
- if (initPda) {
- if (cacheObjCtx.addDeploymentInfo())
- jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
- entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
- rcvr);
- else if (rcvr != null)
- jobPda = new DataStreamerPda(rcvr);
+ GridCacheContext cctx = cache.context();
- initPda = false;
- }
+ GridCacheGateway gate = null;
- nodes = nodes(key, topVer);
- }
- catch (IgniteCheckedException e) {
- resFut.onDone(e);
+ if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx required.
+ gate = cctx.gate();
- return;
+ gate.enter();
}
- if (F.isEmpty(nodes)) {
- resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
- "(no nodes with cache found in topology) [infos=" + entries.size() +
- ", cacheName=" + cacheName + ']'));
-
- return;
- }
+ try {
+ AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ?
+ ctx.cache().context().exchange().readyAffinityVersion() :
+ cctx.topology().topologyVersion();
- for (ClusterNode node : nodes) {
- Collection<DataStreamerEntry> col = mappings.get(node);
+ for (DataStreamerEntry entry : entries) {
+ List<ClusterNode> nodes;
- if (col == null)
- mappings.put(node, col = new ArrayList<>());
+ try {
+ KeyCacheObject key = entry.getKey();
- col.add(entry);
- }
- }
+ assert key != null;
- for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
- final UUID nodeId = e.getKey().id();
+ if (initPda) {
+ if (cacheObjCtx.addDeploymentInfo())
+ jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
+ entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
+ rcvr);
+ else if (rcvr != null)
+ jobPda = new DataStreamerPda(rcvr);
- Buffer buf = bufMappings.get(nodeId);
+ initPda = false;
+ }
- if (buf == null) {
- Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+ nodes = nodes(key, topVer, cctx);
+ }
+ catch (IgniteCheckedException e) {
+ resFut.onDone(e);
- if (old != null)
- buf = old;
- }
+ return;
+ }
- final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+ if (F.isEmpty(nodes)) {
+ resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+ "(no nodes with cache found in topology) [infos=" + entries.size() +
+ ", cacheName=" + cacheName + ']'));
- IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- try {
- t.get();
+ return;
+ }
- if (activeKeys != null) {
- for (DataStreamerEntry e : entriesForNode)
- activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));
+ for (ClusterNode node : nodes) {
+ Collection<DataStreamerEntry> col = mappings.get(node);
- if (activeKeys.isEmpty())
- resFut.onDone();
- }
- else {
- assert entriesForNode.size() == 1;
+ if (col == null)
+ mappings.put(node, col = new ArrayList<>());
- // That has been a single key,
- // so complete result future right away.
- resFut.onDone();
- }
+ col.add(entry);
}
- catch (IgniteClientDisconnectedCheckedException e1) {
- if (log.isDebugEnabled())
- log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+ }
- resFut.onDone(e1);
+ for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
+ final UUID nodeId = e.getKey().id();
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf == null) {
+ Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+ if (old != null)
+ buf = old;
}
- catch (IgniteCheckedException e1) {
- if (log.isDebugEnabled())
- log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
- if (cancelled) {
- resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
- DataStreamerImpl.this, e1));
+ final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ try {
+ t.get();
+
+ if (activeKeys != null) {
+ for (DataStreamerEntry e : entriesForNode)
+ activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));
+
+ if (activeKeys.isEmpty())
+ resFut.onDone();
+ }
+ else {
+ assert entriesForNode.size() == 1;
+
+ // That has been a single key,
+ // so complete result future right away.
+ resFut.onDone();
+ }
+ }
+ catch (IgniteClientDisconnectedCheckedException e1) {
+ if (log.isDebugEnabled())
+ log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+ resFut.onDone(e1);
+ }
+ catch (IgniteCheckedException e1) {
+ if (log.isDebugEnabled())
+ log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+ if (cancelled) {
+ resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+ DataStreamerImpl.this, e1));
+ }
+ else if (remaps + 1 > maxRemapCnt) {
+ resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+ + remaps, e1));
+ }
+ else {
+ try {
+ remapSem.acquire();
+
+ final Runnable r = new Runnable() {
+ @Override public void run() {
+ try {
+ if (cancelled)
+ throw new IllegalStateException("DataStreamer closed.");
+
+ load0(entriesForNode, resFut, activeKeys, remaps + 1);
+ }
+ catch (Throwable ex) {
+ resFut.onDone(
+ new IgniteCheckedException("DataStreamer remapping failed. ", ex));
+ }
+ finally {
+ remapSem.release();
+ }
+ }
+ };
+
+ dataToRemap.add(r);
+
+ if (!remapOwning.get() && remapOwning.compareAndSet(false, true)) {
+ ctx.closure().callLocalSafe(new GPC<Boolean>() {
+ @Override public Boolean call() {
+ boolean locked = true;
+
+ while (locked || !dataToRemap.isEmpty()) {
+ if (!locked && !remapOwning.compareAndSet(false, true))
+ return false;
+
+ try {
+ Runnable r = dataToRemap.poll();
+
+ if (r != null)
+ r.run();
+ }
+ finally {
+ if (!dataToRemap.isEmpty())
+ locked = true;
+ else {
+ remapOwning.set(false);
+
+ locked = false;
+ }
+ }
+ }
+
+ return true;
+ }
+ }, true);
+ }
+ }
+ catch (InterruptedException e2) {
+ resFut.onDone(e2);
+ }
+ }
+ }
}
- else if (remaps + 1 > maxRemapCnt) {
- resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
- + remaps), e1);
- }
- else
- load0(entriesForNode, resFut, activeKeys, remaps + 1);
- }
- }
- };
+ };
- final GridFutureAdapter<?> f;
+ final GridFutureAdapter<?> f;
- try {
- f = buf.update(entriesForNode, topVer, lsnr);
- }
- catch (IgniteInterruptedCheckedException e1) {
- resFut.onDone(e1);
+ try {
+ f = buf.update(entriesForNode, topVer, lsnr, remap);
+ }
+ catch (IgniteInterruptedCheckedException e1) {
+ resFut.onDone(e1);
- return;
- }
+ return;
+ }
- if (ctx.discovery().node(nodeId) == null) {
- if (bufMappings.remove(nodeId, buf)) {
- final Buffer buf0 = buf;
+ if (ctx.discovery().node(nodeId) == null) {
+ if (bufMappings.remove(nodeId, buf)) {
+ final Buffer buf0 = buf;
- waitAffinityAndRun(new Runnable() {
- @Override public void run() {
- buf0.onNodeLeft();
+ waitAffinityAndRun(new Runnable() {
+ @Override public void run() {
+ buf0.onNodeLeft();
- if (f != null)
- f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + nodeId));
+ if (f != null)
+ f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+ "(node has left): " + nodeId));
+ }
+ }, ctx.discovery().topologyVersion(), false);
}
- }, ctx.discovery().topologyVersion(), false);
+ }
}
}
+ finally {
+ if (gate != null)
+ gate.leave();
+ }
+ }
+ catch (Exception ex) {
+ resFut.onDone(new IgniteCheckedException("DataStreamer data loading failed.", ex));
}
}
/**
* @param key Key to map.
* @param topVer Topology version.
+ * @param cctx Context.
* @return Nodes to send requests to.
* @throws IgniteCheckedException If failed.
*/
- private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+ private List<ClusterNode> nodes(KeyCacheObject key,
+ AffinityTopologyVersion topVer,
+ GridCacheContext cctx) throws IgniteCheckedException {
GridAffinityProcessor aff = ctx.affinity();
List<ClusterNode> res = null;
if (!allowOverwrite())
- res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer);
+ res = cctx.isLocal() ?
+ aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer) :
+ cctx.topology().nodes(cctx.affinity().partition(key), topVer);
else {
ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer);
@@ -992,7 +1166,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @throws IgniteCheckedException If failed.
*/
public void closeEx(boolean cancel) throws IgniteCheckedException {
- closeEx(cancel, null);
+ IgniteCheckedException err = closeEx(cancel, null);
+
+ if (err != null)
+ throw err; // Throws at close().
}
/**
@@ -1000,9 +1177,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @param err Error.
* @throws IgniteCheckedException If failed.
*/
- public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
+ private IgniteCheckedException closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
if (!closed.compareAndSet(false, true))
- return;
+ return null;
busyLock.block();
@@ -1029,7 +1206,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
throw e;
}
+ long failed = failCntr.longValue();
+
+ if (failed > 0 && err == null)
+ err = new IgniteCheckedException("Some of DataStreamer operations failed [failedCount=" + failed + "]");
+
fut.onDone(err);
+
+ return err;
}
/**
@@ -1139,6 +1323,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** */
private final Semaphore sem;
+ /** Batch topology. */
+ private AffinityTopologyVersion batchTopVer;
+
/** Closure to signal on task finish. */
@GridToStringExclude
private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
@@ -1169,37 +1356,64 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ * @param remap Remapping flag.
+ */
+ private void renewBatch(boolean remap) {
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>();
+
+ batchTopVer = null;
+
+ if (!remap)
+ curFut.listen(signalC);
+ }
+
+ /**
* @param newEntries Infos.
* @param topVer Topology version.
* @param lsnr Listener for the operation future.
+ * @param remap Remapping flag.
* @return Future for operation.
* @throws IgniteInterruptedCheckedException If failed.
*/
@Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
AffinityTopologyVersion topVer,
- IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr,
+ boolean remap) throws IgniteInterruptedCheckedException {
List<DataStreamerEntry> entries0 = null;
+
GridFutureAdapter<Object> curFut0;
+ AffinityTopologyVersion curBatchTopVer;
+
synchronized (this) {
curFut0 = curFut;
curFut0.listen(lsnr);
+ if (batchTopVer == null)
+ batchTopVer = topVer;
+
+ curBatchTopVer = batchTopVer;
+
for (DataStreamerEntry entry : newEntries)
entries.add(entry);
if (entries.size() >= bufSize) {
entries0 = entries;
- entries = newEntries();
- curFut = new GridFutureAdapter<>();
- curFut.listen(signalC);
+ renewBatch(remap);
}
}
- if (entries0 != null) {
- submit(entries0, topVer, curFut0);
+ if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+ renewBatch(remap);
+
+ curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
+ "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
+ }
+ else if (entries0 != null) {
+ submit(entries0, curBatchTopVer, curFut0, remap);
if (cancelled)
curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
@@ -1227,6 +1441,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
List<DataStreamerEntry> entries0 = null;
GridFutureAdapter<Object> curFut0 = null;
+ acquireRemapSemaphore();
+
synchronized (this) {
if (!entries.isEmpty()) {
entries0 = entries;
@@ -1239,7 +1455,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
if (entries0 != null)
- submit(entries0, null, curFut0);
+ submit(entries0, batchTopVer, curFut0, false);
// Create compound future for this flush.
GridCompoundFuture<Object, Object> res = null;
@@ -1290,25 +1506,113 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ * @param entries Entries.
+ * @param reqTopVer Request topology version.
+ * @param curFut Current future.
+ */
+ private void localUpdate(final Collection<DataStreamerEntry> entries,
+ final AffinityTopologyVersion reqTopVer,
+ final GridFutureAdapter<Object> curFut) {
+ try {
+ GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();
+
+ final boolean allowOverwrite = allowOverwrite();
+ final boolean loc = cctx.isLocal();
+
+ if (!loc && !allowOverwrite)
+ cctx.topology().readLock();
+
+ try {
+ GridDhtTopologyFuture fut = loc ? null : cctx.topologyVersionFuture();
+
+ AffinityTopologyVersion topVer = loc ? reqTopVer : fut.topologyVersion();
+
+ if (!allowOverwrite && !topVer.equals(reqTopVer)) {
+ curFut.onDone(new IgniteCheckedException(
+ "DataStreamer will retry data transfer at stable topology. " +
+ "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]"));
+ }
+ else if (loc || allowOverwrite || fut.isDone()) {
+ IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe(
+ new DataStreamerUpdateJob(
+ ctx,
+ log,
+ cacheName,
+ entries,
+ false,
+ skipStore,
+ keepBinary,
+ rcvr),
+ false);
+
+ locFuts.add(callFut);
+
+ final GridFutureAdapter waitFut = (loc || allowOverwrite) ?
+ null :
+ cctx.mvcc().addDataStreamerFuture(topVer);
+
+ callFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ try {
+ boolean rmv = locFuts.remove(t);
+
+ assert rmv;
+
+ curFut.onDone(t.get());
+ }
+ catch (IgniteCheckedException e) {
+ curFut.onDone(e);
+ }
+ finally {
+ if (waitFut != null)
+ waitFut.onDone();
+ }
+ }
+ });
+ }
+ else {
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+ localUpdate(entries, reqTopVer, curFut);
+ }
+ });
+ }
+ }
+ finally {
+ if (!loc && !allowOverwrite)
+ cctx.topology().readUnlock();
+ }
+ }
+ catch (Throwable ex) {
+ curFut.onDone(new IgniteCheckedException("DataStreamer data handling failed.", ex));
+ }
+ }
+
+ /**
* @param entries Entries to submit.
* @param topVer Topology version.
* @param curFut Current future.
+ * @param remap Remapping flag.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
private void submit(final Collection<DataStreamerEntry> entries,
@Nullable AffinityTopologyVersion topVer,
- final GridFutureAdapter<Object> curFut)
+ final GridFutureAdapter<Object> curFut,
+ boolean remap)
throws IgniteInterruptedCheckedException {
assert entries != null;
assert !entries.isEmpty();
assert curFut != null;
- try {
- incrementActiveTasks();
- }
- catch (IgniteDataStreamerTimeoutException e) {
- curFut.onDone(e);
- throw e;
+ if (!remap) {
+ try {
+ incrementActiveTasks();
+ }
+ catch (IgniteDataStreamerTimeoutException e) {
+ curFut.onDone(e);
+
+ throw e;
+ }
}
IgniteInternalFuture<Object> fut;
@@ -1318,27 +1622,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (plc == null)
plc = PUBLIC_POOL;
- if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
- fut = ctx.closure().callLocalSafe(
- new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
-
- locFuts.add(fut);
-
- fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- try {
- boolean rmv = locFuts.remove(t);
-
- assert rmv;
-
- curFut.onDone(t.get());
- }
- catch (IgniteCheckedException e) {
- curFut.onDone(e);
- }
- }
- });
- }
+ if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL)
+ localUpdate(entries, topVer, curFut);
else {
try {
for (DataStreamerEntry e : entries) {
@@ -1466,8 +1751,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* @param res Response.
+ * @param nodeId Node id.
*/
- void onResponse(DataStreamerResponse res) {
+ void onResponse(DataStreamerResponse res, UUID nodeId) {
if (log.isDebugEnabled())
log.debug("Received data load response: " + res);
@@ -1488,9 +1774,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
GridPeerDeployAware jobPda0 = jobPda;
- err = U.unmarshal(ctx,
- errBytes,
- U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
+ err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]",
+ (Throwable)U.unmarshal(ctx,
+ errBytes,
+ U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())));
}
catch (IgniteCheckedException e) {
f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
@@ -1613,7 +1900,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* Isolated receiver which only loads entry initial value.
*/
- private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
+ protected static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
DataStreamerCacheUpdaters.InternalUpdater {
/** */
private static final long serialVersionUID = 0L;
@@ -1630,7 +1917,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
GridCacheContext cctx = internalCache.context();
- AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion topVer = cctx.isLocal() ?
+ cctx.affinity().affinityTopologyVersion() :
+ cctx.topology().topologyVersion();
GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
index 745619a..ce6783a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
@@ -72,7 +72,21 @@ public class GridLogThrottle {
public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg) {
assert !F.isEmpty(msg);
- log(log, e, msg, null, LogLevel.ERROR, false);
+ log(log, e, msg, null, LogLevel.ERROR, false, false);
+ }
+
+ /**
+ * Logs error if needed.
+ *
+ * @param log Logger.
+ * @param e Error (optional).
+ * @param msg Message.
+ * @param byMessage Errors group by message, not by tuple(error, msg).
+ */
+ public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMessage) {
+ assert !F.isEmpty(msg);
+
+ log(log, e, msg, null, LogLevel.ERROR, false, byMessage);
}
/**
@@ -85,7 +99,7 @@ public class GridLogThrottle {
public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg) {
assert !F.isEmpty(msg);
- log(log, e, msg, null, LogLevel.WARN, false);
+ log(log, e, msg, null, LogLevel.WARN, false, false);
}
/**
@@ -99,7 +113,7 @@ public class GridLogThrottle {
public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite) {
assert !F.isEmpty(msg);
- log(log, e, msg, null, LogLevel.WARN, quite);
+ log(log, e, msg, null, LogLevel.WARN, quite, false);
}
/**
@@ -113,7 +127,7 @@ public class GridLogThrottle {
public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg) {
assert !F.isEmpty(longMsg);
- log(log, e, longMsg, shortMsg, LogLevel.WARN, false);
+ log(log, e, longMsg, shortMsg, LogLevel.WARN, false, false);
}
/**
@@ -126,7 +140,7 @@ public class GridLogThrottle {
public static void info(@Nullable IgniteLogger log, String msg, boolean quite) {
assert !F.isEmpty(msg);
- log(log, null, msg, null, LogLevel.INFO, quite);
+ log(log, null, msg, null, LogLevel.INFO, quite, false);
}
/**
@@ -154,14 +168,15 @@ public class GridLogThrottle {
* @param longMsg Long message (or just message).
* @param shortMsg Short message for quite logging.
* @param level Level where messages should appear.
+ * @param byMessage Errors group by message, not by tuple(error, msg).
*/
@SuppressWarnings({"RedundantTypeArguments"})
private static void log(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg,
- LogLevel level, boolean quiet) {
+ LogLevel level, boolean quiet, boolean byMessage) {
assert !F.isEmpty(longMsg);
IgniteBiTuple<Class<? extends Throwable>, String> tup =
- e != null ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
+ e != null && !byMessage ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
F.<Class<? extends Throwable>, String>t(null, longMsg);
while (true) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 5bd6074..c92ea9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -78,27 +78,37 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
/** {@inheritDoc} */
@Override public Object call() throws Exception {
- try (IgniteDataStreamer<Integer, Integer> str = ignite(0).dataStreamer(null)) {
- str.allowOverwrite(allowOverwrite);
-
- int i = 0;
-
- while (!stop.get()) {
- str.addData(i % 10_000, i).listen(new CI1<IgniteFuture<?>>() {
- @Override public void apply(IgniteFuture<?> f) {
- try {
- f.get();
- }
- catch (CacheException ignore) {
- // This may be debugged.
- }
+ while (!stop.get()) {
+ try (IgniteDataStreamer<Integer, Integer> str = ignite(0).dataStreamer(null)) {
+ str.allowOverwrite(allowOverwrite);
+
+ int i = 0;
+
+ while (!stop.get()) {
+ try {
+ str.addData(i % 10_000, i).listen(new CI1<IgniteFuture<?>>() {
+ @Override public void apply(IgniteFuture<?> f) {
+ try {
+ f.get();
+ }
+ catch (CacheException ignore) {
+ // This may be debugged.
+ }
+ }
+ });
+ }
+ catch (IllegalStateException ignored) {
+ break;
}
- });
- if (i > 0 && i % 10000 == 0)
- info("Added: " + i);
+ if (i > 0 && i % 10000 == 0)
+ info("Added: " + i);
- i++;
+ i++;
+ }
+ }
+ catch (IllegalStateException | CacheException ignored) {
+ // This may be debugged.
}
}
@@ -114,6 +124,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
Thread.sleep(500);
ignite(0).createCache(ccfg);
+
+ Thread.sleep(1000);
}
finally {
stop.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
index 9da6cf7..0801691 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
import java.util.concurrent.Callable;
import javax.cache.Cache;
import javax.cache.configuration.FactoryBuilder;
@@ -28,32 +31,47 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Tests for cache data loading during simultaneous grids start.
*/
-public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest {
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-500");
- }
-
+public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest implements Serializable {
/** Grids count */
private static int GRIDS_CNT = 5;
/** Keys count */
private static int KEYS_CNT = 1_000_000;
+ /** Client. */
+ private volatile boolean client;
+
+ /** Config. */
+ private volatile boolean configured;
+
+ /** Allow override. */
+ protected volatile boolean allowOverwrite;
+
+ /** Restarts. */
+ protected volatile boolean restarts;
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -67,7 +85,24 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestCacheStoreAdapter()));
- cfg.setCacheConfiguration(ccfg);
+ if (getTestGridName(0).equals(gridName)) {
+ if (client)
+ cfg.setClientMode(true);
+
+ if (configured)
+ cfg.setCacheConfiguration(ccfg);
+ }
+ else
+ cfg.setCacheConfiguration(ccfg);
+
+ if (!configured)
+ ccfg.setNodeFilter(new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ String name = node.attribute(ATTR_GRID_NAME).toString();
+
+ return !getTestGridName(0).equals(name);
+ }
+ });
return cfg;
}
@@ -81,22 +116,35 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
* @throws Exception if failed
*/
public void testLoadCacheWithDataStreamer() throws Exception {
- IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
- @Override public void apply(Ignite grid) {
- try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
- for (int i = 0; i < KEYS_CNT; i++)
- dataStreamer.addData(i, Integer.toString(i));
+ configured = true;
+
+ try {
+ IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
+ @Override public void apply(Ignite grid) {
+ try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
+ dataStreamer.allowOverwrite(allowOverwrite);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ dataStreamer.addData(i, Integer.toString(i));
+ }
+
+ log.info("Data loaded.");
}
- }
- };
+ };
- loadCache(f);
+ loadCache(f);
+ }
+ finally {
+ configured = false;
+ }
}
/**
* @throws Exception if failed
*/
public void testLoadCacheFromStore() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-4210");
+
loadCache(new IgniteInClosure<Ignite>() {
@Override public void apply(Ignite grid) {
grid.cache(null).loadCache(null);
@@ -105,12 +153,177 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
}
/**
+ * @throws Exception if failed
+ */
+ public void testLoadCacheWithDataStreamerSequentialClient() throws Exception {
+ client = true;
+
+ try {
+ loadCacheWithDataStreamerSequential();
+ }
+ finally {
+ client = false;
+ }
+ }
+
+ /**
+ * @throws Exception if failed
+ */
+ public void testLoadCacheWithDataStreamerSequentialClientWithConfig() throws Exception {
+ client = true;
+ configured = true;
+
+ try {
+ loadCacheWithDataStreamerSequential();
+ }
+ finally {
+ client = false;
+ configured = false;
+ }
+ }
+
+ /**
+ * @throws Exception if failed
+ */
+ public void testLoadCacheWithDataStreamerSequential() throws Exception {
+ loadCacheWithDataStreamerSequential();
+ }
+
+ /**
+ * @throws Exception if failed
+ */
+ public void testLoadCacheWithDataStreamerSequentialWithConfigAndRestarts() throws Exception {
+ restarts = true;
+ configured = true;
+
+ try {
+ loadCacheWithDataStreamerSequential();
+ }
+ finally {
+ restarts = false;
+ configured = false;
+ }
+ }
+
+ /**
+ * @throws Exception if failed
+ */
+ public void testLoadCacheWithDataStreamerSequentialWithConfig() throws Exception {
+ configured = true;
+
+ try {
+ loadCacheWithDataStreamerSequential();
+ }
+ finally {
+ configured = false;
+ }
+ }
+
+ /**
+ * @throws Exception if failed
+ */
+ private void loadCacheWithDataStreamerSequential() throws Exception {
+ startGrid(1);
+
+ Ignite g0 = startGrid(0);
+
+ IgniteInternalFuture<Object> restartFut = runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (restarts) {
+ stopGrid(1);
+
+ startGrid(1);
+
+ U.sleep(100);
+ }
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 2; i < GRIDS_CNT; i++)
+ startGrid(i);
+
+ return null;
+ }
+ });
+
+ final HashSet<IgniteFuture> set = new HashSet<>();
+
+ IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
+ @Override public void apply(Ignite grid) {
+ try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
+ dataStreamer.allowOverwrite(allowOverwrite);
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ set.add(dataStreamer.addData(i, "Data"));
+
+ if (i % 100000 == 0)
+ log.info("Streaming " + i + "'th entry.");
+ }
+ }
+ }
+ };
+
+ f.apply(g0);
+
+ log.info("Data loaded.");
+
+ restarts = false;
+
+ fut.get();
+ restartFut.get();
+
+ for (IgniteFuture res : set)
+ assertNull(res.get());
+
+ IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+ long size = cache.size(CachePeekMode.PRIMARY);
+
+ if (size != KEYS_CNT) {
+ Set<Integer> failedKeys = new LinkedHashSet<>();
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ if (!cache.containsKey(i)) {
+ log.info("Actual cache size: " + size);
+
+ for (Ignite ignite : G.allGrids()) {
+ IgniteEx igniteEx = (IgniteEx)ignite;
+
+ log.info("Missed key info:" +
+ igniteEx.localNode().id() +
+ " primary=" +
+ ignite.affinity(null).isPrimary(igniteEx.localNode(), i) +
+ " backup=" +
+ ignite.affinity(null).isBackup(igniteEx.localNode(), i) +
+ " local peek=" +
+ ignite.cache(null).localPeek(i, CachePeekMode.ONHEAP));
+ }
+
+ for (int j = i; j < i + 10000; j++) {
+ if (!cache.containsKey(j))
+ failedKeys.add(j);
+ }
+
+ break;
+ }
+
+ assert failedKeys.isEmpty() : "Some failed keys: " + failedKeys.toString();
+ }
+
+ assertCacheSize();
+ }
+
+ /**
* Loads cache using closure and asserts cache size.
*
* @param f cache loading closure
* @throws Exception if failed
*/
- private void loadCache(IgniteInClosure<Ignite> f) throws Exception {
+ protected void loadCache(IgniteInClosure<Ignite> f) throws Exception {
Ignite g0 = startGrid(0);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Ignite>() {
@@ -130,17 +343,17 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
}
/** Asserts cache size. */
- private void assertCacheSize() {
+ protected void assertCacheSize() {
IgniteCache<Integer, String> cache = grid(0).cache(null);
- assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY));
+ assertEquals("Data lost.", KEYS_CNT, cache.size(CachePeekMode.PRIMARY));
int total = 0;
for (int i = 0; i < GRIDS_CNT; i++)
total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY);
- assertEquals(KEYS_CNT, total);
+ assertEquals("Data lost.", KEYS_CNT, total);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java
new file mode 100644
index 0000000..c9cd9fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ *
+ */
+public class CacheLoadingConcurrentGridStartSelfTestAllowOverwrite extends CacheLoadingConcurrentGridStartSelfTest {
+ /**
+ * Default constructor.
+ */
+ public CacheLoadingConcurrentGridStartSelfTestAllowOverwrite() {
+ allowOverwrite = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 9fedc35..0f8ae29 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -29,9 +29,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
@@ -194,7 +194,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
assert false;
}
- catch (IgniteCheckedException e) {
+ catch (CacheException e) {
// Cannot load local cache configured remotely.
info("Caught expected exception: " + e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 0c6686f..a6a9f54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -22,13 +22,17 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -50,6 +54,16 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
/** Started grid counter. */
private static int cnt;
+ /** No nodes filter. */
+ private static volatile boolean noNodesFilter;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -72,88 +86,149 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
- try {
- startGrids(5);
+ startGrids(5);
- final CyclicBarrier barrier = new CyclicBarrier(2);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
- multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- U.awaitQuiet(barrier);
+ multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.awaitQuiet(barrier);
- G.stopAll(true);
+ G.stopAll(true);
- return null;
- }
- }, 1);
+ return null;
+ }
+ }, 1);
- Ignite g4 = grid(4);
+ Ignite g4 = grid(4);
- IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+ IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
- dataLdr.perNodeBufferSize(32);
+ dataLdr.perNodeBufferSize(32);
- for (int i = 0; i < 100000; i += 2) {
- dataLdr.addData(i, i);
- dataLdr.removeData(i + 1);
- }
+ for (int i = 0; i < 100000; i += 2) {
+ dataLdr.addData(i, i);
+ dataLdr.removeData(i + 1);
+ }
- U.awaitQuiet(barrier);
+ U.awaitQuiet(barrier);
- info("Closing data streamer.");
+ info("Closing data streamer.");
- try {
- dataLdr.close(true);
- }
- catch (IllegalStateException ignore) {
- // This is ok to ignore this exception as test is racy by it's nature -
- // grid is stopping in different thread.
- }
+ try {
+ dataLdr.close(true);
}
- finally {
- G.stopAll(true);
+ catch (CacheException | IllegalStateException ignore) {
+ // This is ok to ignore this exception as test is racy by it's nature -
+ // grid is stopping in different thread.
}
}
/**
* Data streamer should correctly load entries from HashMap in case of grids with more than one node
- * and with GridOptimizedMarshaller that requires serializable.
+ * and with GridOptimizedMarshaller that requires serializable.
*
* @throws Exception If failed.
*/
public void testAddDataFromMap() throws Exception {
- try {
- cnt = 0;
+ cnt = 0;
- startGrids(2);
+ startGrids(2);
- Ignite g0 = grid(0);
+ Ignite g0 = grid(0);
- IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+ IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
- Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+ Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
- for (int i = 0; i < KEYS_COUNT; i ++)
- map.put(i, String.valueOf(i));
+ for (int i = 0; i < KEYS_COUNT; i++)
+ map.put(i, String.valueOf(i));
- dataLdr.addData(map);
+ dataLdr.addData(map);
- dataLdr.close();
+ dataLdr.close();
- Random rnd = new Random();
+ Random rnd = new Random();
- IgniteCache<Integer, String> c = g0.cache(null);
+ IgniteCache<Integer, String> c = g0.cache(null);
- for (int i = 0; i < KEYS_COUNT; i ++) {
- Integer k = rnd.nextInt(KEYS_COUNT);
+ for (int i = 0; i < KEYS_COUNT; i++) {
+ Integer k = rnd.nextInt(KEYS_COUNT);
- String v = c.get(k);
+ String v = c.get(k);
+
+ assertEquals(k.toString(), v);
+ }
+ }
+
+ /**
+ * Test logging on {@code DataStreamer.addData()} method when cache have no data nodes
+ *
+ * @throws Exception If fail.
+ */
+ public void testNoDataNodesOnClose() throws Exception {
+ boolean failed = false;
+
+ cnt = 0;
+
+ noNodesFilter = true;
+
+ try {
+ Ignite ignite = startGrid(1);
- assertEquals(k.toString(), v);
+ try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) {
+ streamer.addData(1, "1");
+ }
+ catch (CacheException ex) {
+ failed = true;
}
}
finally {
- G.stopAll(true);
+ noNodesFilter = false;
+
+ assertTrue(failed);
+ }
+ }
+
+ /**
+ * Test logging on {@code DataStreamer.addData()} method when cache have no data nodes
+ *
+ * @throws Exception If fail.
+ */
+ public void testNoDataNodesOnFlush() throws Exception {
+ boolean failed = false;
+
+ cnt = 0;
+
+ noNodesFilter = true;
+
+ try {
+ Ignite ignite = startGrid(1);
+
+ IgniteFuture fut = null;
+
+ try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) {
+ fut = streamer.addData(1, "1");
+
+ streamer.flush();
+ }
+ catch (IllegalStateException ex) {
+ try {
+ fut.get();
+
+ fail("DataStreamer ignores failed streaming.");
+ }
+ catch (CacheServerNotFoundException ignored) {
+ // No-op.
+ }
+
+ failed = true;
+ }
+ }
+ finally {
+ noNodesFilter = false;
+
+ assertTrue(failed);
}
}
@@ -169,6 +244,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
cacheCfg.setBackups(1);
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ if (noNodesFilter)
+ cacheCfg.setNodeFilter(F.alwaysFalse());
+
return cacheCfg;
}