You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/04/11 17:35:09 UTC
[2/9] ignite git commit: IGNITE-4756 Print info about partition
distribution to log
IGNITE-4756 Print info about partition distribution to log
Signed-off-by: Anton Vinogradov <av...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3eb1f5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3eb1f5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3eb1f5d
Branch: refs/heads/ignite-8201
Commit: a3eb1f5d753a38c4019440e1bf39d00bc6136455
Parents: 0e73fa2
Author: Vyacheslav Daradur <da...@gmail.com>
Authored: Wed Apr 11 14:41:29 2018 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Apr 11 14:41:29 2018 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 7 +
.../affinity/GridAffinityAssignmentCache.java | 50 +++-
.../AffinityDistributionLoggingTest.java | 268 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite5.java | 9 +-
4 files changed, 327 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 9da123e..04eb425 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -857,6 +857,13 @@ public final class IgniteSystemProperties {
public static final String IGNITE_BPLUS_TREE_LOCK_RETRIES = "IGNITE_BPLUS_TREE_LOCK_RETRIES";
/**
+ * The threshold of uneven distribution above which partition distribution will be logged.
+ *
+ * The default is '50', which means: log when the local node's partition count differs from the expected count by more than 50%.
+ */
+ public static final String IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD = "IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 18edd02..b1899e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -34,13 +34,14 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.cluster.NodeOrderComparator;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.NodeOrderComparator;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
@@ -53,7 +54,10 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.getFloat;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
/**
@@ -63,6 +67,9 @@ public class GridAffinityAssignmentCache {
/** Cleanup history size. */
private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
+ /** Threshold of uneven partition distribution (in percent) above which the distribution is logged. */
+ private final float partDistribution = getFloat(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, 50f);
+
/** Group name if specified or cache name. */
private final String cacheOrGrpName;
@@ -367,6 +374,9 @@ public class GridAffinityAssignmentCache {
idealAssignment = assignment;
+ if (ctx.cache().cacheMode(cacheOrGrpName) == PARTITIONED)
+ printDistributionIfThresholdExceeded(assignment, sorted.size());
+
if (hasBaseline) {
baselineTopology = discoCache.state().baselineTopology();
assert baselineAssignment != null;
@@ -418,6 +428,44 @@ public class GridAffinityAssignmentCache {
}
/**
+ * Calculates and logs partitions distribution if threshold of uneven distribution {@link #partDistribution} is exceeded.
+ *
+ * @param assignments Assignments to calculate partitions distribution.
+ * @param nodes Affinity nodes number.
+ * @see IgniteSystemProperties#IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD
+ */
+ private void printDistributionIfThresholdExceeded(List<List<ClusterNode>> assignments, int nodes) {
+ int locPrimaryCnt = 0;
+ int locBackupCnt = 0;
+
+ for (List<ClusterNode> assignment : assignments) {
+ for (int i = 0; i < assignment.size(); i++) {
+ ClusterNode node = assignment.get(i);
+
+ if (node.isLocal()) {
+ if (i == 0)
+ locPrimaryCnt++;
+ else
+ locBackupCnt++;
+ }
+ }
+ }
+
+ float expCnt = (float)partsCnt / nodes;
+
+ float deltaPrimary = Math.abs(1 - (float)locPrimaryCnt / expCnt) * 100;
+ float deltaBackup = Math.abs(1 - (float)locBackupCnt / (expCnt * backups)) * 100;
+
+ if (deltaPrimary > partDistribution || deltaBackup > partDistribution) {
+ log.info(String.format("Local node affinity assignment distribution is not ideal " +
+ "[cache=%s, expectedPrimary=%.2f, actualPrimary=%d, " +
+ "expectedBackups=%.2f, actualBackups=%d, warningThreshold=%.2f%%]",
+ cacheOrGrpName, expCnt, locPrimaryCnt,
+ expCnt * backups, locBackupCnt, partDistribution));
+ }
+ }
+
+ /**
* Copies previous affinity assignment when discovery event does not cause affinity assignment changes
* (e.g. client node joins or leaves).
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
new file mode 100644
index 0000000..813c830
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityDistributionLoggingTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridStringLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ * Tests of partitions distribution logging.
+ *
+ * The tests are based on an affinity function which provides an even distribution of partitions between nodes.
+ *
+ * @see EvenDistributionAffinityFunction
+ */
+public class AffinityDistributionLoggingTest extends GridCommonAbstractTest {
+ /** Prefix of the log message whose presence in the log is checked by the tests. */
+ private static final String LOG_MESSAGE_PREFIX = "Local node affinity assignment distribution is not ideal ";
+
+ /** Partitions number. */
+ private int parts = 0;
+
+ /** Nodes number. */
+ private int nodes = 0;
+
+ /** Backups number. */
+ private int backups = 0;
+
+ /** For storing original value of system property. */
+ private String tempProp;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ tempProp = System.getProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ if (tempProp != null)
+ System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, tempProp);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ System.clearProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD);
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheCfg.setBackups(backups);
+ cacheCfg.setAffinity(new EvenDistributionAffinityFunction(parts));
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception In case of an error.
+ */
+ public void test2PartitionsIdealDistributionIsNotLogged() throws Exception {
+ System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "0");
+
+ nodes = 2;
+ parts = 2;
+ backups = 1;
+
+ String testsLog = runAndGetExchangeLog();
+
+ assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+ }
+
+ /**
+ * @throws Exception In case of an error.
+ */
+ public void test120PartitionsIdeadDistributionIsNotLogged() throws Exception {
+ System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "0.0");
+
+ nodes = 3;
+ parts = 120;
+ backups = 2;
+
+ String testsLog = runAndGetExchangeLog();
+
+ assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+ }
+
+ /**
+ * @throws Exception In case of an error.
+ */
+ public void test5PartitionsNotIdealDistributionIsLogged() throws Exception {
+ System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "50.0");
+
+ nodes = 4;
+ parts = 5;
+ backups = 3;
+
+ String testsLog = runAndGetExchangeLog();
+
+ assertTrue(testsLog.contains(LOG_MESSAGE_PREFIX));
+ }
+
+ /**
+ * @throws Exception In case of an error.
+ */
+ public void test7PartitionsNotIdealDistributionSuppressedLogging() throws Exception {
+ System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "50.0");
+
+ nodes = 3;
+ parts = 7;
+ backups = 0;
+
+ String testsLog = runAndGetExchangeLog();
+
+ assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+ }
+
+ /**
+ * @throws Exception In case of an error.
+ */
+ public void test5PartitionsNotIdealDistributionSuppressedLogging() throws Exception {
+ System.setProperty(IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD, "65");
+
+ nodes = 4;
+ parts = 5;
+ backups = 3;
+
+ String testsLog = runAndGetExchangeLog();
+
+ assertFalse(testsLog.contains(LOG_MESSAGE_PREFIX));
+ }
+
+ /**
+ * Starts the specified number of Ignite nodes and captures the log of the partition map exchange during the last node's startup.
+ *
+ * @return Log of latest partition map exchange.
+ * @throws Exception In case of an error.
+ */
+ private String runAndGetExchangeLog() throws Exception {
+ assert nodes > 1;
+
+ IgniteEx ignite = (IgniteEx)startGrids(nodes - 1);
+
+ awaitPartitionMapExchange();
+
+ GridCacheProcessor proc = ignite.context().cache();
+
+ GridCacheContext cctx = proc.context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME));
+
+ final GridStringLogger log = new GridStringLogger(false, this.log);
+
+ GridAffinityAssignmentCache aff = GridTestUtils.getFieldValue(cctx.affinity(), "aff");
+
+ GridTestUtils.setFieldValue(aff, "log", log);
+
+ startGrid(nodes);
+
+ awaitPartitionMapExchange();
+
+ return log.toString();
+ }
+
+ /**
+ * Affinity function for a partitioned cache which provides an even distribution of partitions between the nodes in the cluster.
+ */
+ private static class EvenDistributionAffinityFunction implements AffinityFunction {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Partitions number. */
+ private int parts;
+
+ /**
+ * @param parts Number of partitions for one cache.
+ */
+ private EvenDistributionAffinityFunction(int parts) {
+ this.parts = parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ return key.hashCode() % parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<ClusterNode> nodes = new ArrayList<>(affCtx.currentTopologySnapshot());
+
+ nodes.sort(Comparator.comparing(o -> o.<String>attribute(ATTR_IGNITE_INSTANCE_NAME)));
+
+ List<List<ClusterNode>> res = new ArrayList<>(parts);
+
+ for (int i = 0; i < parts; i++) {
+ Set<ClusterNode> n0 = new LinkedHashSet<>();
+
+ n0.add(nodes.get(i % nodes.size()));
+
+ for (int j = 1; j <= affCtx.backups(); j++)
+ n0.add(nodes.get((i + j) % nodes.size()));
+
+ res.add(new ArrayList<>(n0));
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a3eb1f5d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 7c41e49..945a76c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.GridCacheAffinityBackupsSelfTest;
import org.apache.ignite.IgniteCacheAffinitySelfTest;
import org.apache.ignite.cache.affinity.AffinityClientNodeSelfTest;
+import org.apache.ignite.cache.affinity.AffinityDistributionLoggingTest;
import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest;
import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest;
import org.apache.ignite.internal.GridCachePartitionExchangeManagerHistSizeTest;
@@ -35,13 +36,7 @@ import org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadTh
import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughEvictionsVariationsSuite;
import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest;
-import org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest;
-import org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest;
-import org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest;
import org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscoveryHistoryOverflowTest;
-import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest;
-import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
-import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
@@ -95,6 +90,8 @@ public class IgniteCacheTestSuite5 extends TestSuite {
suite.addTestSuite(LocalAffinityFunctionTest.class);
suite.addTestSuite(AffinityHistoryCleanupTest.class);
+ suite.addTestSuite(AffinityDistributionLoggingTest.class);
+
suite.addTestSuite(IgniteCacheAtomicProtocolTest.class);
suite.addTestSuite(PartitionsExchangeOnDiscoveryHistoryOverflowTest.class);