You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/11/02 14:12:20 UTC

ignite git commit: review

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-3 630c5e186 -> bc0423dfd


review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc0423df
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc0423df
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc0423df

Branch: refs/heads/ignite-1093-3
Commit: bc0423dfdc6e1e2257f7507537e711ca1b33e253
Parents: 630c5e1
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Nov 2 16:12:11 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Nov 2 16:12:11 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  4 +--
 .../configuration/IgniteConfiguration.java      | 12 +++++----
 .../GridCachePartitionExchangeManager.java      | 13 +++++-----
 .../processors/cache/GridCachePreloader.java    |  1 -
 .../distributed/dht/GridDhtCacheEntry.java      |  8 ++++--
 .../distributed/dht/GridDhtLocalPartition.java  |  7 +++---
 .../GridDhtPartitionDemandMessage.java          |  5 ++--
 .../dht/preloader/GridDhtPartitionDemander.java | 26 +++++++++++++-------
 .../dht/preloader/GridDhtPreloader.java         | 14 ++++++-----
 .../GridCacheRebalancingSyncSelfTest.java       |  9 +++++--
 10 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 3a9d632..1e1f437 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1279,23 +1279,23 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return this;
     }
 
-    @Deprecated
     /**
      * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
      *
      * @return Size of rebalancing thread pool.
      */
+    @Deprecated
     public int getRebalanceThreadPoolSize() {
         return rebalancePoolSize;
     }
 
-    @Deprecated
     /**
      * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
      *
      * @param rebalancePoolSize Size of rebalancing thread pool.
      * @return {@code this} for chaining.
      */
+    @Deprecated
     public CacheConfiguration<K, V> setRebalanceThreadPoolSize(int rebalancePoolSize) {
         this.rebalancePoolSize = rebalancePoolSize;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 87fcf3f..02b1066 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,8 +149,8 @@ public class IgniteConfiguration {
     /** Default keep alive time for public thread pool. */
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
-    /** Default limit of threads used at rebalance. */
-    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;// Has minimal affect on the operation of the grid.
+    /** Default limit of threads used for rebalance. */
+    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1;
 
     /** Default max queue capacity of public thread pool. */
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -1342,17 +1342,19 @@ public class IgniteConfiguration {
      * Minimum is 1.
      * @return count.
      */
-    public int getRebalanceThreadPoolSize(){
+    public int getRebalanceThreadPoolSize() {
         return Math.max(1, rebalanceThreadPoolSize);
     }
 
     /**
      * Sets Max count of threads can be used at rebalancing.
-     * Minimum is 1.
+     *
+     * Default is {@code 1} which has minimal impact on the operation of the grid.
+     *
      * @param size Size.
      * @return {@code this} for chaining.
      */
-    public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+    public IgniteConfiguration setRebalanceThreadPoolSize(int size) {
         this.rebalanceThreadPoolSize = size;
 
         return this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 382c975..6ca4717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -322,7 +322,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         exchWorker.futQ.addFirst(fut);
 
         if (!cctx.kernalContext().clientNode()) {
-
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
                 final int idx = cnt;
 
@@ -409,8 +408,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param idx
-     * @return topic
+     * @param idx Index.
+     * @return Topic for index.
      */
     public static Object rebalanceTopic(int idx) {
         return TOPIC_CACHE.topic("Rebalance", idx);
@@ -440,10 +439,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(stopErr);
 
-        if (!cctx.kernalContext().clientNode())
-            for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++)
                 cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
-            }
+        }
 
         U.cancel(exchWorker);
 
@@ -1338,7 +1337,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                                 if (marsR != null)
                                     try {
-                                        marsR.call();//Marshaller cache rebalancing launches in sync way.
+                                        marsR.call(); // Marshaller cache rebalancing is launched synchronously.
                                     }
                                     catch (Exception ex) {
                                         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b2bb8f1..cda392c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -146,7 +146,6 @@ public interface GridCachePreloader {
      */
     public void unwindUndeploys();
 
-
     /**
      * Handles Supply message.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index f6f49f6..392ad6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -555,7 +555,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @return {@code True} if entry was not being used, passed the filter and could be removed.
      * @throws IgniteCheckedException If failed to remove from swap.
      */
-    public boolean clearInternal(GridCacheVersion ver, boolean swap, GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
+    public boolean clearInternal(
+        GridCacheVersion ver,
+        boolean swap,
+        GridCacheObsoleteEntryExtras extras
+    ) throws IgniteCheckedException {
         boolean rmv = false;
 
         try {
@@ -820,4 +824,4 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
             return S.toString(ReaderId.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index b3c13a8..6a93b04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -475,10 +475,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
     /**
      * @param updateSeq Update sequence.
-     * @return Future for evict attempt.
      */
     void tryEvictAsync(boolean updateSeq) {
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
+            state.getReference() == RENTING && state.getStamp() == 0 &&
             state.compareAndSet(RENTING, EVICTED, 0, 0)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
@@ -496,9 +496,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             clearDeferredDeletes();
         }
-        else {
+        else
             cctx.preloader().evictPartitionAsync(this);
-        }
     }
 
     /**
@@ -514,7 +513,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
-     * @return {@code True} if entry has been transitioned to state EVICTED.
+     *
      */
     public void tryEvict() {
         if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index e99fa9d..53c3d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -330,7 +330,8 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super",
-            super.toString());
+        return S.toString(GridDhtPartitionDemandMessage.class, this,
+            "partCnt", parts != null ? parts.size() : 0,
+            "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0c41b8b..2c9b422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -138,6 +138,7 @@ public class GridDhtPartitionDemander {
      * Start.
      */
     void start() {
+        // No-op.
     }
 
     /**
@@ -224,9 +225,11 @@ public class GridDhtPartitionDemander {
      */
     private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
         if (log.isDebugEnabled())
-            log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']');
+            log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() +
+                ", waitCache=" + name + ']');
 
-        RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
+        RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name)
+            .preloader().rebalanceFuture();
 
         if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
             if (!wFut.get()) {
@@ -326,8 +329,10 @@ public class GridDhtPartitionDemander {
     /**
      * @param fut Future.
      */
-    private boolean requestPartitions(RebalanceFuture fut,
-        GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
+    private boolean requestPartitions(
+        RebalanceFuture fut,
+        GridDhtPreloaderAssignments assigns
+    ) throws IgniteCheckedException {
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             if (topologyChanged(fut))
                 return false;
@@ -414,7 +419,7 @@ public class GridDhtPartitionDemander {
      * @return String representation of partitions list.
      */
     private String partitionsList(Collection<Integer> c) {
-        LinkedList<Integer> s = new LinkedList<>(c);
+        List<Integer> s = new ArrayList<>(c);
 
         Collections.sort(s);
 
@@ -428,6 +433,7 @@ public class GridDhtPartitionDemander {
 
         while (sit.hasNext()) {
             int p = sit.next();
+
             if (start == -1) {
                 start = p;
                 prev = p;
@@ -465,7 +471,8 @@ public class GridDhtPartitionDemander {
     public void handleSupplyMessage(
         int idx,
         final UUID id,
-        final GridDhtPartitionSupplyMessageV2 supply) {
+        final GridDhtPartitionSupplyMessageV2 supply
+    ) {
         AffinityTopologyVersion topVer = supply.topologyVersion();
 
         final RebalanceFuture fut = rebalanceFut;
@@ -478,9 +485,8 @@ public class GridDhtPartitionDemander {
         if (!fut.isActual(supply.updateSequence())) // Current future have another update sequence.
             return; // Supple message based on another future.
 
-        if (topologyChanged(fut)) { // Topology already changed (for the future that supply message based on).
+        if (topologyChanged(fut)) // Topology has already changed (for the future this supply message is based on).
             return;
-        }
 
         if (log.isDebugEnabled())
             log.debug("Received supply message: " + supply);
@@ -525,6 +531,7 @@ public class GridDhtPartitionDemander {
 
                                     continue;
                                 }
+
                                 if (!preloadEntry(node, p, entry, topVer)) {
                                     if (log.isDebugEnabled())
                                         log.debug("Got entries for invalid partition during " +
@@ -568,9 +575,10 @@ public class GridDhtPartitionDemander {
             }
 
             // Only request partitions based on latest topology version.
-            for (Integer miss : supply.missed())
+            for (Integer miss : supply.missed()) {
                 if (cctx.affinity().localNode(miss, topVer))
                     fut.partitionMissed(id, miss);
+            }
 
             for (Integer miss : supply.missed())
                 fut.partitionDone(id, miss);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 46a2675..c3472b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -80,7 +80,9 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
  * DHT cache preloader.
  */
 public class GridDhtPreloader extends GridCachePreloaderAdapter {
-    /** */
+    /**
+     *
+     */
     public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
 
     /** Default preload resend timeout. */
@@ -115,7 +117,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         new ConcurrentHashMap8<>();
 
     /** */
-    private final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
+    private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
 
     /** */
     private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
@@ -771,13 +773,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         partitionsToEvict.add(part);
 
-        if (partitionsEvictionOwning.compareAndSet(0, 1)) {
+        if (partitionsEvictionOwning.get() == 0 && partitionsEvictionOwning.compareAndSet(0, 1)) {
             cctx.closures().callLocalSafe(new GPC<Boolean>() {
                 @Override public Boolean call() {
                     boolean firstRun = true;
 
                     while (true) {
-                        if (!firstRun && !partitionsEvictionOwning.compareAndSet(0, 1))
+                        if (!firstRun && !partitionsToEvict.isEmptyx() &&
+                            !partitionsEvictionOwning.compareAndSet(0, 1))
                             return false;
 
                         firstRun = false;
@@ -785,9 +788,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                         try {
                             GridDhtLocalPartition part = partitionsToEvict.poll();
 
-                            if (part == null) {
+                            if (part == null)
                                 return false;
-                            }
 
                             part.tryEvict();
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc0423df/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index b17588f..c866a1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -232,6 +232,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                 }
             }
         };
+
         Thread t2 = new Thread() {
             @Override public void run() {
                 while (!concurrentStartFinished) {
@@ -259,7 +260,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         long spend = (System.currentTimeMillis() - start) / 1000;
 
-        log.info("Spend " + spend + " seconds to rebalance entries.");
+        info("Time to rebalance entries: " + spend);
     }
 
     /**
@@ -308,6 +309,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     *
+     */
     protected void checkSupplyContextMapIsEmpty() {
         for (Ignite g : G.allGrids()) {
             for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
@@ -464,9 +468,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         log.info("Spend " + spend + " seconds to rebalance entries.");
     }
 
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
         stopAllGrids();
     }
-}
\ No newline at end of file
+}