You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/14 11:48:55 UTC

[08/11] ignite git commit: IGNITE-5890 Added estimated time to rebalance completion and time to rebalance start to MXBean - Fixes #2386.

IGNITE-5890 Added estimated time to rebalance completion and time to rebalance start to MXBean - Fixes #2386.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e0d4a54
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e0d4a54
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e0d4a54

Branch: refs/heads/ignite-5872-5578
Commit: 1e0d4a542740dab6ab98b4e3b4df3a30563c3ceb
Parents: 199339e
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Aug 14 12:12:46 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Aug 14 12:12:46 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMetrics.java   |  10 ++
 .../cache/CacheClusterMetricsMXBeanImpl.java    |  10 ++
 .../cache/CacheLocalMetricsMXBeanImpl.java      |  10 ++
 .../processors/cache/CacheMetricsImpl.java      |  36 +++++-
 .../processors/cache/CacheMetricsSnapshot.java  |  18 +++
 .../dht/preloader/GridDhtPartitionDemander.java |  22 +++-
 .../dht/preloader/GridDhtPreloader.java         |  18 +--
 .../cache/CacheGroupsMetricsRebalanceTest.java  | 118 +++++++++++++++++++
 8 files changed, 224 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 0cff4a8..20ea692 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -506,6 +506,16 @@ public interface CacheMetrics {
     public long getRebalancingBytesRate();
 
     /**
+     * @return Estimated rebalancing finished time.
+     */
+    public long estimateRebalancingFinishTime();
+
+    /**
+     * @return Rebalancing start time.
+     */
+    public long rebalancingStartTime();
+
+    /**
      * Checks whether statistics collection is enabled in this cache.
      * <p>
      * The default value is {@code false}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index 266c577..df4a6ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -382,4 +382,14 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
     @Override public long getRebalancingBytesRate() {
         return cache.clusterMetrics().getRebalancingBytesRate();
     }
+
+    /** {@inheritDoc} */
+    @Override public long estimateRebalancingFinishTime() {
+        return cache.clusterMetrics().estimateRebalancingFinishTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long rebalancingStartTime() {
+        return cache.clusterMetrics().rebalancingStartTime();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index f363bfe..a767193 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -382,4 +382,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
     @Override public long getRebalancingBytesRate() {
         return cache.metrics0().getRebalancingBytesRate();
     }
+
+    /** {@inheritDoc} */
+    @Override public long estimateRebalancingFinishTime() {
+        return cache.metrics0().estimateRebalancingFinishTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long rebalancingStartTime() {
+        return cache.metrics0().rebalancingStartTime();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 6a8ae0b..d03a6f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
 import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Adapter for cache metrics.
@@ -108,6 +109,9 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** Total rebalanced bytes count. */
     private AtomicLong totalRebalancedBytes = new AtomicLong();
 
+    /** Rebalanced start time. */
+    private AtomicLong rebalanceStartTime = new AtomicLong(-1L);
+
     /** Estimated rebalancing keys count. */
     private AtomicLong estimatedRebalancingKeys = new AtomicLong();
 
@@ -734,7 +738,7 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
-    public int getTotalPartitionsCount() {
+    @Override public int getTotalPartitionsCount() {
         int res = 0;
 
         if (cctx.isLocal())
@@ -749,7 +753,7 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
-    public int getRebalancingPartitionsCount() {
+    @Override public int getRebalancingPartitionsCount() {
         int res = 0;
 
         if (cctx.isLocal())
@@ -764,17 +768,17 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
-    public long getKeysToRebalanceLeft() {
+    @Override public long getKeysToRebalanceLeft() {
         return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get());
     }
 
     /** {@inheritDoc} */
-    public long getRebalancingKeysRate() {
+    @Override public long getRebalancingKeysRate() {
         return rebalancingKeysRate.getRate();
     }
 
     /** {@inheritDoc} */
-    public long getRebalancingBytesRate() {
+    @Override public long getRebalancingBytesRate() {
         return rebalancingBytesRate.getRate();
     }
 
@@ -791,6 +795,28 @@ public class CacheMetricsImpl implements CacheMetrics {
         rebalancingBytesRate.clear();
 
         rebalancingKeysRate.clear();
+
+        rebalanceStartTime.set(-1L);
+    }
+
+    /**
+     *
+     */
+    public void startRebalance(long delay){
+        rebalanceStartTime.addAndGet(delay + U.currentTimeMillis());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long estimateRebalancingFinishTime() {
+        long rate = rebalancingKeysRate.getRate();
+
+        return rate <= 0 ? -1L :
+            ((getKeysToRebalanceLeft() / rate) * REBALANCE_RATE_INTERVAL) + U.currentTimeMillis();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long rebalancingStartTime() {
+        return rebalanceStartTime.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index e9141c6..2d38db8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -203,6 +203,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     /** Get rebalancing bytes rate. */
     private long rebalancingBytesRate;
 
+    /** Start rebalance time. */
+    private long rebalanceStartTime;
+
+    /** Estimate rebalance finish time. */
+    private long rebalanceFinishTime;
+
     /** */
     private String keyType;
 
@@ -307,6 +313,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
         keysToRebalanceLeft = m.getKeysToRebalanceLeft();
         rebalancingBytesRate = m.getRebalancingBytesRate();
         rebalancingKeysRate = m.getRebalancingKeysRate();
+        rebalanceStartTime = m.rebalancingStartTime();
+        rebalanceFinishTime = m.estimateRebalancingFinishTime();
     }
 
     /**
@@ -716,6 +724,16 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public long estimateRebalancingFinishTime() {
+        return rebalanceFinishTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long rebalancingStartTime() {
+        return rebalanceStartTime;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isWriteBehindEnabled() {
         return isWriteBehindEnabled;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 248b739..2258187 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -257,11 +257,13 @@ public class GridDhtPartitionDemander {
      * @param forcedRebFut External future for forced rebalance.
      * @return Rebalancing runnable.
      */
-    Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
+    Runnable addAssignments(
+        final GridDhtPreloaderAssignments assigns,
         boolean force,
         int cnt,
         final Runnable next,
-        @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+        @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut
+    ) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -289,14 +291,14 @@ public class GridDhtPartitionDemander {
 
             rebalanceFut = fut;
 
-            fut.sendRebalanceStartedEvent();
-
-            for (GridCacheContext cctx : grp.caches()) {
+            for (final GridCacheContext cctx : grp.caches()) {
                 if (cctx.config().isStatisticsEnabled()) {
                     final CacheMetricsImpl metrics = cctx.cache().metrics0();
 
                     metrics.clearRebalanceCounters();
 
+                    metrics.startRebalance(0);
+
                     rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                         @Override public void apply(IgniteInternalFuture<Boolean> fut) {
                             metrics.clearRebalanceCounters();
@@ -305,6 +307,8 @@ public class GridDhtPartitionDemander {
                 }
             }
 
+            fut.sendRebalanceStartedEvent();
+
             if (assigns.cancelled()) { // Pending exchange.
                 if (log.isDebugEnabled())
                     log.debug("Rebalancing skipped due to cancelled assignments.");
@@ -350,6 +354,14 @@ public class GridDhtPartitionDemander {
             };
         }
         else if (delay > 0) {
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.config().isStatisticsEnabled()) {
+                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                    metrics.startRebalance(delay);
+                }
+            }
+
             GridTimeoutObject obj = lastTimeoutObj.get();
 
             if (obj != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 7efd4aa..305da92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -306,12 +306,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                         GridDhtPartitionDemandMessage msg = assigns.get(n);
 
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            exchId.topologyVersion(),
-                            grp.groupId()));
-                    }
+                        if (msg == null) {
+                            assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                                top.updateSequence(),
+                                exchId.topologyVersion(),
+                                grp.groupId()));
+                        }
 
                         msg.addPartition(p, false);
                     }
@@ -396,11 +396,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+    @Override public Runnable addAssignments(
+        GridDhtPreloaderAssignments assignments,
         boolean forceRebalance,
         int cnt,
         Runnable next,
-        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut
+    ) {
         return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index c15fa5f..a1a855a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -21,20 +21,28 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
 /**
  *
  */
@@ -71,6 +79,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
             .setCacheMode(CacheMode.PARTITIONED)
             .setAtomicityMode(CacheAtomicityMode.ATOMIC)
             .setRebalanceMode(CacheRebalanceMode.ASYNC)
+            .setRebalanceBatchSize(100)
             .setStatisticsEnabled(true);
 
         CacheConfiguration cfg2 = new CacheConfiguration(cfg1)
@@ -137,4 +146,113 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
 
         assertTrue(ratio > 40 && ratio < 60);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceEstimateFinishTime() throws Exception {
+        System.setProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL, String.valueOf(1000));
+
+        Ignite ig1 = startGrid(1);
+
+        final int KEYS = 4_000_000;
+
+        IgniteCache<Object, Object> cache1 = ig1.cache(CACHE1);
+
+        try (IgniteDataStreamer<Integer, String> st = ig1.dataStreamer(CACHE1)) {
+            for (int i = 0; i < KEYS; i++)
+                st.addData(i, CACHE1 + "-" + i);
+        }
+
+        final CountDownLatch finishRebalanceLatch = new CountDownLatch(1);
+
+        final Ignite ig2 = startGrid(2);
+
+        ig2.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                CacheRebalancingEvent rebEvent = (CacheRebalancingEvent)evt;
+
+                if (rebEvent.cacheName().equals(CACHE1)) {
+                    System.out.println("CountDown rebalance stop latch:" + rebEvent.cacheName());
+
+                    finishRebalanceLatch.countDown();
+                }
+
+                return false;
+            }
+        }, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+        waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return ig2.cache(CACHE1).localMetrics().rebalancingStartTime() != -1L;
+            }
+        }, 5_000);
+
+        CacheMetrics metrics = ig2.cache(CACHE1).localMetrics();
+
+        long startTime = metrics.rebalancingStartTime();
+
+        assertTrue(startTime > 0);
+        assertTrue((U.currentTimeMillis() - startTime) < 5000);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        runAsync(new Runnable() {
+            @Override public void run() {
+                // Waiting 25% keys will be rebalanced.
+                int partKeys = KEYS / 2;
+
+                final long keysLine = (long)(partKeys - (partKeys * 0.25));
+
+                System.out.println("Wait until keys left will be less " + keysLine);
+
+                while (finishRebalanceLatch.getCount() != 0) {
+                    CacheMetrics m = ig2.cache(CACHE1).localMetrics();
+
+                    long keyLeft = m.getKeysToRebalanceLeft();
+
+                    if (keyLeft > 0 && keyLeft < keysLine)
+                        latch.countDown();
+
+                    System.out.println("Keys left: " + m.getKeysToRebalanceLeft());
+
+                    try {
+                        Thread.sleep(1_000);
+                    }
+                    catch (InterruptedException e) {
+                        System.out.println("Interrupt thread: " + e.getMessage());
+
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        });
+
+        latch.await();
+
+        long finishTime = ig2.cache(CACHE1).localMetrics().estimateRebalancingFinishTime();
+
+        assertTrue(finishTime > 0);
+
+        long timePassed = U.currentTimeMillis() - startTime;
+        long timeLeft = finishTime - System.currentTimeMillis();
+
+        assertTrue(finishRebalanceLatch.await(timeLeft + 2_000, TimeUnit.SECONDS));
+
+        System.out.println(
+            "TimePassed:" + timePassed +
+                "\nTimeLeft:" + timeLeft +
+                "\nTime to rebalance: " + (finishTime - startTime) +
+                "\nStartTime: " + startTime +
+                "\nFinishTime: " + finishTime
+        );
+
+        System.clearProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL);
+
+        System.out.println("Rebalance time:" + (U.currentTimeMillis() - startTime));
+
+        long diff = finishTime - U.currentTimeMillis();
+
+        assertTrue("Expected less 5000, Actual:" + diff, Math.abs(diff) < 10_000);
+    }
 }