You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/11/05 04:28:29 UTC
[37/66] [abbrv] ignite git commit: IGNITE-950 - Fixing RemoveAll()
IGNITE-950 - Fixing RemoveAll()
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0a1a660
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0a1a660
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0a1a660
Branch: refs/heads/ignite-1753-1282
Commit: e0a1a6602f7d7cfc6039a8868c69ca7690f1a2c1
Parents: 8585cce
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Nov 3 17:08:09 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Nov 3 17:08:09 2015 +0300
----------------------------------------------------------------------
.../GridDistributedCacheAdapter.java | 40 ++++++++++++++------
.../datastreamer/DataStreamProcessor.java | 2 -
.../datastreamer/DataStreamerImpl.java | 4 --
.../platform/cache/PlatformCache.java | 2 -
.../datastreamer/PlatformDataStreamer.java | 2 -
5 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5aace6e..77035df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -169,6 +169,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
boolean skipStore = opCtx != null && opCtx.skipStore();
+ boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
do {
retry = false;
@@ -181,7 +183,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
retry = !ctx.kernalContext().task().execute(
- new RemoveAllTask(ctx.name(), topVer, skipStore), null).get();
+ new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null).get();
}
}
while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry);
@@ -200,9 +202,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
CacheOperationContext opCtx = ctx.operationContextPerCall();
- boolean skipStore = opCtx != null && opCtx.skipStore();
-
- removeAllAsync(opFut, topVer, skipStore);
+ removeAllAsync(opFut, topVer, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary());
return opFut;
}
@@ -212,15 +212,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
* @param topVer Topology version.
* @param skipStore Skip store flag.
*/
- private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer,
- final boolean skipStore) {
+ private void removeAllAsync(
+ final GridFutureAdapter<Void> opFut,
+ final AffinityTopologyVersion topVer,
+ final boolean skipStore,
+ final boolean keepBinary
+ ) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
- new RemoveAllTask(ctx.name(), topVer, skipStore), null);
+ new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null);
rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@ -232,7 +236,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
if (topVer0.equals(topVer) && !retry)
opFut.onDone();
else
- removeAllAsync(opFut, topVer0, skipStore);
+ removeAllAsync(opFut, topVer0, skipStore, keepBinary);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -277,15 +281,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** Skip store flag. */
private final boolean skipStore;
+ /** Keep binary flag. */
+ private final boolean keepBinary;
+
/**
* @param cacheName Cache name.
* @param topVer Affinity topology version.
* @param skipStore Skip store flag.
*/
- public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) {
+ public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore, boolean keepBinary) {
this.cacheName = cacheName;
this.topVer = topVer;
this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
}
/** {@inheritDoc} */
@@ -294,7 +302,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
Map<ComputeJob, ClusterNode> jobs = new HashMap();
for (ClusterNode node : subgrid)
- jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node);
+ jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore, keepBinary), node);
return jobs;
}
@@ -335,15 +343,24 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** Skip store flag. */
private final boolean skipStore;
+ /** Keep binary flag. */
+ private final boolean keepBinary;
+
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param skipStore Skip store flag.
*/
- private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
+ private GlobalRemoveAllJob(
+ String cacheName,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean skipStore,
+ boolean keepBinary
+ ) {
super(cacheName, topVer);
this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
}
/** {@inheritDoc} */
@@ -378,6 +395,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
((DataStreamerImpl) dataLdr).maxRemapCount(0);
dataLdr.skipStore(skipStore);
+ dataLdr.keepBinary(keepBinary);
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 14cb1c8..a2aab77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -283,8 +283,6 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
Collection<DataStreamerEntry> col = req.entries();
- U.debug(log, "Processing data streamer update request [keepBinary=" + req.keepBinary() + ", keys=" + col + ']');
-
DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
log,
req.cacheName(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index df1f656..27eff0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1255,8 +1255,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
IgniteInternalFuture<Object> fut;
if (isLocNode) {
- U.dumpStack("Submitting local streamer job [entries=" + entries + ", keepBinary=" + keepBinary + ']');
-
fut = ctx.closure().callLocalSafe(
new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
@@ -1338,8 +1336,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
if (topVer == null)
topVer = ctx.cache().context().exchange().readyAffinityVersion();
- U.debug(log, "Creating data streamer request [keepBinary=" + keepBinary + ']');
-
DataStreamerRequest req = new DataStreamerRequest(
reqId,
topicBytes,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index d3588ee..6ec52d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -720,8 +720,6 @@ public class PlatformCache extends PlatformAbstractTarget {
* @throws org.apache.ignite.IgniteCheckedException In case of error.
*/
public void removeAll() throws IgniteCheckedException {
- U.debug(log, "Will removeAll on platform cache: " + cache.operationContext());
-
cache.removeAll();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 9caa913..794ab0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -85,8 +85,6 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
this.cacheName = cacheName;
this.ldr = ldr;
this.keepPortable = keepPortable;
-
- U.debug(log, "Created platform streamer [keepBinary=" + ldr.keepBinary() + ']');
}
/** {@inheritDoc} */