You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/06/22 08:52:42 UTC

[ignite] branch master updated: IGNITE-10708 Add partition states system view (#7932)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 32d3baa  IGNITE-10708 Add partition states system view (#7932)
32d3baa is described below

commit 32d3baa50d3337087e806d3387521359f5618d18
Author: Aleksey Plekhanov <Pl...@gmail.com>
AuthorDate: Mon Jun 22 13:52:20 2020 +0500

    IGNITE-10708 Add partition states system view (#7932)
---
 .../internal/jdbc2/JdbcMetadataSelfTest.java       |   3 +-
 .../ignite/jdbc/thin/JdbcThinMetadataSelfTest.java |  12 +-
 .../SystemViewRowAttributeWalkerGenerator.java     |   2 +
 .../walker/PartitionStateViewWalker.java           |  75 ++++++++
 .../affinity/GridAffinityAssignmentCache.java      |   7 +
 .../processors/cache/GridCacheProcessor.java       |  90 ++++++++--
 .../spi/systemview/view/PartitionStateView.java    |  91 ++++++++++
 .../cache/metric/SqlViewExporterSpiTest.java       | 196 ++++++++++++++++++++-
 8 files changed, 455 insertions(+), 21 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
index 0570be5..aa1237f 100755
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
@@ -344,7 +344,8 @@ public class JdbcMetadataSelfTest extends GridCommonAbstractTest {
             "CONTINUOUS_QUERIES",
             "STRIPED_THREADPOOL_QUEUE",
             "DATASTREAM_THREADPOOL_QUEUE",
-            "CACHE_GROUP_PAGE_LISTS"
+            "CACHE_GROUP_PAGE_LISTS",
+            "PARTITION_STATES"
         ));
 
         Set<String> actViews = new HashSet<>();
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
index a2c1eff..4fba9eb 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
@@ -442,7 +442,8 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest {
                 "SYS.STRIPED_THREADPOOL_QUEUE",
                 "SYS.DATASTREAM_THREADPOOL_QUEUE",
                 "SYS.CACHE_GROUP_PAGE_LISTS",
-                "SYS.DATA_REGION_PAGE_LISTS"
+                "SYS.DATA_REGION_PAGE_LISTS",
+                "SYS.PARTITION_STATES"
             ))
         );
     }
@@ -982,8 +983,13 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest {
                 "SYS.DATA_REGION_PAGE_LISTS.BUCKET_NUMBER.null.10",
                 "SYS.DATA_REGION_PAGE_LISTS.BUCKET_SIZE.null.19",
                 "SYS.DATA_REGION_PAGE_LISTS.STRIPES_COUNT.null.10",
-                "SYS.DATA_REGION_PAGE_LISTS.CACHED_PAGES_COUNT.null.10"
-            ));
+                "SYS.DATA_REGION_PAGE_LISTS.CACHED_PAGES_COUNT.null.10",
+                "SYS.PARTITION_STATES.CACHE_GROUP_ID.null.10",
+                "SYS.PARTITION_STATES.PARTITION_ID.null.10",
+                "SYS.PARTITION_STATES.NODE_ID.null.2147483647",
+                "SYS.PARTITION_STATES.STATE.null.2147483647",
+                "SYS.PARTITION_STATES.IS_PRIMARY.null.1"
+                ));
 
             Assert.assertEquals(expectedCols, actualSystemCols);
         }
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
index 19c66a8..6b2379f 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
@@ -46,6 +46,7 @@ import org.apache.ignite.spi.systemview.view.ClientConnectionView;
 import org.apache.ignite.spi.systemview.view.ClusterNodeView;
 import org.apache.ignite.spi.systemview.view.ComputeTaskView;
 import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
+import org.apache.ignite.spi.systemview.view.PartitionStateView;
 import org.apache.ignite.spi.systemview.view.ScanQueryView;
 import org.apache.ignite.spi.systemview.view.ServiceView;
 import org.apache.ignite.spi.systemview.view.SqlIndexView;
@@ -104,6 +105,7 @@ public class SystemViewRowAttributeWalkerGenerator {
         gen.generateAndWrite(StripedExecutorTaskView.class, DFLT_SRC_DIR);
         gen.generateAndWrite(PagesListView.class, DFLT_SRC_DIR);
         gen.generateAndWrite(CachePagesListView.class, DFLT_SRC_DIR);
+        gen.generateAndWrite(PartitionStateView.class, DFLT_SRC_DIR);
 
         gen.generateAndWrite(SqlSchemaView.class, INDEXING_SRC_DIR);
         gen.generateAndWrite(SqlTableView.class, INDEXING_SRC_DIR);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/PartitionStateViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/PartitionStateViewWalker.java
new file mode 100644
index 0000000..c2a9c5c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/PartitionStateViewWalker.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.systemview.walker;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.systemview.view.PartitionStateView;
+import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
+
+/**
+ * Generated by {@code org.apache.ignite.codegen.SystemViewRowAttributeWalkerGenerator}.
+ * {@link PartitionStateView} attributes walker.
+ * <p>
+ * Reports five attributes, always with the same indexes: {@code cacheGroupId} (0), {@code nodeId} (1),
+ * {@code partitionId} (2), {@code state} (3) and {@code isPrimary} (4). The first three are also
+ * listed as filtrable attributes.
+ * <p>
+ * NOTE(review): since this class is generator output, changes presumably belong in {@link PartitionStateView}
+ * followed by a regeneration rather than in a manual edit here - confirm.
+ *
+ * @see PartitionStateView
+ */
+public class PartitionStateViewWalker implements SystemViewRowAttributeWalker<PartitionStateView> {
+    /** Filter key for attribute "cacheGroupId". */
+    public static final String CACHE_GROUP_ID_FILTER = "cacheGroupId";
+
+    /** Filter key for attribute "nodeId". */
+    public static final String NODE_ID_FILTER = "nodeId";
+
+    /** Filter key for attribute "partitionId". */
+    public static final String PARTITION_ID_FILTER = "partitionId";
+
+    /** Unmodifiable list of filtrable attribute names; holds the same strings as the filter key constants. */
+    private static final List<String> FILTRABLE_ATTRS = Collections.unmodifiableList(F.asList(
+        "cacheGroupId", "nodeId", "partitionId"
+    ));
+
+    /** {@inheritDoc} */
+    @Override public List<String> filtrableAttributes() {
+        return FILTRABLE_ATTRS;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Passes the index, name and type of each attribute to the visitor; no row values are involved.
+     */
+    @Override public void visitAll(AttributeVisitor v) {
+        v.accept(0, "cacheGroupId", int.class);
+        v.accept(1, "nodeId", UUID.class);
+        v.accept(2, "partitionId", int.class);
+        v.accept(3, "state", String.class);
+        v.accept(4, "isPrimary", boolean.class);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Passes each attribute together with its value read from {@code row}. The {@code int} and {@code boolean}
+     * attributes go through the dedicated {@code acceptInt}/{@code acceptBoolean} callbacks.
+     */
+    @Override public void visitAll(PartitionStateView row, AttributeWithValueVisitor v) {
+        v.acceptInt(0, "cacheGroupId", row.cacheGroupId());
+        v.accept(1, "nodeId", UUID.class, row.nodeId());
+        v.acceptInt(2, "partitionId", row.partitionId());
+        v.accept(3, "state", String.class, row.state());
+        v.acceptBoolean(4, "isPrimary", row.isPrimary());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Equals the number of attributes reported by each {@code visitAll} overload.
+     */
+    @Override public int count() {
+        return 5;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 1c8db77..533f586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -597,6 +597,13 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * Returns the assignment that {@code head} currently refers to; no topology version is taken into account.
+     *
+     * @return Last initialized affinity assignment.
+     */
+    public AffinityAssignment lastReadyAffinity() {
+        return head.get();
+    }
+
+    /**
      * @param topVer Topology version.
      * @return Affinity assignment.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d994988..aff0dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.systemview.walker.CachePagesListViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.PartitionStateViewWalker;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -181,6 +182,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.spi.systemview.view.CachePagesListView;
+import org.apache.ignite.spi.systemview.view.PartitionStateView;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -224,6 +226,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** System view description for page lists. */
     public static final String CACHE_GRP_PAGE_LIST_VIEW_DESC = "Cache group page lists";
 
+    /** System view name for partition states. */
+    public static final String PART_STATES_VIEW = "partitionStates";
+
+    /** System view description for partition states. */
+    public static final String PART_STATES_VIEW_DESC = "Distribution of cache group partitions across cluster nodes";
+
     /** Enables start caches in parallel. */
     private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
@@ -595,6 +603,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             this::pagesListViewSupplier,
             Function.identity()
         );
+
+        ctx.systemView().registerFiltrableView(
+            PART_STATES_VIEW,
+            PART_STATES_VIEW_DESC,
+            new PartitionStateViewWalker(),
+            this::partStatesViewSupplier,
+            Function.identity()
+        );
     }
 
     /**
@@ -5263,25 +5279,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     private Iterable<CachePagesListView> pagesListViewSupplier(Map<String, Object> filter) {
         Integer cacheGrpId = (Integer)filter.get(CachePagesListViewWalker.CACHE_GROUP_ID_FILTER);
-
-        Collection<CacheGroupContext> cacheGrps;
-
-        if (cacheGrpId != null) {
-            CacheGroupContext cacheGrp = this.cacheGrps.get(cacheGrpId);
-
-            if (cacheGrp == null)
-                return Collections.emptyList();
-
-            cacheGrps = Collections.singletonList(cacheGrp);
-        }
-        else
-            cacheGrps = this.cacheGrps.values();
-
         Integer partId = (Integer)filter.get(CachePagesListViewWalker.PARTITION_ID_FILTER);
         Integer bucketNum = (Integer)filter.get(CachePagesListViewWalker.BUCKET_NUMBER_FILTER);
 
-        Iterable<IgniteCacheOffheapManager.CacheDataStore> dataStores =
-            F.flat(F.iterator(cacheGrps, grp -> grp.offheap().cacheDataStores(), true));
+        Iterable<IgniteCacheOffheapManager.CacheDataStore> dataStores = F.flat(F.iterator(
+            filteredMap(cacheGrps, cacheGrpId).values(), grp -> grp.offheap().cacheDataStores(), true));
 
         return F.flat(F.iterator(dataStores, dataStore -> {
             RowStore rowStore = dataStore.rowStore();
@@ -5304,6 +5306,62 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Partition states view supplier.
+     * <p>
+     * Rows are produced by walking three nested maps: the cache groups, then each group's
+     * {@code topology().partitionMap(false)} keyed by node id, then each node's partition-to-state map.
+     * Every level is narrowed with {@code filteredMap} when the matching filter value is present, and
+     * one {@link PartitionStateView} is created per remaining (group, node, partition) entry.
+     * <p>
+     * A {@code null} or negative partition id is passed on as "no filter", so the partition level is not narrowed
+     * in that case. NOTE(review): presumably the per-node partition map cannot be looked up by a negative
+     * key - confirm.
+     * <p>
+     * The result is a lambda-backed {@link Iterable}: the maps are read each time an iterator is requested,
+     * not when this method is called.
+     *
+     * @param filter Filter. Values are looked up by the {@code CACHE_GROUP_ID_FILTER}, {@code NODE_ID_FILTER} and
+     *      {@code PARTITION_ID_FILTER} keys of {@link PartitionStateViewWalker} and cast to {@link Integer},
+     *      {@link UUID} and {@link Integer} respectively; a missing value means no filtering on that attribute.
+     * @return Lazily evaluated partition state rows.
+     */
+    private Iterable<PartitionStateView> partStatesViewSupplier(Map<String, Object> filter) {
+        Integer cacheGrpId = (Integer)filter.get(PartitionStateViewWalker.CACHE_GROUP_ID_FILTER);
+        UUID nodeId = (UUID)filter.get(PartitionStateViewWalker.NODE_ID_FILTER);
+        Integer partId = (Integer)filter.get(PartitionStateViewWalker.PARTITION_ID_FILTER);
+
+        return () -> F.concat(F.concat(F.iterator(filteredMap(cacheGrps, cacheGrpId).values(),
+            grp -> F.iterator(filteredMap(grp.topology().partitionMap(false), nodeId).entrySet(),
+                nodeToParts -> F.iterator(filteredMap(nodeToParts.getValue().map(),
+                    partId == null || partId < 0 ? null : partId).entrySet(),
+                    partToStates -> new PartitionStateView(
+                        grp.groupId(),
+                        nodeToParts.getKey(),
+                        partToStates.getKey(),
+                        partToStates.getValue(),
+                        isPrimary(grp, nodeToParts.getKey(), partToStates.getKey())),
+                    true),
+                true),
+            true)));
+    }
+
+    /**
+     * Narrows a map to at most one entry selected by key.
+     *
+     * @param map Map, may be {@code null}.
+     * @param key Filtering key, {@code null} means "no filtering".
+     * @return Empty map if {@code map} is {@code null}; {@code map} itself if {@code key} is {@code null};
+     *      otherwise a single-entry map for {@code key}, or an empty map if {@code key} has no non-null value.
+     */
+    private static <K, V> Map<K, V> filteredMap(Map<K, V> map, K key) {
+        // A missing source map means there is nothing to show. Without this guard a null map fails with NPE
+        // either below (non-null key) or in the caller, which dereferences the result right away (null key).
+        // NOTE(review): confirm which suppliers (e.g. a topology's partition map) can actually yield null.
+        if (map == null)
+            return Collections.emptyMap();
+
+        if (key == null)
+            return map;
+
+        V val = map.get(key);
+
+        return val != null ? F.asMap(key, val) : Collections.emptyMap();
+    }
+
+    /**
+     * Tells whether the given node is the primary for a partition according to the group's last ready affinity.
+     *
+     * @param grp Cache group.
+     * @param nodeId Node id.
+     * @param part Partition.
+     * @return {@code true} only if the first node assigned to the partition is present and has the given id.
+     */
+    private static boolean isPrimary(CacheGroupContext grp, UUID nodeId, int part) {
+        List<ClusterNode> owners = grp.affinity().lastReadyAffinity().get(part);
+
+        // The head of the assignment list is treated as the primary; no assigned nodes means no primary.
+        ClusterNode first = F.isEmpty(owners) ? null : owners.get(0);
+
+        return first != null && nodeId.equals(first.id());
+    }
+
+    /**
      * Recovery lifecycle for caches.
      */
     private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener {
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/PartitionStateView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/PartitionStateView.java
new file mode 100644
index 0000000..ac5726f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/PartitionStateView.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.systemview.view;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.systemview.walker.Filtrable;
+import org.apache.ignite.internal.managers.systemview.walker.Order;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+
+/**
+ * Partition state representation for a {@link SystemView}.
+ * <p>
+ * Immutable row describing one partition of one cache group on one node: all fields are final and are
+ * only set by the constructor.
+ * <p>
+ * NOTE(review): the {@link Order} values on the accessors appear to define the attribute order used by the
+ * generated walker, and {@link Filtrable} appears to mark attributes usable as filter keys - confirm against
+ * the walker generator.
+ */
+public class PartitionStateView {
+    /** Cache group id. */
+    private final int cacheGrpId;
+
+    /** Node id. */
+    private final UUID nodeId;
+
+    /** Partition id. */
+    private final int partId;
+
+    /** Partition state. */
+    private final GridDhtPartitionState state;
+
+    /** Is primary partition. */
+    private final boolean primary;
+
+    /**
+     * @param cacheGrpId Cache group id.
+     * @param nodeId Node id.
+     * @param partId Partition id.
+     * @param state Partition state. Must not be {@code null}, because {@link #state()} dereferences it.
+     * @param primary Is primary partition for node.
+     */
+    public PartitionStateView(int cacheGrpId, UUID nodeId, int partId, GridDhtPartitionState state, boolean primary) {
+        this.cacheGrpId = cacheGrpId;
+        this.nodeId = nodeId;
+        this.partId = partId;
+        this.state = state;
+        this.primary = primary;
+    }
+
+    /** @return Cache group id. */
+    @Order
+    @Filtrable
+    public int cacheGroupId() {
+        return cacheGrpId;
+    }
+
+    /** @return Node id. */
+    @Order(1)
+    @Filtrable
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** @return Partition id. */
+    @Order(2)
+    @Filtrable
+    public int partitionId() {
+        return partId;
+    }
+
+    /** @return Partition state, as the name of the {@link GridDhtPartitionState} constant. */
+    @Order(3)
+    public String state() {
+        return state.name();
+    }
+
+    /** @return Is primary partition for the node, as passed to the constructor. */
+    @Order(4)
+    public boolean isPrimary() {
+        return primary;
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
index f95e063..75025cf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
@@ -18,11 +18,14 @@
 package org.apache.ignite.internal.processors.cache.metric;
 
 import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -35,12 +38,16 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteJdbcThinDriver;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -52,10 +59,14 @@ import org.apache.ignite.internal.metric.AbstractExporterSpiTest;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestPredicate;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestRunnable;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestTransformer;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.service.DummyService;
 import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.spi.metric.sql.SqlViewMetricExporterSpi;
 import org.apache.ignite.spi.systemview.view.SqlSchemaView;
@@ -119,6 +130,7 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
         ignite0 = startGrid(0);
         ignite1 = startGrid(1);
 
+        ignite0.cluster().baselineAutoAdjustEnabled(false);
         ignite0.cluster().active(true);
     }
 
@@ -427,7 +439,8 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
             "STRIPED_THREADPOOL_QUEUE",
             "DATASTREAM_THREADPOOL_QUEUE",
             "DATA_REGION_PAGE_LISTS",
-            "CACHE_GROUP_PAGE_LISTS"
+            "CACHE_GROUP_PAGE_LISTS",
+            "PARTITION_STATES"
         ));
 
         Set<String> actViews = new HashSet<>();
@@ -922,6 +935,131 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
             "BUCKET_SIZE > 0").isEmpty());
     }
 
+    /**
+     * Checks the {@code SYS.PARTITION_STATES} view on every node. Covers partition states and primary flags
+     * while rebalance is disabled on a newly started third node, joins with the cache groups and nodes views,
+     * filtering by malformed or invalid values, and the states and primary flags after rebalance is enabled.
+     * <p>
+     * Partition placement is fixed by {@code TestAffinityFunction}: cache1 has three partitions with two nodes
+     * each, cache2 has one partition on all three nodes and one partition on a single node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPartitionStates() throws Exception {
+        String nodeName0 = getTestIgniteInstanceName(0);
+        String nodeName1 = getTestIgniteInstanceName(1);
+        String nodeName2 = getTestIgniteInstanceName(2);
+
+        IgniteCache<Integer, Integer> cache1 = ignite0.createCache(new CacheConfiguration<Integer, Integer>()
+            .setName("cache1")
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAffinity(new TestAffinityFunction(new String[][] {{nodeName0, nodeName1}, {nodeName1, nodeName2},
+                {nodeName2, nodeName0}})));
+
+        IgniteCache<Integer, Integer> cache2 = ignite0.createCache(new CacheConfiguration<Integer, Integer>()
+            .setName("cache2")
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAffinity(new TestAffinityFunction(new String[][] {{nodeName0, nodeName1, nodeName2}, {nodeName1}})));
+
+        for (int i = 0; i < 100; i++) {
+            cache1.put(i, i);
+            cache2.put(i, i);
+        }
+
+        try (IgniteEx ignite2 = startGrid(nodeName2)) {
+            ignite2.rebalanceEnabled(false);
+
+            ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
+
+            String partStateSql = "SELECT STATE FROM SYS.PARTITION_STATES WHERE CACHE_GROUP_ID = ? AND NODE_ID = ? " +
+                "AND PARTITION_ID = ?";
+
+            UUID nodeId0 = ignite0.cluster().localNode().id();
+            UUID nodeId1 = ignite1.cluster().localNode().id();
+            UUID nodeId2 = ignite2.cluster().localNode().id();
+
+            Integer cacheGrpId1 = ignite0.cachex("cache1").context().groupId();
+            Integer cacheGrpId2 = ignite0.cachex("cache2").context().groupId();
+
+            String owningState = GridDhtPartitionState.OWNING.name();
+            String movingState = GridDhtPartitionState.MOVING.name();
+
+            for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+                // Check partitions for cache1 (partitions on the new node are expected to be MOVING).
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId0, 0).get(0).get(0));
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId1, 0).get(0).get(0));
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId1, 1).get(0).get(0));
+                assertEquals(movingState, execute(ignite, partStateSql, cacheGrpId1, nodeId2, 1).get(0).get(0));
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId1, nodeId0, 2).get(0).get(0));
+                assertEquals(movingState, execute(ignite, partStateSql, cacheGrpId1, nodeId2, 2).get(0).get(0));
+
+                // Check partitions for cache2.
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId2, nodeId0, 0).get(0).get(0));
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId2, nodeId1, 0).get(0).get(0));
+                assertEquals(movingState, execute(ignite, partStateSql, cacheGrpId2, nodeId2, 0).get(0).get(0));
+                assertEquals(owningState, execute(ignite, partStateSql, cacheGrpId2, nodeId1, 1).get(0).get(0));
+            }
+
+            // Check primary flag.
+            String partPrimarySql = "SELECT IS_PRIMARY FROM SYS.PARTITION_STATES WHERE CACHE_GROUP_ID = ? " +
+                "AND NODE_ID = ? AND PARTITION_ID = ?";
+
+            for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+                // Check partitions for cache1.
+                assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId0, 0).get(0).get(0));
+                assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId1, 0).get(0).get(0));
+                assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId1, 1).get(0).get(0));
+                assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId2, 1).get(0).get(0));
+                assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId0, 2).get(0).get(0));
+                assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId2, 2).get(0).get(0));
+
+                // Check partitions for cache2.
+                assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId2, nodeId0, 0).get(0).get(0));
+                assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId2, nodeId1, 0).get(0).get(0));
+                assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId2, nodeId2, 0).get(0).get(0));
+                assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId2, nodeId1, 1).get(0).get(0));
+            }
+
+            // Check joins with cache groups and nodes views.
+            assertEquals(owningState, execute(ignite0, "SELECT p.STATE " +
+                "FROM SYS.PARTITION_STATES p " +
+                "JOIN SYS.CACHE_GROUPS g ON p.CACHE_GROUP_ID = g.CACHE_GROUP_ID " +
+                "JOIN SYS.NODES n ON p.NODE_ID = n.NODE_ID " +
+                "WHERE g.CACHE_GROUP_NAME = 'cache2' AND n.CONSISTENT_ID = ? AND p.PARTITION_ID = 1", nodeName1)
+                .get(0).get(0));
+
+            // Check malformed or invalid values for indexed columns: each query must return no rows.
+            assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE PARTITION_ID = ?",
+                Integer.MAX_VALUE).size());
+            assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE PARTITION_ID = -1")
+                .size());
+            assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE NODE_ID = '123'")
+                .size());
+            assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE NODE_ID = ?",
+                UUID.randomUUID()).size());
+            assertEquals(0, execute(ignite0, "SELECT * FROM SYS.PARTITION_STATES WHERE CACHE_GROUP_ID = 0")
+                .size());
+
+            AffinityTopologyVersion topVer = ignite0.context().discovery().topologyVersionEx();
+
+            ignite2.rebalanceEnabled(true);
+
+            // Wait until rebalance complete (observed as the discovery topology version moving past topVer).
+            assertTrue(GridTestUtils.waitForCondition(() -> ignite0.context().discovery().topologyVersionEx()
+                .compareTo(topVer) > 0, 5_000L));
+
+            // Check that all partitions are in OWNING state now (10 = 3 * 2 copies in cache1 + 3 + 1 in cache2).
+            String cntByStateSql = "SELECT COUNT(*) FROM SYS.PARTITION_STATES " +
+                "WHERE CACHE_GROUP_ID IN (?, ?) AND STATE = ?";
+
+            for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+                assertEquals(10L, execute(ignite, cntByStateSql, cacheGrpId1, cacheGrpId2, owningState).get(0).get(0));
+                assertEquals(0L, execute(ignite, cntByStateSql, cacheGrpId1, cacheGrpId2, movingState).get(0).get(0));
+            }
+
+            // Check that assignment is now changed to ideal.
+            for (Ignite ignite : Arrays.asList(ignite0, ignite1, ignite2)) {
+                assertEquals(false, execute(ignite, partPrimarySql, cacheGrpId1, nodeId0, 2).get(0).get(0));
+                assertEquals(true, execute(ignite, partPrimarySql, cacheGrpId1, nodeId2, 2).get(0).get(0));
+            }
+        }
+        finally {
+            ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion());
+        }
+    }
+
     /**
      * Execute query on given node.
      *
@@ -935,4 +1073,60 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
 
         return queryProcessor(node).querySqlFields(qry, true).getAll();
     }
+
+    /**
+     * Affinity function with fixed partition allocation.
+     * <p>
+     * Partition {@code i} is assigned to the nodes whose consistent ids are listed in {@code partMap[i]}, in that
+     * order; ids that match no node of the current topology snapshot are skipped.
+     */
+    private static class TestAffinityFunction implements AffinityFunction {
+        /** Partitions to nodes map. */
+        private final String[][] partMap;
+
+        /**
+         * @param partMap Partition allocation map, contains nodes consistent ids for each partition.
+         */
+        private TestAffinityFunction(String[][] partMap) {
+            this.partMap = partMap;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return partMap.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            // floorMod keeps the result in [0, partitions()) even for negative hash codes,
+            // where the plain % operator would yield a negative partition number.
+            return Math.floorMod(key.hashCode(), partitions());
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> parts = new ArrayList<>(partMap.length);
+
+            for (String[] nodes : partMap) {
+                List<ClusterNode> nodesList = new ArrayList<>();
+
+                for (String nodeConsistentId: nodes) {
+                    ClusterNode affNode = F.find(affCtx.currentTopologySnapshot(), null,
+                        (IgnitePredicate<ClusterNode>)node -> node.consistentId().equals(nodeConsistentId));
+
+                    // Nodes that have not joined yet simply do not take part in the assignment.
+                    if (affNode != null)
+                        nodesList.add(affNode);
+                }
+
+                parts.add(nodesList);
+            }
+
+            return parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op.
+        }
+    }
 }