You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/06/22 08:52:42 UTC
[ignite] branch master updated: IGNITE-10708 Add partition states
system view (#7932)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 32d3baa IGNITE-10708 Add partition states system view (#7932)
32d3baa is described below
commit 32d3baa50d3337087e806d3387521359f5618d18
Author: Aleksey Plekhanov <Pl...@gmail.com>
AuthorDate: Mon Jun 22 13:52:20 2020 +0500
IGNITE-10708 Add partition states system view (#7932)
---
.../internal/jdbc2/JdbcMetadataSelfTest.java | 3 +-
.../ignite/jdbc/thin/JdbcThinMetadataSelfTest.java | 12 +-
.../SystemViewRowAttributeWalkerGenerator.java | 2 +
.../walker/PartitionStateViewWalker.java | 75 ++++++++
.../affinity/GridAffinityAssignmentCache.java | 7 +
.../processors/cache/GridCacheProcessor.java | 90 ++++++++--
.../spi/systemview/view/PartitionStateView.java | 91 ++++++++++
.../cache/metric/SqlViewExporterSpiTest.java | 196 ++++++++++++++++++++-
8 files changed, 455 insertions(+), 21 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
index 0570be5..aa1237f 100755
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
@@ -344,7 +344,8 @@ public class JdbcMetadataSelfTest extends GridCommonAbstractTest {
"CONTINUOUS_QUERIES",
"STRIPED_THREADPOOL_QUEUE",
"DATASTREAM_THREADPOOL_QUEUE",
- "CACHE_GROUP_PAGE_LISTS"
+ "CACHE_GROUP_PAGE_LISTS",
+ "PARTITION_STATES"
));
Set<String> actViews = new HashSet<>();
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
index a2c1eff..4fba9eb 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
@@ -442,7 +442,8 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest {
"SYS.STRIPED_THREADPOOL_QUEUE",
"SYS.DATASTREAM_THREADPOOL_QUEUE",
"SYS.CACHE_GROUP_PAGE_LISTS",
- "SYS.DATA_REGION_PAGE_LISTS"
+ "SYS.DATA_REGION_PAGE_LISTS",
+ "SYS.PARTITION_STATES"
))
);
}
@@ -982,8 +983,13 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest {
"SYS.DATA_REGION_PAGE_LISTS.BUCKET_NUMBER.null.10",
"SYS.DATA_REGION_PAGE_LISTS.BUCKET_SIZE.null.19",
"SYS.DATA_REGION_PAGE_LISTS.STRIPES_COUNT.null.10",
- "SYS.DATA_REGION_PAGE_LISTS.CACHED_PAGES_COUNT.null.10"
- ));
+ "SYS.DATA_REGION_PAGE_LISTS.CACHED_PAGES_COUNT.null.10",
+ "SYS.PARTITION_STATES.CACHE_GROUP_ID.null.10",
+ "SYS.PARTITION_STATES.PARTITION_ID.null.10",
+ "SYS.PARTITION_STATES.NODE_ID.null.2147483647",
+ "SYS.PARTITION_STATES.STATE.null.2147483647",
+ "SYS.PARTITION_STATES.IS_PRIMARY.null.1"
+ ));
Assert.assertEquals(expectedCols, actualSystemCols);
}
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
index 19c66a8..6b2379f 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
@@ -46,6 +46,7 @@ import org.apache.ignite.spi.systemview.view.ClientConnectionView;
import org.apache.ignite.spi.systemview.view.ClusterNodeView;
import org.apache.ignite.spi.systemview.view.ComputeTaskView;
import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.PartitionStateView;
import org.apache.ignite.spi.systemview.view.ScanQueryView;
import org.apache.ignite.spi.systemview.view.ServiceView;
import org.apache.ignite.spi.systemview.view.SqlIndexView;
@@ -104,6 +105,7 @@ public class SystemViewRowAttributeWalkerGenerator {
gen.generateAndWrite(StripedExecutorTaskView.class, DFLT_SRC_DIR);
gen.generateAndWrite(PagesListView.class, DFLT_SRC_DIR);
gen.generateAndWrite(CachePagesListView.class, DFLT_SRC_DIR);
+ gen.generateAndWrite(PartitionStateView.class, DFLT_SRC_DIR);
gen.generateAndWrite(SqlSchemaView.class, INDEXING_SRC_DIR);
gen.generateAndWrite(SqlTableView.class, INDEXING_SRC_DIR);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/PartitionStateViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/PartitionStateViewWalker.java
new file mode 100644
index 0000000..c2a9c5c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/PartitionStateViewWalker.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.systemview.walker;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.systemview.view.PartitionStateView;
+import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
+
+/**
+ * Generated by {@code org.apache.ignite.codegen.SystemViewRowAttributeWalkerGenerator}.
+ * {@link PartitionStateView} attributes walker.
+ *
+ * @see PartitionStateView
+ */
+public class PartitionStateViewWalker implements SystemViewRowAttributeWalker<PartitionStateView> {
+ /** Filter key for attribute "cacheGroupId" */
+ public static final String CACHE_GROUP_ID_FILTER = "cacheGroupId";
+
+ /** Filter key for attribute "nodeId" */
+ public static final String NODE_ID_FILTER = "nodeId";
+
+ /** Filter key for attribute "partitionId" */
+ public static final String PARTITION_ID_FILTER = "partitionId";
+
+ /** List of filtrable attributes. */
+ private static final List<String> FILTRABLE_ATTRS = Collections.unmodifiableList(F.asList(
+ "cacheGroupId", "nodeId", "partitionId"
+ ));
+
+ /** {@inheritDoc} */
+ @Override public List<String> filtrableAttributes() {
+ return FILTRABLE_ATTRS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitAll(AttributeVisitor v) {
+ v.accept(0, "cacheGroupId", int.class);
+ v.accept(1, "nodeId", UUID.class);
+ v.accept(2, "partitionId", int.class);
+ v.accept(3, "state", String.class);
+ v.accept(4, "isPrimary", boolean.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void visitAll(PartitionStateView row, AttributeWithValueVisitor v) {
+ v.acceptInt(0, "cacheGroupId", row.cacheGroupId());
+ v.accept(1, "nodeId", UUID.class, row.nodeId());
+ v.acceptInt(2, "partitionId", row.partitionId());
+ v.accept(3, "state", String.class, row.state());
+ v.acceptBoolean(4, "isPrimary", row.isPrimary());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int count() {
+ return 5;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 1c8db77..533f586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -597,6 +597,13 @@ public class GridAffinityAssignmentCache {
}
/**
+ * @return Last initialized affinity assignment.
+ */
+ public AffinityAssignment lastReadyAffinity() {
+ return head.get();
+ }
+
+ /**
* @param topVer Topology version.
* @return Affinity assignment.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d994988..aff0dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.systemview.walker.CachePagesListViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.PartitionStateViewWalker;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -181,6 +182,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.systemview.view.CachePagesListView;
+import org.apache.ignite.spi.systemview.view.PartitionStateView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -224,6 +226,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** System view description for page lists. */
public static final String CACHE_GRP_PAGE_LIST_VIEW_DESC = "Cache group page lists";
+ /** System view name for partition states. */
+ public static final String PART_STATES_VIEW = "partitionStates";
+
+ /** System view description for partition states. */
+ public static final String PART_STATES_VIEW_DESC = "Distribution of cache group partitions across cluster nodes";
+
/** Enables start caches in parallel. */
private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
@@ -595,6 +603,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
this::pagesListViewSupplier,
Function.identity()
);
+
+ ctx.systemView().registerFiltrableView(
+ PART_STATES_VIEW,
+ PART_STATES_VIEW_DESC,
+ new PartitionStateViewWalker(),
+ this::partStatesViewSupplier,
+ Function.identity()
+ );
}
/**
@@ -5263,25 +5279,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
private Iterable<CachePagesListView> pagesListViewSupplier(Map<String, Object> filter) {
Integer cacheGrpId = (Integer)filter.get(CachePagesListViewWalker.CACHE_GROUP_ID_FILTER);
-
- Collection<CacheGroupContext> cacheGrps;
-
- if (cacheGrpId != null) {
- CacheGroupContext cacheGrp = this.cacheGrps.get(cacheGrpId);
-
- if (cacheGrp == null)
- return Collections.emptyList();
-
- cacheGrps = Collections.singletonList(cacheGrp);
- }
- else
- cacheGrps = this.cacheGrps.values();
-
Integer partId = (Integer)filter.get(CachePagesListViewWalker.PARTITION_ID_FILTER);
Integer bucketNum = (Integer)filter.get(CachePagesListViewWalker.BUCKET_NUMBER_FILTER);
- Iterable<IgniteCacheOffheapManager.CacheDataStore> dataStores =
- F.flat(F.iterator(cacheGrps, grp -> grp.offheap().cacheDataStores(), true));
+ Iterable<IgniteCacheOffheapManager.CacheDataStore> dataStores = F.flat(F.iterator(
+ filteredMap(cacheGrps, cacheGrpId).values(), grp -> grp.offheap().cacheDataStores(), true));
return F.flat(F.iterator(dataStores, dataStore -> {
RowStore rowStore = dataStore.rowStore();
@@ -5304,6 +5306,62 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Partition states view supplier: lazily yields one row per (cache group, node, partition) entry.
+ *
+ * @param filter Optional cache group id, node id and partition id values; a missing value disables that filter.
+ */
+ private Iterable<PartitionStateView> partStatesViewSupplier(Map<String, Object> filter) {
+ Integer cacheGrpId = (Integer)filter.get(PartitionStateViewWalker.CACHE_GROUP_ID_FILTER);
+ UUID nodeId = (UUID)filter.get(PartitionStateViewWalker.NODE_ID_FILTER);
+ Integer partId = (Integer)filter.get(PartitionStateViewWalker.PARTITION_ID_FILTER);
+
+ return () -> F.concat(F.concat(F.iterator(filteredMap(cacheGrps, cacheGrpId).values(),
+ grp -> F.iterator(filteredMap(grp.topology().partitionMap(false), nodeId).entrySet(),
+ nodeToParts -> F.iterator(filteredMap(nodeToParts.getValue().map(),
+ partId == null || partId < 0 ? null : partId).entrySet(), // Negative id: passed as null, no key lookup.
+ partToStates -> new PartitionStateView(
+ grp.groupId(),
+ nodeToParts.getKey(),
+ partToStates.getKey(),
+ partToStates.getValue(),
+ isPrimary(grp, nodeToParts.getKey(), partToStates.getKey())),
+ true),
+ true),
+ true)));
+ }
+
+ /**
+ * Filter map by key: a {@code null} key returns the map itself, an unmapped key returns an empty map.
+ *
+ * @param map Map.
+ * @param key Filtering key, {@code null} to skip filtering.
+ */
+ private static <K, V> Map<K, V> filteredMap(Map<K, V> map, K key) {
+ if (key == null)
+ return map;
+
+ V val = map.get(key);
+
+ return val != null ? F.asMap(key, val) : Collections.emptyMap();
+ }
+
+ /**
+ * @param grp Cache group whose last ready affinity assignment is consulted.
+ * @param nodeId Node id to compare with the first node assigned to the partition.
+ * @param part Partition id; {@code false} is returned when no nodes are assigned to it.
+ */
+ private static boolean isPrimary(CacheGroupContext grp, UUID nodeId, int part) {
+ List<ClusterNode> nodes = grp.affinity().lastReadyAffinity().get(part);
+
+ if (F.isEmpty(nodes))
+ return false;
+
+ ClusterNode primaryNode = nodes.get(0);
+
+ return primaryNode != null && nodeId.equals(primaryNode.id());
+ }
+
+ /**
* Recovery lifecycle for caches.
*/
private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener {
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/PartitionStateView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/PartitionStateView.java
new file mode 100644
index 0000000..ac5726f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/PartitionStateView.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.systemview.view;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.systemview.walker.Filtrable;
+import org.apache.ignite.internal.managers.systemview.walker.Order;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+
+/**
+ * Partition state representation for a {@link SystemView}: one row is one partition of a cache group on one node.
+ */
+public class PartitionStateView {
+ /** Cache group id. */
+ private final int cacheGrpId;
+
+ /** Node id. */
+ private final UUID nodeId;
+
+ /** Partition id. */
+ private final int partId;
+
+ /** Partition state. */
+ private final GridDhtPartitionState state;
+
+ /** Whether the node is primary for the partition. */
+ private final boolean primary;
+
+ /**
+ * @param cacheGrpId Cache group id.
+ * @param nodeId Node id.
+ * @param partId Partition id.
+ * @param state Partition state.
+ * @param primary Is primary partition for node.
+ */
+ public PartitionStateView(int cacheGrpId, UUID nodeId, int partId, GridDhtPartitionState state, boolean primary) {
+ this.cacheGrpId = cacheGrpId;
+ this.nodeId = nodeId;
+ this.partId = partId;
+ this.state = state;
+ this.primary = primary;
+ }
+
+ /** @return Cache group id. */
+ @Order
+ @Filtrable
+ public int cacheGroupId() {
+ return cacheGrpId;
+ }
+
+ /** @return Node id. */
+ @Order(1)
+ @Filtrable
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** @return Partition id. */
+ @Order(2)
+ @Filtrable
+ public int partitionId() {
+ return partId;
+ }
+
+ /** @return Partition state name (the enum constant name of the state). */
+ @Order(3)
+ public String state() {
+ return state.name();
+ }
+
+ /** @return {@code true} if the node is primary for the partition. */
+ @Order(4)
+ public boolean isPrimary() {
+ return primary;
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
index f95e063..75025cf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.processors.cache.metric;
import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -35,12 +38,16 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteJdbcThinDriver;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -52,10 +59,14 @@ import org.apache.ignite.internal.metric.AbstractExporterSpiTest;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestPredicate;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestRunnable;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestTransformer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.service.DummyService;
import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.spi.metric.sql.SqlViewMetricExporterSpi;
import org.apache.ignite.spi.systemview.view.SqlSchemaView;
@@ -119,6 +130,7 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
ignite0 = startGrid(0);
ignite1 = startGrid(1);
+ ignite0.cluster().baselineAutoAdjustEnabled(false);
ignite0.cluster().active(true);
}
@@ -427,7 +439,8 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
"STRIPED_THREADPOOL_QUEUE",
"DATASTREAM_THREADPOOL_QUEUE",
"DATA_REGION_PAGE_LISTS",
- "CACHE_GROUP_PAGE_LISTS"
+ "CACHE_GROUP_PAGE_LISTS",
+ "PARTITION_STATES"
));
Set<String> actViews = new HashSet<>();
@@ -922,6 +935,131 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
"BUCKET_SIZE > 0").isEmpty());
}
+ /** Checks partition states, primary flags, joins and filter edge cases of the PARTITION_STATES view. */
+ @Test
+ public void testPartitionStates() throws Exception {
+ String nodeName0 = getTestIgniteInstanceName(0);
+ String nodeName1 = getTestIgniteInstanceName(1);
+ String nodeName2 = getTestIgniteInstanceName(2);
+
+ IgniteCache<Integer, Integer> cache1 = ignite0.createCache(new CacheConfiguration<Integer, Integer>()
+ .setName("cache1")
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setAffinity(new TestAffinityFunction(new String[][] {{nodeName0, nodeName1}, {nodeName1, nodeName2},
+ {nodeName2, nodeName0}})));
+
+ IgniteCache<Integer, Integer> cache2 = ignite0.createCache(new CacheConfiguration<Integer, Integer>()
+ .setName("cache2")
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setAffinity(new TestAffinityFunction(new String[][] {{nodeName0, nodeName1, nodeName2}, {nodeName1}})));
+
+ for (int i = 0; i < 100; i++) {
+ cache1.put(i, i);
+ cache2.put(i, i);
+ }
+
+ try (IgniteEx ignite2 = startGrid(nodeName2)) {
+ ignite2.rebalanceEnabled(false); // Partitions assigned to the new node are expected to be MOVING below.
+
+ ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
+
+ String partStateSql = "SELECT STATE FROM SYS.PARTITION_STATES WHERE CACHE_GROUP_ID = ? AND NODE_ID = ? " +
+ "AND PARTITION_ID = ?";
+
+ UUID nodeId0 = ignite0.cluster().localNode().id();
+ UUID nodeId1 = ignite1.cluster().localNode().id();
+ UUID nodeId2 = ignite2.cluster().localNode().id();
+
+ Integer cacheGrpId1 = ignite0.cachex("cache1").context().groupId();
+ Integer cacheGrpId2 = ignite0.cachex("cache2").context().groupId();
+
+ String owningState = GridDhtPartitionState.OWNING.name();
+ String movingState = GridDhtPartitionState.MOVING.name();
+
+ for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+ // Check partitions for cache1.
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId0, 0).get(0).get(0));
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId1, 0).get(0).get(0));
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId1, 1).get(0).get(0));
+ assertEquals(movingState, execute(ignite, partStateSql, cacheGrpId1, nodeId2, 1).get(0).get(0));
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId0, 2).get(0).get(0));
+ assertEquals(movingState, execute(ignite, partStateSql, cacheGrpId1, nodeId2, 2).get(0).get(0));
+
+ // Check partitions for cache2.
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId2, nodeId0, 0).get(0).get(0));
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId2, nodeId1, 0).get(0).get(0));
+ assertEquals(movingState, execute(ignite, partStateSql, cacheGrpId2, nodeId2, 0).get(0).get(0));
+ assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId2, nodeId1, 1).get(0).get(0));
+ }
+
+ // Check primary flag.
+ String partPrimarySql = "SELECT IS_PRIMARY FROM SYS.PARTITION_STATES WHERE CACHE_GROUP_ID = ? " +
+ "AND NODE_ID = ? AND PARTITION_ID = ?";
+
+ for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+ // Check partitions for cache1.
+ assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId0, 0).get(0).get(0));
+ assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId1, 0).get(0).get(0));
+ assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId1, 1).get(0).get(0));
+ assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId2, 1).get(0).get(0));
+ assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId0, 2).get(0).get(0));
+ assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId2, 2).get(0).get(0));
+
+ // Check partitions for cache2.
+ assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId2, nodeId0, 0).get(0).get(0));
+ assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId2, nodeId1, 0).get(0).get(0));
+ assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId2, nodeId2, 0).get(0).get(0));
+ assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId2, nodeId1, 1).get(0).get(0));
+ }
+
+ // Check joins with cache groups and nodes views.
+ assertEquals(owningState, execute(ignite0, "SELECT p.STATE " +
+ "FROM SYS.PARTITION_STATES p " +
+ "JOIN SYS.CACHE_GROUPS g ON p.CACHE_GROUP_ID = g.CACHE_GROUP_ID " +
+ "JOIN SYS.NODES n ON p.NODE_ID = n.NODE_ID " +
+ "WHERE g.CACHE_GROUP_NAME = 'cache2' AND n.CONSISTENT_ID = ? AND p.PARTITION_ID = 1", nodeName1)
+ .get(0).get(0));
+
+ // Check malformed or invalid values for indexed columns.
+ assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE PARTITION_ID = ?",
+ Integer.MAX_VALUE).size());
+ assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE PARTITION_ID = -1")
+ .size());
+ assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE NODE_ID = '123'")
+ .size());
+ assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE NODE_ID = ?",
+ UUID.randomUUID()).size());
+ assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE CACHE_GROUP_ID = 0")
+ .size());
+
+ AffinityTopologyVersion topVer = ignite0.context().discovery().topologyVersionEx();
+
+ ignite2.rebalanceEnabled(true);
+
+ // Wait until rebalance complete.
+ assertTrue(GridTestUtils.waitForCondition(() -> ignite0.context().discovery().topologyVersionEx()
+ .compareTo(topVer) > 0, 5_000L));
+
+ // Check that all partitions are in OWNING state now (6 partition copies for cache1 + 4 for cache2).
+ String cntByStateSql = "SELECT COUNT(*) FROM SYS.PARTITION_STATES " +
+ "WHERE CACHE_GROUP_ID IN (?, ?) AND STATE = ?";
+
+ for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+ assertEquals(10L, execute(ignite, cntByStateSql, cacheGrpId1, cacheGrpId2, owningState).get(0).get(0));
+ assertEquals(0L, execute(ignite, cntByStateSql, cacheGrpId1, cacheGrpId2, movingState).get(0).get(0));
+ }
+
+ // Check that assignment is now changed to ideal.
+ for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+ assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId0, 2).get(0).get(0));
+ assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId2, 2).get(0).get(0));
+ }
+ }
+ finally {
+ ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
+ }
+ }
+
/**
* Execute query on given node.
*
@@ -935,4 +1073,60 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
return queryProcessor(node).querySqlFields(qry, true).getAll();
}
+
+ /**
+ * Affinity function with fixed partition allocation: each partition maps to a predefined list of nodes.
+ */
+ private static class TestAffinityFunction implements AffinityFunction {
+ /** Partitions to nodes map. */
+ private final String[][] partMap;
+
+ /**
+ * @param partMap Partition allocation map, contains nodes consistent ids for each partition.
+ */
+ private TestAffinityFunction(String[][] partMap) {
+ this.partMap = partMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ return partMap.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ return Math.floorMod(key.hashCode(), partitions()); // Never negative, unlike '%' on a negative hash code.
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<List<ClusterNode>> parts = new ArrayList<>(partMap.length);
+
+ for (String[] nodes : partMap) {
+ List<ClusterNode> nodesList = new ArrayList<>();
+
+ for (String nodeConsistentId: nodes) {
+ ClusterNode affNode = F.find(affCtx.currentTopologySnapshot(), null,
+ (IgnitePredicate<ClusterNode>)node -> node.consistentId().equals(nodeConsistentId));
+
+ if (affNode != null) // Nodes missing from the current topology snapshot are skipped.
+ nodesList.add(affNode);
+ }
+
+ parts.add(nodesList);
+ }
+
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ // No-op.
+ }
+ }
}