You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/11/05 04:28:29 UTC

[37/66] [abbrv] ignite git commit: IGNITE-950 - Fixing RemoveAll()

IGNITE-950 - Fixing RemoveAll()


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0a1a660
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0a1a660
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0a1a660

Branch: refs/heads/ignite-1753-1282
Commit: e0a1a6602f7d7cfc6039a8868c69ca7690f1a2c1
Parents: 8585cce
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Nov 3 17:08:09 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Nov 3 17:08:09 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            | 40 ++++++++++++++------
 .../datastreamer/DataStreamProcessor.java       |  2 -
 .../datastreamer/DataStreamerImpl.java          |  4 --
 .../platform/cache/PlatformCache.java           |  2 -
 .../datastreamer/PlatformDataStreamer.java      |  2 -
 5 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5aace6e..77035df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -169,6 +169,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
             boolean skipStore = opCtx != null && opCtx.skipStore();
 
+            boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
             do {
                 retry = false;
 
@@ -181,7 +183,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                     ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
                     retry = !ctx.kernalContext().task().execute(
-                        new RemoveAllTask(ctx.name(), topVer, skipStore), null).get();
+                        new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null).get();
                 }
             }
             while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry);
@@ -200,9 +202,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        boolean skipStore = opCtx != null && opCtx.skipStore();
-
-        removeAllAsync(opFut, topVer, skipStore);
+        removeAllAsync(opFut, topVer, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary());
 
         return opFut;
     }
@@ -212,15 +212,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
      * @param topVer Topology version.
      * @param skipStore Skip store flag.
      */
-    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer,
-        final boolean skipStore) {
+    private void removeAllAsync(
+        final GridFutureAdapter<Void> opFut,
+        final AffinityTopologyVersion topVer,
+        final boolean skipStore,
+        final boolean keepBinary
+    ) {
         Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
             IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
-                new RemoveAllTask(ctx.name(), topVer, skipStore), null);
+                new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null);
 
             rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                 @Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@ -232,7 +236,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                         if (topVer0.equals(topVer) && !retry)
                             opFut.onDone();
                         else
-                            removeAllAsync(opFut, topVer0, skipStore);
+                            removeAllAsync(opFut, topVer0, skipStore, keepBinary);
                     }
                     catch (ClusterGroupEmptyCheckedException ignore) {
                         if (log.isDebugEnabled())
@@ -277,15 +281,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /** Skip store flag. */
         private final boolean skipStore;
 
+        /** Keep binary flag. */
+        private final boolean keepBinary;
+
         /**
          * @param cacheName Cache name.
          * @param topVer Affinity topology version.
          * @param skipStore Skip store flag.
          */
-        public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) {
+        public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore, boolean keepBinary) {
             this.cacheName = cacheName;
             this.topVer = topVer;
             this.skipStore = skipStore;
+            this.keepBinary = keepBinary;
         }
 
         /** {@inheritDoc} */
@@ -294,7 +302,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
             for (ClusterNode node : subgrid)
-                jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node);
+                jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore, keepBinary), node);
 
             return jobs;
         }
@@ -335,15 +343,24 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /** Skip store flag. */
         private final boolean skipStore;
 
+        /** Keep binary flag. */
+        private final boolean keepBinary;
+
         /**
          * @param cacheName Cache name.
          * @param topVer Topology version.
          * @param skipStore Skip store flag.
          */
-        private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
+        private GlobalRemoveAllJob(
+            String cacheName,
+            @NotNull AffinityTopologyVersion topVer,
+            boolean skipStore,
+            boolean keepBinary
+        ) {
             super(cacheName, topVer);
 
             this.skipStore = skipStore;
+            this.keepBinary = keepBinary;
         }
 
         /** {@inheritDoc} */
@@ -378,6 +395,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                     ((DataStreamerImpl) dataLdr).maxRemapCount(0);
 
                     dataLdr.skipStore(skipStore);
+                    dataLdr.keepBinary(keepBinary);
 
                     dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 14cb1c8..a2aab77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -283,8 +283,6 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
             Collection<DataStreamerEntry> col = req.entries();
 
-            U.debug(log, "Processing data streamer update request [keepBinary=" + req.keepBinary() + ", keys=" + col + ']');
-
             DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
                 log,
                 req.cacheName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index df1f656..27eff0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1255,8 +1255,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             IgniteInternalFuture<Object> fut;
 
             if (isLocNode) {
-                U.dumpStack("Submitting local streamer job [entries=" + entries + ", keepBinary=" + keepBinary + ']');
-
                 fut = ctx.closure().callLocalSafe(
                     new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
 
@@ -1338,8 +1336,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 if (topVer == null)
                     topVer = ctx.cache().context().exchange().readyAffinityVersion();
 
-                U.debug(log, "Creating data streamer request [keepBinary=" + keepBinary + ']');
-
                 DataStreamerRequest req = new DataStreamerRequest(
                     reqId,
                     topicBytes,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index d3588ee..6ec52d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -720,8 +720,6 @@ public class PlatformCache extends PlatformAbstractTarget {
      * @throws org.apache.ignite.IgniteCheckedException In case of error.
      */
     public void removeAll() throws IgniteCheckedException {
-        U.debug(log, "Will removeAll on platform cache: " + cache.operationContext());
-
         cache.removeAll();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 9caa913..794ab0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -85,8 +85,6 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
         this.cacheName = cacheName;
         this.ldr = ldr;
         this.keepPortable = keepPortable;
-
-        U.debug(log, "Created platform streamer [keepBinary=" + ldr.keepBinary() + ']');
     }
 
     /** {@inheritDoc}  */