You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/21 08:40:11 UTC
[02/50] [abbrv] ignite git commit: IGNITE-5890 Added estimated time
to rebalance completion and time to rebalance start to MXBean - Fixes #2386.
IGNITE-5890 Added estimated time to rebalance completion and time to rebalance start to MXBean - Fixes #2386.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e0d4a54
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e0d4a54
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e0d4a54
Branch: refs/heads/ignite-5578
Commit: 1e0d4a542740dab6ab98b4e3b4df3a30563c3ceb
Parents: 199339e
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Aug 14 12:12:46 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Aug 14 12:12:46 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/CacheMetrics.java | 10 ++
.../cache/CacheClusterMetricsMXBeanImpl.java | 10 ++
.../cache/CacheLocalMetricsMXBeanImpl.java | 10 ++
.../processors/cache/CacheMetricsImpl.java | 36 +++++-
.../processors/cache/CacheMetricsSnapshot.java | 18 +++
.../dht/preloader/GridDhtPartitionDemander.java | 22 +++-
.../dht/preloader/GridDhtPreloader.java | 18 +--
.../cache/CacheGroupsMetricsRebalanceTest.java | 118 +++++++++++++++++++
8 files changed, 224 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 0cff4a8..20ea692 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -506,6 +506,16 @@ public interface CacheMetrics {
public long getRebalancingBytesRate();
/**
+ * Gets the estimated time at which the current rebalancing is expected to finish.
+ *
+ * @return Estimated rebalancing finish time. NOTE(review): the implementation in {@code CacheMetricsImpl}
+ * returns {@code -1} when the rebalancing keys rate is not positive and otherwise a value derived from
+ * {@code U.currentTimeMillis()} (presumably a millisecond timestamp) - confirm for other implementations.
+ */
+ public long estimateRebalancingFinishTime();
+
+ /**
+ * Gets the time at which rebalancing started or, for a delayed rebalance, is scheduled to start.
+ *
+ * @return Rebalancing start time. NOTE(review): {@code CacheMetricsImpl} holds {@code -1} until a start time
+ * has been recorded and again after its rebalance metrics are reset - confirm for other implementations.
+ */
+ public long rebalancingStartTime();
+
+ /**
* Checks whether statistics collection is enabled in this cache.
* <p>
* The default value is {@code false}.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index 266c577..df4a6ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -382,4 +382,14 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean {
@Override public long getRebalancingBytesRate() {
return cache.clusterMetrics().getRebalancingBytesRate();
}
+
+ /** {@inheritDoc} */
+ @Override public long estimateRebalancingFinishTime() {
+ // Delegate to the cache's cluster metrics.
+ final long finishTs = cache.clusterMetrics().estimateRebalancingFinishTime();
+
+ return finishTs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long rebalancingStartTime() {
+ // Delegate to the cache's cluster metrics.
+ final long startTs = cache.clusterMetrics().rebalancingStartTime();
+
+ return startTs;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index f363bfe..a767193 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -382,4 +382,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean {
@Override public long getRebalancingBytesRate() {
return cache.metrics0().getRebalancingBytesRate();
}
+
+ /** {@inheritDoc} */
+ @Override public long estimateRebalancingFinishTime() {
+ // Delegate to the cache's local metrics.
+ final long finishTs = cache.metrics0().estimateRebalancingFinishTime();
+
+ return finishTs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long rebalancingStartTime() {
+ // Delegate to the cache's local metrics.
+ final long startTs = cache.metrics0().rebalancingStartTime();
+
+ return startTs;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 6a8ae0b..d03a6f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Adapter for cache metrics.
@@ -108,6 +109,9 @@ public class CacheMetricsImpl implements CacheMetrics {
/** Total rebalanced bytes count. */
private AtomicLong totalRebalancedBytes = new AtomicLong();
+ /** Rebalance start time. */
+ private AtomicLong rebalanceStartTime = new AtomicLong(-1L);
+
/** Estimated rebalancing keys count. */
private AtomicLong estimatedRebalancingKeys = new AtomicLong();
@@ -734,7 +738,7 @@ public class CacheMetricsImpl implements CacheMetrics {
}
/** {@inheritDoc} */
- public int getTotalPartitionsCount() {
+ @Override public int getTotalPartitionsCount() {
int res = 0;
if (cctx.isLocal())
@@ -749,7 +753,7 @@ public class CacheMetricsImpl implements CacheMetrics {
}
/** {@inheritDoc} */
- public int getRebalancingPartitionsCount() {
+ @Override public int getRebalancingPartitionsCount() {
int res = 0;
if (cctx.isLocal())
@@ -764,17 +768,17 @@ public class CacheMetricsImpl implements CacheMetrics {
}
/** {@inheritDoc} */
- public long getKeysToRebalanceLeft() {
+ @Override public long getKeysToRebalanceLeft() {
return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get());
}
/** {@inheritDoc} */
- public long getRebalancingKeysRate() {
+ @Override public long getRebalancingKeysRate() {
return rebalancingKeysRate.getRate();
}
/** {@inheritDoc} */
- public long getRebalancingBytesRate() {
+ @Override public long getRebalancingBytesRate() {
return rebalancingBytesRate.getRate();
}
@@ -791,6 +795,28 @@ public class CacheMetricsImpl implements CacheMetrics {
rebalancingBytesRate.clear();
rebalancingKeysRate.clear();
+
+ rebalanceStartTime.set(-1L);
+ }
+
+ /**
+ * Records the rebalance start time as the current time shifted by the given delay.
+ *
+ * @param delay Delay before rebalancing starts, {@code 0} for an immediate start. It is added to
+ * {@code U.currentTimeMillis()}, so it is presumably expressed in milliseconds.
+ */
+ public void startRebalance(long delay) {
+ // Overwrite instead of accumulating: the field holds -1 until a start time is recorded, and this method
+ // has more than one call site, so adding to the previous value would store a corrupted timestamp.
+ rebalanceStartTime.set(delay + U.currentTimeMillis());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long estimateRebalancingFinishTime() {
+ long rate = rebalancingKeysRate.getRate();
+
+ // Without a positive rate there is nothing to extrapolate from.
+ if (rate <= 0)
+ return -1L;
+
+ // Multiply before dividing: dividing first truncates the estimate down to whole rate intervals and
+ // yields "now" whenever less than one interval of work is left.
+ long timeLeft = (getKeysToRebalanceLeft() * REBALANCE_RATE_INTERVAL) / rate;
+
+ return timeLeft + U.currentTimeMillis();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long rebalancingStartTime() {
+ // Read the currently recorded start time (-1 when none is recorded).
+ final long startTs = rebalanceStartTime.get();
+
+ return startTs;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index e9141c6..2d38db8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -203,6 +203,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** Get rebalancing bytes rate. */
private long rebalancingBytesRate;
+ /** Rebalance start time. */
+ private long rebalanceStartTime;
+
+ /** Estimated rebalance finish time. */
+ private long rebalanceFinishTime;
+
/** */
private String keyType;
@@ -307,6 +313,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
keysToRebalanceLeft = m.getKeysToRebalanceLeft();
rebalancingBytesRate = m.getRebalancingBytesRate();
rebalancingKeysRate = m.getRebalancingKeysRate();
+ rebalanceStartTime = m.rebalancingStartTime();
+ rebalanceFinishTime = m.estimateRebalancingFinishTime();
}
/**
@@ -716,6 +724,16 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
}
/** {@inheritDoc} */
+ @Override public long estimateRebalancingFinishTime() {
+ // Value captured from the source metrics when this snapshot was created.
+ return this.rebalanceFinishTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long rebalancingStartTime() {
+ // Value captured from the source metrics when this snapshot was created.
+ return this.rebalanceStartTime;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean isWriteBehindEnabled() {
return isWriteBehindEnabled;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 248b739..2258187 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -257,11 +257,13 @@ public class GridDhtPartitionDemander {
* @param forcedRebFut External future for forced rebalance.
* @return Rebalancing runnable.
*/
- Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
+ Runnable addAssignments(
+ final GridDhtPreloaderAssignments assigns,
boolean force,
int cnt,
final Runnable next,
- @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+ @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut
+ ) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
@@ -289,14 +291,14 @@ public class GridDhtPartitionDemander {
rebalanceFut = fut;
- fut.sendRebalanceStartedEvent();
-
- for (GridCacheContext cctx : grp.caches()) {
+ for (final GridCacheContext cctx : grp.caches()) {
if (cctx.config().isStatisticsEnabled()) {
final CacheMetricsImpl metrics = cctx.cache().metrics0();
metrics.clearRebalanceCounters();
+ metrics.startRebalance(0);
+
rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
metrics.clearRebalanceCounters();
@@ -305,6 +307,8 @@ public class GridDhtPartitionDemander {
}
}
+ fut.sendRebalanceStartedEvent();
+
if (assigns.cancelled()) { // Pending exchange.
if (log.isDebugEnabled())
log.debug("Rebalancing skipped due to cancelled assignments.");
@@ -350,6 +354,14 @@ public class GridDhtPartitionDemander {
};
}
else if (delay > 0) {
+ for (GridCacheContext cctx : grp.caches()) {
+ if (cctx.config().isStatisticsEnabled()) {
+ final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+ metrics.startRebalance(delay);
+ }
+ }
+
GridTimeoutObject obj = lastTimeoutObj.get();
if (obj != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 7efd4aa..305da92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -306,12 +306,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
GridDhtPartitionDemandMessage msg = assigns.get(n);
- if (msg == null) {
- assigns.put(n, msg = new GridDhtPartitionDemandMessage(
- top.updateSequence(),
- exchId.topologyVersion(),
- grp.groupId()));
- }
+ if (msg == null) {
+ assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+ top.updateSequence(),
+ exchId.topologyVersion(),
+ grp.groupId()));
+ }
msg.addPartition(p, false);
}
@@ -396,11 +396,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+ @Override public Runnable addAssignments(
+ GridDhtPreloaderAssignments assignments,
boolean forceRebalance,
int cnt,
Runnable next,
- @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+ @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut
+ ) {
return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index c15fa5f..a1a855a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -21,20 +21,28 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
/**
*
*/
@@ -71,6 +79,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
.setCacheMode(CacheMode.PARTITIONED)
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setRebalanceMode(CacheRebalanceMode.ASYNC)
+ .setRebalanceBatchSize(100)
.setStatisticsEnabled(true);
CacheConfiguration cfg2 = new CacheConfiguration(cfg1)
@@ -137,4 +146,113 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
assertTrue(ratio > 40 && ratio < 60);
}
+
+ /**
+ * Checks that the local cache metrics of a joining node report a rebalancing start time and an estimated
+ * rebalancing finish time, and that the estimate is close to the moment rebalancing actually stops.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRebalanceEstimateFinishTime() throws Exception {
+ System.setProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL, String.valueOf(1000));
+
+ // Restore the system property even when an assertion fails, so it cannot leak into other tests.
+ try {
+ // Maximum tolerated difference between estimated and actual finish time, in milliseconds.
+ final long tolerance = 10_000;
+
+ Ignite ig1 = startGrid(1);
+
+ final int KEYS = 4_000_000;
+
+ try (IgniteDataStreamer<Integer, String> st = ig1.dataStreamer(CACHE1)) {
+ for (int i = 0; i < KEYS; i++)
+ st.addData(i, CACHE1 + "-" + i);
+ }
+
+ final CountDownLatch finishRebalanceLatch = new CountDownLatch(1);
+
+ final Ignite ig2 = startGrid(2);
+
+ ig2.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ CacheRebalancingEvent rebEvent = (CacheRebalancingEvent)evt;
+
+ if (rebEvent.cacheName().equals(CACHE1)) {
+ System.out.println("CountDown rebalance stop latch:" + rebEvent.cacheName());
+
+ finishRebalanceLatch.countDown();
+ }
+
+ return false;
+ }
+ }, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+ waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return ig2.cache(CACHE1).localMetrics().rebalancingStartTime() != -1L;
+ }
+ }, 5_000);
+
+ CacheMetrics metrics = ig2.cache(CACHE1).localMetrics();
+
+ long startTime = metrics.rebalancingStartTime();
+
+ assertTrue(startTime > 0);
+ assertTrue((U.currentTimeMillis() - startTime) < 5000);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ runAsync(new Runnable() {
+ @Override public void run() {
+ // Wait until 25% of the keys have been rebalanced; assumes about half of all keys
+ // are rebalanced to the second node.
+ int partKeys = KEYS / 2;
+
+ final long keysLine = (long)(partKeys - (partKeys * 0.25));
+
+ System.out.println("Wait until keys left will be less " + keysLine);
+
+ try {
+ while (finishRebalanceLatch.getCount() != 0) {
+ CacheMetrics m = ig2.cache(CACHE1).localMetrics();
+
+ long keyLeft = m.getKeysToRebalanceLeft();
+
+ if (keyLeft > 0 && keyLeft < keysLine)
+ latch.countDown();
+
+ System.out.println("Keys left: " + keyLeft);
+
+ try {
+ Thread.sleep(1_000);
+ }
+ catch (InterruptedException e) {
+ System.out.println("Interrupt thread: " + e.getMessage());
+
+ Thread.currentThread().interrupt();
+
+ // Stop polling: with the interrupt flag set every further sleep would fail immediately.
+ return;
+ }
+ }
+ }
+ finally {
+ // Always release the waiting test thread so that it fails on its assertions instead of
+ // hanging if rebalancing stopped before the threshold was observed.
+ latch.countDown();
+ }
+ }
+ });
+
+ latch.await();
+
+ long finishTime = ig2.cache(CACHE1).localMetrics().estimateRebalancingFinishTime();
+
+ assertTrue(finishTime > 0);
+
+ long timePassed = U.currentTimeMillis() - startTime;
+ long timeLeft = finishTime - U.currentTimeMillis();
+
+ // timeLeft is in milliseconds, so the timeout must use the same unit.
+ assertTrue(finishRebalanceLatch.await(timeLeft + tolerance, TimeUnit.MILLISECONDS));
+
+ System.out.println(
+ "TimePassed:" + timePassed +
+ "\nTimeLeft:" + timeLeft +
+ "\nTime to rebalance: " + (finishTime - startTime) +
+ "\nStartTime: " + startTime +
+ "\nFinishTime: " + finishTime
+ );
+
+ System.out.println("Rebalance time:" + (U.currentTimeMillis() - startTime));
+
+ long diff = finishTime - U.currentTimeMillis();
+
+ assertTrue("Expected less than " + tolerance + ", Actual:" + diff, Math.abs(diff) < tolerance);
+ }
+ finally {
+ System.clearProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL);
+ }
+ }
}