You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/08/12 00:03:48 UTC

[ignite] branch master updated: IGNITE-11256 Implement read-only mode for grid - Fixes #6423.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a97dbe6  IGNITE-11256 Implement read-only mode for grid - Fixes #6423.
a97dbe6 is described below

commit a97dbe6385e248412429f46b5a26c4c6fc41b7f8
Author: Sergey Antonov <an...@gmail.com>
AuthorDate: Mon Aug 12 03:03:12 2019 +0300

    IGNITE-11256 Implement read-only mode for grid - Fixes #6423.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../internal/jdbc2/JdbcStreamingSelfTest.java      |  56 ++++
 .../ignite/jdbc/JdbcErrorsAbstractSelfTest.java    |  55 ++++
 .../thin/JdbcThinStreamingAbstractSelfTest.java    |   8 +
 .../main/java/org/apache/ignite/IgniteCluster.java |  15 +
 .../org/apache/ignite/internal/IgniteFeatures.java |   3 +
 .../org/apache/ignite/internal/IgniteKernal.java   |  19 ++
 .../internal/client/GridClientClusterState.java    |  14 +
 .../client/impl/GridClientClusterStateImpl.java    |  22 +-
 .../impl/connection/GridClientConnection.java      |  22 ++
 .../connection/GridClientNioTcpConnection.java     |  18 ++
 .../ClusterReadOnlyModeCheckedException.java       |  78 +++++
 .../internal/cluster/IgniteClusterAsyncImpl.java   |  10 +
 .../ignite/internal/cluster/IgniteClusterImpl.java |  37 +++
 .../internal/commandline/BaselineCommand.java      |   2 +-
 ...java => ClusterReadOnlyModeDisableCommand.java} |  47 +--
 ....java => ClusterReadOnlyModeEnableCommand.java} |  47 +--
 .../ignite/internal/commandline/CommandList.java   |   8 +-
 .../ignite/internal/commandline/StateCommand.java  |  13 +-
 .../ignite/internal/commandline/TxCommands.java    |   2 +-
 .../ignite/internal/jdbc2/JdbcBatchUpdateTask.java |   9 +-
 .../processors/cache/StateChangeRequest.java       |   7 +
 .../dht/GridDhtTopologyFutureAdapter.java          |   6 +-
 .../cache/query/IgniteQueryErrorCode.java          |   6 +
 .../cluster/ChangeGlobalStateMessage.java          |  23 +-
 .../cluster/DiscoveryDataClusterState.java         |  43 ++-
 .../cluster/GridClusterStateProcessor.java         | 242 ++++++++++++---
 .../cluster/IGridClusterStateProcessor.java        |  32 ++
 .../processors/datastreamer/DataStreamerImpl.java  |  21 +-
 .../internal/processors/odbc/SqlStateCode.java     |   8 +
 .../internal/processors/rest/GridRestCommand.java  |   9 +
 .../processors/rest/GridRestProcessor.java         |   2 +
 .../message/GridClientReadOnlyModeRequest.java     |  95 ++++++
 .../GridChangeReadOnlyModeCommandHandler.java      | 101 +++++++
 .../rest/protocols/tcp/GridTcpRestNioListener.java |  21 ++
 .../GridRestReadOnlyChangeModeRequest.java}        |  38 ++-
 .../org/apache/ignite/mxbean/IgniteMXBean.java     |  29 ++
 .../main/resources/META-INF/classnames.properties  |   1 +
 .../commandline/CommandHandlerParsingTest.java     | 253 +++++-----------
 .../cache/ClusterReadOnlyModeAbstractTest.java     |  70 +----
 .../processors/cache/ClusterReadOnlyModeTest.java  | 204 +++++++++----
 .../cache/ClusterReadOnlyModeTestUtils.java        | 185 ++++++++++++
 .../cluster/ClusterReadOnlyModeNodeJoinTest.java   |  76 +++++
 .../cluster/ClusterReadOnlyModeSelfTest.java       | 328 +++++++++++++++++++++
 .../junits/multijvm/IgniteClusterProcessProxy.java |  12 +-
 .../testsuites/IgniteKernalSelfTestSuite.java      |   4 +
 .../ignite/util/GridCommandHandlerSslTest.java     |   3 +-
 .../apache/ignite/util/GridCommandHandlerTest.java |  45 +++
 .../processors/query/h2/IgniteH2Indexing.java      |  12 +
 .../processors/query/h2/dml/DmlBatchSender.java    |  13 +
 .../cache/ttl/CacheTtlReadOnlyModeSelfTest.java    | 123 ++++++++
 .../IgniteCacheWithIndexingTestSuite.java          |   2 +
 .../ApiParity/ClusterMetricsParityTest.cs          |   2 +-
 .../ApiParity/ClusterParityTest.cs                 |   7 +-
 53 files changed, 2064 insertions(+), 444 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
index ecb3609..8162433 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.jdbc2;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
@@ -31,7 +32,9 @@ import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
 import org.apache.ignite.lang.IgniteCallable;
@@ -165,6 +168,59 @@ public class JdbcStreamingSelfTest extends JdbcThinAbstractSelfTest {
      * @throws Exception if failed.
      */
     @Test
+    public void testStreamedInsertFailsOnReadOnlyMode() throws Exception {
+        grid(0).cluster().readOnly(true);
+
+        try {
+            assertTrue(grid(0).cluster().readOnly());
+
+            boolean failed = false;
+
+            try (Connection ordinalCon = createOrdinaryConnection();
+                 Statement selectStmt = ordinalCon.createStatement()
+            ) {
+                try (ResultSet rs = selectStmt.executeQuery("select count(*) from PUBLIC.Person")) {
+                    assertTrue(rs.next());
+
+                    assertEquals(0, rs.getLong(1));
+                }
+
+                try (Connection conn = createStreamedConnection(true)) {
+                    try (PreparedStatement stmt =
+                             conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") values (?, ?)")
+                    ) {
+                        for (int i = 1; i <= 2; i++) {
+                            stmt.setInt(1, i);
+                            stmt.setString(2, nameForId(i));
+
+                            stmt.executeUpdate();
+                        }
+                    }
+                }
+                catch (Exception e) {
+                    log.error("Insert failed", e);
+
+                    failed = X.hasCause(e, ClusterReadOnlyModeCheckedException.class);
+                }
+
+                try (ResultSet rs = selectStmt.executeQuery("select count(*) from PUBLIC.Person")) {
+                    assertTrue(rs.next());
+
+                    assertEquals("Insert should be failed", 0, rs.getLong(1));
+                }
+            }
+
+            assertTrue(failed);
+        }
+        finally {
+            grid(0).cluster().readOnly(false);
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
     public void testStreamedInsert() throws Exception {
         for (int i = 10; i <= 100; i += 10)
             put(i, nameForId(i * 100));
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
index f8e745b..b2d3000 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
@@ -744,6 +744,61 @@ public abstract class JdbcErrorsAbstractSelfTest extends GridCommonAbstractTest
     }
 
     /**
+     * Checks the error code and message returned when a DML request is executed on a read-only cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdatesRejectedInReadOnlyMode() throws Exception {
+        try (Connection conn = getConnection()) {
+            try (Statement statement = conn.createStatement()) {
+                statement.executeUpdate("CREATE TABLE TEST_READ_ONLY (ID LONG PRIMARY KEY, VAL LONG)");
+            }
+        }
+
+        grid(0).cluster().readOnly(true);
+
+        try {
+            checkErrorState((conn) -> {
+                try (Statement statement = conn.createStatement()) {
+                    statement.executeUpdate("INSERT INTO TEST_READ_ONLY VALUES (1, 2)");
+                }
+            }, "90097", "Failed to execute DML statement. Cluster in read-only mode");
+        }
+        finally {
+            grid(0).cluster().readOnly(false);
+        }
+    }
+
+    /**
+     * Checks the error code and message for a batch DML request executed on a read-only cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBatchUpdatesRejectedInReadOnlyMode() throws Exception {
+        try (Connection conn = getConnection()) {
+            try (Statement statement = conn.createStatement()) {
+                statement.executeUpdate("CREATE TABLE TEST_READ_ONLY_BATCH (ID LONG PRIMARY KEY, VAL LONG)");
+            }
+        }
+
+        grid(0).cluster().readOnly(true);
+
+        try {
+            checkErrorState((conn) -> {
+                try (Statement statement = conn.createStatement()) {
+                    statement.addBatch("INSERT INTO TEST_READ_ONLY_BATCH VALUES (1, 2)");
+                    statement.executeBatch();
+                }
+            }, "90097", null);
+        }
+        finally {
+            grid(0).cluster().readOnly(false);
+        }
+    }
+
+    /**
      * @return Connection to execute statements on.
      * @throws SQLException if failed.
      */
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 71b5049..97e863c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -211,6 +212,13 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
             assertEquals(i, grid(0).cache("T").get(i));
     }
 
+    /** {@inheritDoc} */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11795")
+    @Test
+    @Override public void testStreamedInsertFailsOnReadOnlyMode() throws Exception {
+        super.testStreamedInsertFailsOnReadOnlyMode();
+    }
+
     /**
      *
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index f93f4b5..5a5985c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -459,6 +459,21 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
     public void active(boolean active);
 
     /**
+     * Checks whether the Ignite grid is in read-only mode.
+     *
+     * @return {@code true} if grid is in read-only mode and {@code false} if grid allows data modification operations.
+     */
+    public boolean readOnly();
+
+    /**
+     * Enable or disable Ignite grid read-only mode.
+     *
+     * @param readOnly If {@code true} enable read-only mode. If {@code false} disable read-only mode.
+     * @throws IgniteException If Ignite grid isn't active.
+     */
+    public void readOnly(boolean readOnly) throws IgniteException;
+
+    /**
      * Gets current baseline topology. If baseline topology was not set, will return {@code null}.
      *
      * @return Collection of nodes included to the current baseline topology.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index e5dbca1..534d5d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -56,6 +56,9 @@ public enum IgniteFeatures {
 
     /** Command which allow to detect and cleanup garbage which could left after destroying caches in shared groups */
     FIND_AND_DELETE_GARBAGE_COMMAND(8),
+    
+    /** Support of cluster read-only mode. */
+    CLUSTER_READ_ONLY_MODE(9),
 
     /** Distributed metastorage. */
     DISTRIBUTED_METASTORAGE(11),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index cc9b77d..a00b38c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3800,6 +3800,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         Ignition.stop(igniteInstanceName, true);
     }
 
+    /** {@inheritDoc} */
     @Override public <K> Affinity<K> affinity(String cacheName) {
         CU.validateCacheName(cacheName);
         checkClusterState();
@@ -4533,6 +4534,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean readOnlyMode() {
+        return ctx.state().publicApiReadOnlyMode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readOnlyMode(boolean readOnly) {
+        ctx.state().changeGlobalState(readOnly);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getReadOnlyModeDuration() {
+        if (ctx.state().publicApiReadOnlyMode())
+            return U.currentTimeMillis() - ctx.state().readOnlyModeStateChangeTime();
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
index 4fa25ce..1d669f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
@@ -30,4 +30,18 @@ public interface GridClientClusterState {
      * @return {@code Boolean} - Current cluster state. {@code True} active, {@code False} inactive.
      */
     public boolean active() throws GridClientException;
+
+    /**
+     * @return {@code True} if the cluster is in read-only mode and {@code false} otherwise.
+     * @throws GridClientException If the request for the current cluster read-only mode failed.
+     */
+    public boolean readOnly() throws GridClientException;
+
+    /**
+     * Enable or disable Ignite grid read-only mode.
+     *
+     * @param readOnly If {@code true} enable read-only mode. If {@code false} disable read-only mode.
+     * @throws GridClientException If changing the read-only mode failed.
+     */
+    public void readOnly(boolean readOnly) throws GridClientException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientClusterStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientClusterStateImpl.java
index 2dcf06d..97901e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientClusterStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientClusterStateImpl.java
@@ -55,7 +55,8 @@ public class GridClientClusterStateImpl extends GridClientAbstractProjection<Gri
     @Override public void active(final boolean active) throws GridClientException {
         withReconnectHandling(new ClientProjectionClosure<Void>() {
             @Override public GridClientFuture apply(
-                GridClientConnection conn, UUID nodeId
+                GridClientConnection conn,
+                UUID nodeId
             ) throws GridClientConnectionResetException, GridClientClosedException {
                 return conn.changeState(active, nodeId);
             }
@@ -64,11 +65,22 @@ public class GridClientClusterStateImpl extends GridClientAbstractProjection<Gri
 
     /** {@inheritDoc} */
     @Override public boolean active() throws GridClientException {
-        return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
-            @Override public GridClientFuture<Boolean> apply(
-                GridClientConnection conn, UUID nodeId
+        return withReconnectHandling(GridClientConnection::currentState).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readOnly() throws GridClientException {
+        return withReconnectHandling(GridClientConnection::readOnlyState).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readOnly(boolean readOnly) throws GridClientException {
+        withReconnectHandling(new ClientProjectionClosure<Void>() {
+            @Override public GridClientFuture apply(
+                GridClientConnection conn,
+                UUID nodeId
             ) throws GridClientConnectionResetException, GridClientClosedException {
-                return conn.currentState(nodeId);
+                return conn.changeReadOnlyState(readOnly, nodeId);
             }
         }).get();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java
index c75bd24..de4347c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java
@@ -325,6 +325,28 @@ public abstract class GridClientConnection {
         throws GridClientClosedException, GridClientConnectionResetException;
 
     /**
+     * Get current read-only mode status. If future contains {@code true} - read-only mode enabled, if {@code false} -
+     * read-only mode disabled.
+     *
+     * @param destNodeId Destination node id.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract GridClientFuture<Boolean> readOnlyState(UUID destNodeId)
+        throws GridClientClosedException, GridClientConnectionResetException;
+
+    /**
+     * Change read-only mode. Cluster must be activated.
+     *
+     * @param readOnly Read-only mode enabled flag.
+     * @param destNodeId Destination node id.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract GridClientFuture<?> changeReadOnlyState(boolean readOnly, UUID destNodeId)
+        throws GridClientClosedException, GridClientConnectionResetException;
+
+    /**
      * Gets node by node ID.
      *
      * @param id Node ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index c863a0c..5e489ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimize
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientAuthenticationRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientReadOnlyModeRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientStateRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -818,6 +819,23 @@ public class GridClientNioTcpConnection extends GridClientConnection {
     }
 
     /** {@inheritDoc} */
+    @Override public GridClientFuture<?> changeReadOnlyState(
+        boolean readOnly,
+        UUID destNodeId
+    ) throws GridClientClosedException, GridClientConnectionResetException {
+        return readOnly ?
+            makeRequest(GridClientReadOnlyModeRequest.enableReadOnly(), destNodeId) :
+            makeRequest(GridClientReadOnlyModeRequest.disableReadOnly(), destNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridClientFuture<Boolean> readOnlyState(
+        UUID destNodeId
+    ) throws GridClientClosedException, GridClientConnectionResetException {
+        return makeRequest(GridClientReadOnlyModeRequest.currentReadOnlyMode(), destNodeId);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridClientFuture<Boolean> currentState(UUID destNodeId)
         throws GridClientClosedException, GridClientConnectionResetException {
         GridClientStateRequest msg = new GridClientStateRequest();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterReadOnlyModeCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterReadOnlyModeCheckedException.java
new file mode 100644
index 0000000..2dcde5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterReadOnlyModeCheckedException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCluster;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception indicating a data modification error in a read-only cluster. See {@link IgniteCluster#readOnly()}.
+ */
+public class ClusterReadOnlyModeCheckedException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Create empty exception.
+     */
+    public ClusterReadOnlyModeCheckedException() {
+        // No-op.
+    }
+
+    /**
+     * Creates new exception with given error message.
+     *
+     * @param msg Error message.
+     */
+    public ClusterReadOnlyModeCheckedException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates new grid exception with given throwable as a cause and
+     * source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public ClusterReadOnlyModeCheckedException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates new exception with given error message and optional nested exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     * @param writableStackTrace whether or not the stack trace should
+     *                           be writable
+     */
+    public ClusterReadOnlyModeCheckedException(String msg, @Nullable Throwable cause, boolean writableStackTrace) {
+        super(msg, cause, writableStackTrace);
+    }
+
+    /**
+     * Creates new exception with given error message and optional nested exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public ClusterReadOnlyModeCheckedException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 4bcfa8e..aaf598d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -414,4 +414,14 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(cluster);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean readOnly() {
+        return cluster.readOnly();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readOnly(boolean readOnly) throws IgniteException {
+        cluster.readOnly(readOnly);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index 6249301..a935a32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -72,6 +72,8 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.IgniteFeatures.CLUSTER_READ_ONLY_MODE;
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.parseFile;
@@ -321,6 +323,41 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean readOnly() {
+        guard();
+
+        try {
+            return ctx.state().publicApiReadOnlyMode();
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readOnly(boolean readOnly) throws IgniteException {
+        guard();
+
+        try {
+            verifyReadOnlyModeSupport();
+
+            ctx.state().changeGlobalState(readOnly).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** */
+    private void verifyReadOnlyModeSupport() {
+        if (!allNodesSupports(ctx.discovery().discoCache().serverNodes(), CLUSTER_READ_ONLY_MODE))
+            throw new IgniteException("Not all nodes in cluster supports cluster read-only mode");
+    }
+
     /** */
     private Collection<BaselineNode> baselineNodes() {
         return new ArrayList<>(ctx.cluster().get().forServers().nodes());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/BaselineCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/BaselineCommand.java
index 51e3511..b792d8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/BaselineCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/BaselineCommand.java
@@ -73,7 +73,7 @@ public class BaselineCommand implements Command<BaselineArguments> {
 
     /** {@inheritDoc} */
     @Override public String confirmationPrompt() {
-        if (BaselineSubcommands.COLLECT != baselineArgs.getCmd())
+        if (baselineArgs != null && BaselineSubcommands.COLLECT != baselineArgs.getCmd())
             return "Warning: the command will perform changes in baseline.";
 
         return null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/ClusterReadOnlyModeDisableCommand.java
similarity index 60%
copy from modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
copy to modules/core/src/main/java/org/apache/ignite/internal/commandline/ClusterReadOnlyModeDisableCommand.java
index d93dda3..fca9832 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/ClusterReadOnlyModeDisableCommand.java
@@ -19,35 +19,25 @@ package org.apache.ignite.internal.commandline;
 
 import java.util.logging.Logger;
 import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientClusterState;
 import org.apache.ignite.internal.client.GridClientConfiguration;
 
-import static org.apache.ignite.internal.commandline.CommandList.STATE;
+import static org.apache.ignite.internal.commandline.CommandList.READ_ONLY_DISABLE;
+import static org.apache.ignite.internal.commandline.CommandLogger.optional;
+import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION;
 
 /**
- * Command to print cluster state.
+ * Command to disable cluster read-only mode.
  */
-public class StateCommand implements Command<Void> {
+public class ClusterReadOnlyModeDisableCommand implements Command<Void> {
     /** {@inheritDoc} */
-    @Override public void printUsage(Logger logger) {
-        Command.usage(logger, "Print current cluster state:", STATE);
-    }
-
-    /**
-     * Print cluster state.
-     *
-     * @param clientCfg Client configuration.
-     * @throws Exception If failed to print state.
-     */
-    @Override public Object execute(GridClientConfiguration clientCfg, Logger logger) throws Exception {
-        try (GridClient client = Command.startClient(clientCfg)){
-            GridClientClusterState state = client.state();
+    @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
+        try (GridClient client = Command.startClient(clientCfg)) {
+            client.state().readOnly(false);
 
-            logger.info("Cluster is " + (state.active() ? "active" : "inactive"));
+            log.info("Cluster read-only mode disabled");
         }
         catch (Throwable e) {
-            if (!CommandHandler.isAuthError(e))
-                logger.severe("Failed to get cluster state.");
+            log.info("Failed to disable read-only mode");
 
             throw e;
         }
@@ -56,12 +46,27 @@ public class StateCommand implements Command<Void> {
     }
 
     /** {@inheritDoc} */
+    @Override public String confirmationPrompt() {
+        return "Warning: the command will disable read-only mode on a cluster.";
+    }
+
+    /** {@inheritDoc} */
     @Override public Void arg() {
         return null;
     }
 
     /** {@inheritDoc} */
+    @Override public void printUsage(Logger log) {
+        Command.usage(
+            log,
+            "Disable read-only mode on active cluster:",
+            READ_ONLY_DISABLE,
+            optional(CMD_AUTO_CONFIRMATION)
+        );
+    }
+
+    /** {@inheritDoc} */
     @Override public String name() {
-        return STATE.toCommandName();
+        return READ_ONLY_DISABLE.toCommandName();
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/ClusterReadOnlyModeEnableCommand.java
similarity index 60%
copy from modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
copy to modules/core/src/main/java/org/apache/ignite/internal/commandline/ClusterReadOnlyModeEnableCommand.java
index d93dda3..ca3fe6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/ClusterReadOnlyModeEnableCommand.java
@@ -19,35 +19,25 @@ package org.apache.ignite.internal.commandline;
 
 import java.util.logging.Logger;
 import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientClusterState;
 import org.apache.ignite.internal.client.GridClientConfiguration;
 
-import static org.apache.ignite.internal.commandline.CommandList.STATE;
+import static org.apache.ignite.internal.commandline.CommandList.READ_ONLY_ENABLE;
+import static org.apache.ignite.internal.commandline.CommandLogger.optional;
+import static org.apache.ignite.internal.commandline.CommonArgParser.CMD_AUTO_CONFIRMATION;
 
 /**
- * Command to print cluster state.
+ * Command to enable cluster read-only mode.
  */
-public class StateCommand implements Command<Void> {
+public class ClusterReadOnlyModeEnableCommand implements Command<Void> {
     /** {@inheritDoc} */
-    @Override public void printUsage(Logger logger) {
-        Command.usage(logger, "Print current cluster state:", STATE);
-    }
-
-    /**
-     * Print cluster state.
-     *
-     * @param clientCfg Client configuration.
-     * @throws Exception If failed to print state.
-     */
-    @Override public Object execute(GridClientConfiguration clientCfg, Logger logger) throws Exception {
-        try (GridClient client = Command.startClient(clientCfg)){
-            GridClientClusterState state = client.state();
+    @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
+        try (GridClient client = Command.startClient(clientCfg)) {
+            client.state().readOnly(true);
 
-            logger.info("Cluster is " + (state.active() ? "active" : "inactive"));
+            log.info("Cluster read-only mode enabled");
         }
         catch (Throwable e) {
-            if (!CommandHandler.isAuthError(e))
-                logger.severe("Failed to get cluster state.");
+            log.info("Failed to enable read-only mode");
 
             throw e;
         }
@@ -56,12 +46,27 @@ public class StateCommand implements Command<Void> {
     }
 
     /** {@inheritDoc} */
+    @Override public String confirmationPrompt() {
+        return "Warning: the command will enable read-only mode on a cluster.";
+    }
+
+    /** {@inheritDoc} */
     @Override public Void arg() {
         return null;
     }
 
     /** {@inheritDoc} */
+    @Override public void printUsage(Logger log) {
+        Command.usage(
+            log,
+            "Enable read-only mode on active cluster:",
+            READ_ONLY_ENABLE,
+            optional(CMD_AUTO_CONFIRMATION)
+        );
+    }
+
+    /** {@inheritDoc} */
     @Override public String name() {
-        return STATE.toCommandName();
+        return READ_ONLY_ENABLE.toCommandName();
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandList.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
index 8ce6356..055276b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandList.java
@@ -46,7 +46,13 @@ public enum CommandList {
     WAL("--wal", new WalCommands()),
 
     /** */
-    DIAGNOSTIC("--diagnostic", new DiagnosticCommand());
+    DIAGNOSTIC("--diagnostic", new DiagnosticCommand()),
+
+    /** */
+    READ_ONLY_ENABLE("--read-only-on", new ClusterReadOnlyModeEnableCommand()),
+
+    /** */
+    READ_ONLY_DISABLE("--read-only-off", new ClusterReadOnlyModeDisableCommand());
 
     /** Private values copy so there's no need in cloning it every time. */
     private static final CommandList[] VALUES = CommandList.values();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
index d93dda3..510c31c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/StateCommand.java
@@ -39,15 +39,22 @@ public class StateCommand implements Command<Void> {
      * @param clientCfg Client configuration.
      * @throws Exception If failed to print state.
      */
-    @Override public Object execute(GridClientConfiguration clientCfg, Logger logger) throws Exception {
+    @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception {
         try (GridClient client = Command.startClient(clientCfg)){
             GridClientClusterState state = client.state();
 
-            logger.info("Cluster is " + (state.active() ? "active" : "inactive"));
+            if (state.active()) {
+                if (state.readOnly())
+                    log.info("Cluster is active (read-only)");
+                else
+                    log.info("Cluster is active");
+            }
+            else
+                log.info("Cluster is inactive");
         }
         catch (Throwable e) {
             if (!CommandHandler.isAuthError(e))
-                logger.severe("Failed to get cluster state.");
+                log.severe("Failed to get cluster state.");
 
             throw e;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/TxCommands.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/TxCommands.java
index 8f0f573..8c1d64e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/TxCommands.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/TxCommands.java
@@ -190,7 +190,7 @@ public class TxCommands implements Command<VisorTxTaskArg> {
 
     /** {@inheritDoc} */
     @Override public String confirmationPrompt() {
-        if (args.getOperation() == VisorTxOperation.KILL)
+        if (args != null && args.getOperation() == VisorTxOperation.KILL)
             return "Warning: the command will kill some transactions.";
 
         return null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
index 774f922..6ea1bd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java
@@ -30,7 +30,9 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.resources.IgniteInstanceResource;
 
@@ -147,7 +149,12 @@ class JdbcBatchUpdateTask implements IgniteCallable<int[]> {
             }
         }
         catch (Exception ex) {
-            throw new BatchUpdateException(Arrays.copyOf(updCntrs, idx), ex);
+            IgniteSQLException sqlEx = X.cause(ex, IgniteSQLException.class);
+
+            if (sqlEx != null)
+                throw new BatchUpdateException(sqlEx.getMessage(), sqlEx.sqlState(), Arrays.copyOf(updCntrs, idx), ex);
+            else
+                throw new BatchUpdateException(Arrays.copyOf(updCntrs, idx), ex);
         }
 
         return updCntrs;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
index 30a42bb..cd37016 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java
@@ -85,6 +85,13 @@ public class StateChangeRequest {
     }
 
     /**
+     * @return Read-only mode flag.
+     */
+    public boolean readOnly() {
+        return msg.readOnly();
+    }
+
+    /**
      * @return {@code True} if active state was changed.
      */
     public boolean activeChanged() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 33b20fe..766c9fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
@@ -35,6 +36,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
 
 /**
@@ -98,8 +100,8 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
 
         PartitionLossPolicy lossPlc = grp.config().getPartitionLossPolicy();
 
-        if (cctx.shared().readOnlyMode() && opType == WRITE)
-            return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)");
+        if (cctx.shared().readOnlyMode() && opType == WRITE && !isSystemCache(cctx.name()))
+            return new ClusterReadOnlyModeCheckedException("Failed to perform cache operation (cluster is in read-only mode)");
 
         if (grp.needsRecovery() && !recovery) {
             if (opType == WRITE && (lossPlc == READ_ONLY_SAFE || lossPlc == READ_ONLY_ALL))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
index 6fdac22..bf8c464 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
@@ -115,6 +115,9 @@ public final class IgniteQueryErrorCode {
     /** Attempt to INSERT, UPDATE or MERGE value which scale exceed maximum DECIMAL column scale. */
     public static final int KEY_SCALE_OUT_OF_RANGE = 4010;
 
+    /** Attempt to INSERT, UPDATE or DELETE value on read-only cluster. */
+    public static final int CLUSTER_READ_ONLY_MODE_ENABLED = 4011;
+
     /* 5xxx - transactions related runtime errors. */
 
     /** Transaction is already open. */
@@ -199,6 +202,9 @@ public final class IgniteQueryErrorCode {
             case QUERY_CANCELED:
                 return SqlStateCode.QUERY_CANCELLED;
 
+            case CLUSTER_READ_ONLY_MODE_ENABLED:
+                return SqlStateCode.CLUSTER_READ_ONLY_MODE_ENABLED;
+
             default:
                 return SqlStateCode.INTERNAL_ERROR;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index 23f02ac..8fd2f66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -50,6 +50,9 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
     /** If true activate else deactivate. */
     private boolean activate;
 
+    /** If true, enable read-only mode; otherwise disable it. */
+    private boolean readOnly;
+
     /** Configurations read from persistent store. */
     private List<StoredCacheData> storedCfgs;
 
@@ -75,6 +78,7 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
      * @param initiatingNodeId Node initiated state change.
      * @param storedCfgs Configurations read from persistent store.
      * @param activate New cluster state.
+     * @param readOnly New read-only mode flag.
      * @param baselineTopology Baseline topology.
      * @param forceChangeBaselineTopology Force change baseline topology flag.
      * @param timestamp Timestamp.
@@ -84,9 +88,11 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
         UUID initiatingNodeId,
         @Nullable List<StoredCacheData> storedCfgs,
         boolean activate,
+        boolean readOnly,
         BaselineTopology baselineTopology,
         boolean forceChangeBaselineTopology,
-        long timestamp) {
+        long timestamp
+    ) {
         assert reqId != null;
         assert initiatingNodeId != null;
 
@@ -94,6 +100,7 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
         this.initiatingNodeId = initiatingNodeId;
         this.storedCfgs = storedCfgs;
         this.activate = activate;
+        this.readOnly = readOnly;
         this.baselineTopology = baselineTopology;
         this.forceChangeBaselineTopology = forceChangeBaselineTopology;
         this.timestamp = timestamp;
@@ -157,8 +164,11 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
-        DiscoCache discoCache) {
+    @Override public DiscoCache createDiscoCache(
+        GridDiscoveryManager mgr,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
+    ) {
         return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
     }
 
@@ -177,6 +187,13 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage {
     }
 
     /**
+     * @return Read-only mode flag.
+     */
+    public boolean readOnly() {
+        return readOnly;
+    }
+
+    /**
      * @return Force change BaselineTopology flag.
      */
     public boolean forceChangeBaselineTopology() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
index 6f607d8..8beef24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -45,6 +46,12 @@ public class DiscoveryDataClusterState implements Serializable {
     /** Flag indicating if the cluster in in active state. */
     private final boolean active;
 
+    /** Flag indicating if the cluster is in read-only mode. */
+    private final boolean readOnly;
+
+    /** Read-only mode change time. Works correctly only for enabling read-only mode. */
+    private final long readOnlyChangeTime;
+
     /** Current cluster baseline topology. */
     @Nullable private final BaselineTopology baselineTopology;
 
@@ -86,12 +93,17 @@ public class DiscoveryDataClusterState implements Serializable {
      * @param active Current status.
      * @return State instance.
      */
-    static DiscoveryDataClusterState createState(boolean active, @Nullable BaselineTopology baselineTopology) {
-        return new DiscoveryDataClusterState(null, active, baselineTopology, null, null, null);
+    static DiscoveryDataClusterState createState(
+        boolean active,
+        boolean readOnly,
+        @Nullable BaselineTopology baselineTopology
+    ) {
+        return new DiscoveryDataClusterState(null, active, readOnly, baselineTopology, null, null, null);
     }
 
     /**
      * @param active New status.
+     * @param readOnly New read-only mode.
      * @param transitionReqId State change request ID.
      * @param transitionTopVer State change topology version.
      * @param transitionNodes Nodes participating in state change exchange.
@@ -100,6 +112,7 @@ public class DiscoveryDataClusterState implements Serializable {
     static DiscoveryDataClusterState createTransitionState(
         DiscoveryDataClusterState prevState,
         boolean active,
+        boolean readOnly,
         @Nullable BaselineTopology baselineTopology,
         UUID transitionReqId,
         AffinityTopologyVersion transitionTopVer,
@@ -113,15 +126,18 @@ public class DiscoveryDataClusterState implements Serializable {
         return new DiscoveryDataClusterState(
             prevState,
             active,
+            readOnly,
             baselineTopology,
             transitionReqId,
             transitionTopVer,
-            transitionNodes);
+            transitionNodes
+        );
     }
 
     /**
      * @param prevState Previous state. May be non-null only for transitional states.
      * @param active New state.
+     * @param readOnly New read-only mode.
      * @param transitionReqId State change request ID.
      * @param transitionTopVer State change topology version.
      * @param transitionNodes Nodes participating in state change exchange.
@@ -129,6 +145,7 @@ public class DiscoveryDataClusterState implements Serializable {
     private DiscoveryDataClusterState(
         DiscoveryDataClusterState prevState,
         boolean active,
+        boolean readOnly,
         @Nullable BaselineTopology baselineTopology,
         @Nullable UUID transitionReqId,
         @Nullable AffinityTopologyVersion transitionTopVer,
@@ -136,6 +153,8 @@ public class DiscoveryDataClusterState implements Serializable {
     ) {
         this.prevState = prevState;
         this.active = active;
+        this.readOnly = readOnly;
+        this.readOnlyChangeTime = U.currentTimeMillis();
         this.baselineTopology = baselineTopology;
         this.transitionReqId = transitionReqId;
         this.transitionTopVer = transitionTopVer;
@@ -190,6 +209,20 @@ public class DiscoveryDataClusterState implements Serializable {
     }
 
     /**
+     * @return Read only mode enabled flag.
+     */
+    public boolean readOnly() {
+        return readOnly;
+    }
+
+    /**
+     * @return Read-only mode change time.
+     */
+    public long readOnlyModeChangeTime() {
+        return readOnlyChangeTime;
+    }
+
+    /**
      * @return Baseline topology.
      */
     @Nullable public BaselineTopology baselineTopology() {
@@ -278,13 +311,13 @@ public class DiscoveryDataClusterState implements Serializable {
             new DiscoveryDataClusterState(
                 null,
                 active,
+                readOnly,
                 baselineTopology,
                 null,
                 null,
                 null
             ) :
-            prevState != null ? prevState :
-                    DiscoveryDataClusterState.createState(false, null);
+            prevState != null ? prevState : createState(false, false, null);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 7591103..ecbc2aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -230,6 +230,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /** {@inheritDoc} */
+    @Override public boolean publicApiReadOnlyMode() {
+        return globalState.readOnly();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readOnlyModeStateChangeTime() {
+        return globalState.readOnlyModeChangeTime();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean publicApiActiveState(boolean waitForTransition) {
         return publicApiActiveStateAsync(waitForTransition).get();
     }
@@ -250,6 +260,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 return new IgniteFinishedFutureImpl<>(transitionRes);
             else {
                 GridFutureAdapter<Void> fut = transitionFuts.get(globalState.transitionRequestId());
+
                 if (fut != null) {
                     if (asyncWaitForTransition) {
                          return new IgniteFutureImpl<>(fut.chain(new C1<IgniteInternalFuture<Void>, Boolean>() {
@@ -368,7 +379,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         // Start first node as inactive if persistence is enabled.
         boolean activeOnStart = inMemoryMode && ctx.config().isActiveOnStart();
 
-        globalState = DiscoveryDataClusterState.createState(activeOnStart, null);
+        globalState = DiscoveryDataClusterState.createState(activeOnStart, false, null);
 
         ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -482,6 +493,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
             ctx.cache().onStateChangeFinish(msg);
 
+            boolean prev = ctx.cache().context().readOnlyMode();
+
+            if (prev != globalState.readOnly()) {
+                ctx.cache().context().readOnlyMode(globalState.readOnly());
+
+                if (globalState.readOnly())
+                    log.info("Read-only mode is enabled");
+                else
+                    log.info("Read-only mode is disabled");
+            }
+
             TransitionOnJoinWaitFuture joinFut = this.joinFut;
 
             if (joinFut != null)
@@ -512,10 +534,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     ) {
         DiscoveryDataClusterState state = globalState;
 
-        if (log.isInfoEnabled())
-            U.log(log, "Received " + prettyStr(msg.activate()) + " request with BaselineTopology" +
-                (msg.baselineTopology() == null ? ": null"
-                    : "[id=" + msg.baselineTopology().id() + "]"));
+        U.log(
+            log,
+            "Received " + prettyStr(msg.activate(), msg.readOnly(), readOnlyChanged(state, msg.readOnly())) +
+                " request with BaselineTopology" +
+                (msg.baselineTopology() == null ? ": null" : "[id=" + msg.baselineTopology().id() + "]") +
+                " initiator node ID: " + msg.initiatorNodeId()
+        );
 
         if (msg.baselineTopology() != null)
             compatibilityMode = false;
@@ -588,6 +613,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 globalState = DiscoveryDataClusterState.createTransitionState(
                     prevState,
                     msg.activate(),
+                    msg.readOnly(),
                     msg.activate() ? msg.baselineTopology() : prevState.baselineTopology(),
                     msg.requestId(),
                     topVer,
@@ -634,7 +660,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
      * @return {@code True} if states are equivalent.
      */
     protected static boolean isEquivalent(ChangeGlobalStateMessage msg, DiscoveryDataClusterState state) {
-        return (msg.activate() == state.active() && BaselineTopology.equals(msg.baselineTopology(), state.baselineTopology()));
+        return msg.activate() == state.active() &&
+            msg.readOnly() == state.readOnly() &&
+            BaselineTopology.equals(msg.baselineTopology(), state.baselineTopology());
     }
 
     /** {@inheritDoc} */
@@ -644,8 +672,11 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
     /** {@inheritDoc} */
     @Override public DiscoveryDataClusterState pendingState(ChangeGlobalStateMessage stateMsg) {
-        return DiscoveryDataClusterState.createState(stateMsg.activate() || stateMsg.forceChangeBaselineTopology(),
-            stateMsg.baselineTopology());
+        return DiscoveryDataClusterState.createState(
+            stateMsg.activate() || stateMsg.forceChangeBaselineTopology(),
+            stateMsg.readOnly(),
+            stateMsg.baselineTopology()
+        );
     }
 
     /**
@@ -791,6 +822,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
             compatibilityMode = true;
 
+            ctx.cache().context().readOnlyMode(globalState.readOnly());
+
             return;
         }
 
@@ -808,6 +841,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 for (BaselineTopologyHistoryItem item : stateDiscoData.recentHistory.history())
                     bltHist.bufferHistoryItemForStore(item);
             }
+
+            ctx.cache().context().readOnlyMode(globalState.readOnly());
         }
     }
 
@@ -820,8 +855,57 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         return changeGlobalState(activate, baselineNodes, forceChangeBaselineTopology, false);
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> changeGlobalState(
+        boolean activate,
+        boolean readOnly,
+        Collection<? extends BaselineNode> baselineNodes,
+        boolean forceChangeBaselineTopology
+    ) {
+        return changeGlobalState(activate, readOnly, baselineNodes, forceChangeBaselineTopology, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> changeGlobalState(boolean readOnly) {
+        if (!publicApiActiveState(false))
+            return new GridFinishedFuture<>(new IgniteException("Cluster not active"));
+
+        DiscoveryDataClusterState state = globalState;
+
+        List<BaselineNode> bltNodes = state.hasBaselineTopology() ? state.baselineTopology().currentBaseline() : null;
+
+        return changeGlobalState(state.active(), readOnly, bltNodes, false, false);
+    }
+
+    /**
+     * @param activate New activate state.
+     * @param baselineNodes New BLT nodes.
+     * @param forceChangeBaselineTopology Force change BLT.
+     * @param isAutoAdjust Auto adjusting flag.
+     * @return Global change state future.
+     */
+    public IgniteInternalFuture<?> changeGlobalState(
+        final boolean activate,
+        Collection<? extends BaselineNode> baselineNodes,
+        boolean forceChangeBaselineTopology,
+        boolean isAutoAdjust
+    ) {
+        boolean readOnly = activate && globalState.readOnly();
+
+        return changeGlobalState(activate, readOnly, baselineNodes, forceChangeBaselineTopology, isAutoAdjust);
+    }
+
+    /**
+     * @param activate New activate state.
+     * @param readOnly Read-only mode.
+     * @param baselineNodes New BLT nodes.
+     * @param forceChangeBaselineTopology Force change BLT.
+     * @param isAutoAdjust Auto adjusting flag.
+     * @return Global change state future.
+     */
     public IgniteInternalFuture<?> changeGlobalState(
         final boolean activate,
+        boolean readOnly,
         Collection<? extends BaselineNode> baselineNodes,
         boolean forceChangeBaselineTopology,
         boolean isAutoAdjust
@@ -829,15 +913,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         BaselineTopology newBlt = (compatibilityMode && !forceChangeBaselineTopology) ? null :
             calculateNewBaselineTopology(activate, baselineNodes, forceChangeBaselineTopology);
 
-        return changeGlobalState0(activate, newBlt, forceChangeBaselineTopology, isAutoAdjust);
+        return changeGlobalState0(activate, readOnly, newBlt, forceChangeBaselineTopology, isAutoAdjust);
     }
 
     /**
      *
      */
-    private BaselineTopology calculateNewBaselineTopology(final boolean activate,
+    private BaselineTopology calculateNewBaselineTopology(
+        final boolean activate,
         Collection<? extends BaselineNode> baselineNodes,
-        boolean forceChangeBaselineTopology) {
+        boolean forceChangeBaselineTopology
+    ) {
         BaselineTopology newBlt;
 
         BaselineTopology currentBlt = globalState.baselineTopology();
@@ -897,14 +983,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /** */
-    private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
-        BaselineTopology blt, boolean forceChangeBaselineTopology) {
-        return changeGlobalState0(activate, blt, forceChangeBaselineTopology, false);
-    }
-
-    /** */
-    private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
-        BaselineTopology blt, boolean forceChangeBaselineTopology, boolean isAutoAdjust) {
+    private IgniteInternalFuture<?> changeGlobalState0(
+        final boolean activate,
+        boolean readOnly,
+        BaselineTopology blt,
+        boolean forceChangeBaselineTopology,
+        boolean isAutoAdjust
+    ) {
         boolean isBaselineAutoAdjustEnabled = isBaselineAutoAdjustEnabled();
 
         if (forceChangeBaselineTopology && isBaselineAutoAdjustEnabled != isAutoAdjust)
@@ -913,19 +998,24 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         if (ctx.isDaemon() || ctx.clientNode()) {
             GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
 
-            sendComputeChangeGlobalState(activate, blt, forceChangeBaselineTopology, fut);
+            sendComputeChangeGlobalState(activate, readOnly, blt, forceChangeBaselineTopology, fut);
 
             return fut;
         }
 
         if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) {
-            return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
-                " cluster (must invoke the method outside of an active transaction)."));
+            return new GridFinishedFuture<>(
+                new IgniteCheckedException("Failed to " +
+                    prettyStr(activate, readOnly, readOnlyChanged(globalState, readOnly)) +
+                    " (must invoke the method outside of an active transaction).")
+            );
         }
 
         DiscoveryDataClusterState curState = globalState;
 
-        if (!curState.transition() && curState.active() == activate
+        if (!curState.transition() &&
+            curState.active() == activate &&
+            curState.readOnly() == readOnly
             && (!activate || BaselineTopology.equals(curState.baselineTopology(), blt)))
             return new GridFinishedFuture<>();
 
@@ -934,7 +1024,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         GridChangeGlobalStateFuture fut = stateChangeFut.get();
 
         while (fut == null || fut.isDone()) {
-            fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, ctx);
+            fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, readOnly, ctx);
 
             if (stateChangeFut.compareAndSet(null, fut)) {
                 startedFut = fut;
@@ -946,9 +1036,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         }
 
         if (startedFut == null) {
-            if (fut.activate != activate) {
-                return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
-                    ", because another state change operation is currently in progress: " + prettyStr(fut.activate)));
+            if (fut.activate != activate && fut.readOnly != readOnly) {
+                return new GridFinishedFuture<>(
+                    new IgniteCheckedException(
+                        "Failed to " + prettyStr(activate, readOnly, readOnlyChanged(globalState, readOnly)) +
+                        ", because another state change operation is currently in progress: " +
+                            prettyStr(fut.activate, fut.readOnly, readOnlyChanged(globalState, fut.readOnly))
+                    )
+                );
             }
             else
                 return fut;
@@ -988,6 +1083,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             ctx.localNodeId(),
             storedCfgs,
             activate,
+            readOnly,
             blt,
             forceChangeBaselineTopology,
             System.currentTimeMillis()
@@ -996,15 +1092,22 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         IgniteInternalFuture<?> resFut = wrapStateChangeFuture(startedFut, msg);
 
         try {
-            if (log.isInfoEnabled())
-                U.log(log, "Sending " + prettyStr(activate) + " request with BaselineTopology " + blt);
+            U.log(
+                log,
+                "Sending " + prettyStr(activate, readOnly, readOnlyChanged(globalState, readOnly)) +
+                    " request with BaselineTopology " + blt
+            );
 
             ctx.discovery().sendCustomEvent(msg);
 
             if (ctx.isStopping()) {
-                String errMsg = "Failed to execute " + prettyStr(activate) + " request, node is stopping.";
-
-                startedFut.onDone(new IgniteCheckedException(errMsg));
+                startedFut.onDone(
+                    new IgniteCheckedException(
+                        "Failed to execute " +
+                            prettyStr(activate, readOnly, readOnlyChanged(globalState, readOnly)) +
+                            " request , node is stopping."
+                    )
+                );
             }
         }
         catch (IgniteCheckedException e) {
@@ -1017,10 +1120,20 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData) {
+    @Nullable @Override public IgniteNodeValidationResult validateNode(
+        ClusterNode node,
+        DiscoveryDataBag.JoiningNodeDiscoveryData discoData
+    ) {
         if (node.isClient() || node.isDaemon())
             return null;
 
+        if (globalState.readOnly() && !IgniteFeatures.nodeSupports(node, IgniteFeatures.CLUSTER_READ_ONLY_MODE)) {
+            String msg = "Node not supporting cluster read-only mode is not allowed to join the cluster with enabled" +
+                " read-only mode";
+
+            return new IgniteNodeValidationResult(node.id(), msg, msg);
+        }
+
         if (discoData.joiningNodeData() == null) {
             if (globalState.baselineTopology() != null) {
                 String msg = "Node not supporting BaselineTopology" +
@@ -1130,28 +1243,31 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
     /**
      * @param activate New cluster state.
+     * @param readOnly New read-only mode.
      * @param resFut State change future.
      */
     private void sendComputeChangeGlobalState(
         boolean activate,
+        boolean readOnly,
         BaselineTopology blt,
         boolean forceBlt,
         final GridFutureAdapter<Void> resFut
     ) {
         AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
 
-        if (log.isInfoEnabled()) {
-            log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
+        U.log(
+            log,
+            "Sending " + prettyStr(activate, readOnly, readOnlyChanged(globalState, readOnly)) +
+                " request from node [id=" + ctx.localNodeId() +
                 ", topVer=" + topVer +
                 ", client=" + ctx.clientNode() +
-                ", daemon=" + ctx.isDaemon() + "]");
-        }
+                ", daemon=" + ctx.isDaemon() + "]"
+        );
 
         IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();
 
-        IgniteFuture<Void> fut = comp.runAsync(
-            new ClientChangeGlobalStateComputeRequest(activate, blt, forceBlt)
-        );
+        IgniteFuture<Void> fut =
+            comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate, readOnly, blt, forceBlt));
 
         fut.listen(new CI1<IgniteFuture>() {
             @Override public void apply(IgniteFuture fut) {
@@ -1222,7 +1338,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
         if (fut != null) {
             IgniteCheckedException e = new IgniteCheckedException(
-                "Failed to " + prettyStr(req.activate()) + " cluster",
+                "Failed to " + prettyStr(req.activate(), req.readOnly(), readOnlyChanged(globalState, req.readOnly())),
                 null,
                 false
             );
@@ -1299,8 +1415,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
     }
 
     /** {@inheritDoc} */
-    @Override public void onBaselineTopologyChanged
-    (
+    @Override public void onBaselineTopologyChanged(
         BaselineTopology blt,
         BaselineTopologyHistoryItem prevBltHistItem
     ) throws IgniteCheckedException {
@@ -1386,7 +1501,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         DiscoveryDataClusterState state = globalState;
 
         if (!state.active() && !state.transition() && state.baselineTopology() == null) {
-            DiscoveryDataClusterState newState = DiscoveryDataClusterState.createState(false, blt);
+            DiscoveryDataClusterState newState = DiscoveryDataClusterState.createState(false, false, blt);
 
             globalState = newState;
         }
@@ -1442,6 +1557,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                     nodeId,
                     null,
                     true,
+                    oldState.readOnly(),
                     newBlt,
                     true,
                     System.currentTimeMillis()
@@ -1482,6 +1598,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
                 ctx.localNodeId(),
                 null,
                 true,
+                clusterState.readOnly(),
                 blt,
                 true,
                 System.currentTimeMillis()
@@ -1602,6 +1719,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         return activate ? "activate" : "deactivate";
     }
 
+    /**
+     * @param activate Activate flag.
+     * @param readOnly Read-only flag.
+     * @param readOnlyChanged {@code True} if the requested read-only mode differs from the current one.
+     * @return Activate or read-only message string.
+     */
+    private static String prettyStr(boolean activate, boolean readOnly, boolean readOnlyChanged) {
+        return readOnlyChanged ? prettyStr(readOnly) + " read-only mode" : prettyStr(activate) + " cluster";
+    }
+
+    /**
+     * @param curState Current cluster state.
+     * @param newReadOnly New read-only mode value.
+     * @return {@code True} if read-only mode changed and {@code False} otherwise.
+     */
+    private boolean readOnlyChanged(DiscoveryDataClusterState curState, boolean newReadOnly) {
+        return curState.readOnly() != newReadOnly;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridClusterStateProcessor.class, this);
@@ -1618,6 +1754,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         /** Activate. */
         private final boolean activate;
 
+        /** Requested read-only mode. */
+        private final boolean readOnly;
+
         /** Nodes. */
         @GridToStringInclude
         private final Set<UUID> remaining = new HashSet<>();
@@ -1647,9 +1786,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
          * @param activate New cluster state.
          * @param ctx Context.
          */
-        GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
+        GridChangeGlobalStateFuture(UUID requestId, boolean activate, boolean readOnly, GridKernalContext ctx) {
             this.requestId = requestId;
             this.activate = activate;
+            this.readOnly = readOnly;
             this.ctx = ctx;
 
             log = ctx.log(getClass());
@@ -1770,6 +1910,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         private final boolean activate;
 
         /** */
+        private final boolean readOnly;
+
+        /** */
         private final BaselineTopology baselineTopology;
 
         /** */
@@ -1782,8 +1925,14 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         /**
          * @param activate New cluster state.
          */
-        private ClientChangeGlobalStateComputeRequest(boolean activate, BaselineTopology blt, boolean forceBlt) {
+        private ClientChangeGlobalStateComputeRequest(
+            boolean activate,
+            boolean readOnly,
+            BaselineTopology blt,
+            boolean forceBlt
+        ) {
             this.activate = activate;
+            this.readOnly = readOnly;
             this.baselineTopology = blt;
             this.forceChangeBaselineTopology = forceBlt;
         }
@@ -1793,6 +1942,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             try {
                 ig.context().state().changeGlobalState(
                     activate,
+                    readOnly,
                     baselineTopology != null ? baselineTopology.currentBaseline() : null,
                     forceChangeBaselineTopology
                 ).get();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
index d71b4cf..37aff52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
@@ -47,6 +47,16 @@ public interface IGridClusterStateProcessor extends GridProcessor {
     IgniteFuture<Boolean> publicApiActiveStateAsync(boolean waitForTransition);
 
     /**
+     * @return Grid read-only mode to be used on public API.
+     */
+    boolean publicApiReadOnlyMode();
+
+    /**
+     * @return Time of the read-only mode state change, to be used on public API.
+     */
+    long readOnlyModeStateChangeTime();
+
+    /**
      * @param discoCache Discovery data cache.
      * @return If transition is in progress returns future which is completed when transition finishes.
      */
@@ -90,15 +100,37 @@ public interface IGridClusterStateProcessor extends GridProcessor {
 
     /**
      * @param activate New cluster state.
+     * @param baselineNodes New baseline nodes.
+     * @param forceChangeBaselineTopology Force change baseline topology.
+     * @return State change future.
+     */
+    IgniteInternalFuture<?> changeGlobalState(
+        boolean activate,
+        Collection<? extends BaselineNode> baselineNodes,
+        boolean forceChangeBaselineTopology
+    );
+
+    /**
+     * @param activate New cluster state.
+     * @param readOnly Enable read-only mode.
+     * @param baselineNodes New baseline nodes.
+     * @param forceChangeBaselineTopology Force change baseline topology.
      * @return State change future.
      */
     IgniteInternalFuture<?> changeGlobalState(
         boolean activate,
+        boolean readOnly,
         Collection<? extends BaselineNode> baselineNodes,
         boolean forceChangeBaselineTopology
     );
 
     /**
+     * @param readOnly Enable/disable read-only mode.
+     * @return State change future.
+     */
+    IgniteInternalFuture<?> changeGlobalState(boolean readOnly);
+
+    /**
      * @param errs Errors.
      * @param req State change request.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4f8ae3d..ccb0f1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -425,7 +426,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
     /**
      * Acquires read or write lock.
-     * 
+     *
      * @param writeLock {@code True} if acquires write lock.
      */
     private void lock(boolean writeLock) {
@@ -963,6 +964,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                                     resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
                                         + remaps, e1));
                                 }
+                                else if (X.hasCause(e1, ClusterReadOnlyModeCheckedException.class)) {
+                                    resFut.onDone(new ClusterReadOnlyModeCheckedException(
+                                        "Failed to finish operation. Cluster in read-only mode!",
+                                        e1
+                                    ));
+                                }
                                 else {
                                     try {
                                         remapSem.acquire();
@@ -1198,6 +1205,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                             log.debug("Failed to flush buffer: " + e);
 
                         err = true;
+
+                        if (X.cause(e, ClusterReadOnlyModeCheckedException.class) != null)
+                            throw e;
                     }
                 }
 
@@ -2060,9 +2070,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     final String msg = "DataStreamer request failed [node=" + nodeId + "]";
 
-                    err = cause instanceof ClusterTopologyCheckedException ?
-                        new ClusterTopologyCheckedException(msg, cause) :
-                        new IgniteCheckedException(msg, cause);
+                    if (cause instanceof ClusterTopologyCheckedException)
+                        err = new ClusterTopologyCheckedException(msg, cause);
+                    else if (X.hasCause(cause, ClusterReadOnlyModeCheckedException.class))
+                        err = new ClusterReadOnlyModeCheckedException(msg, cause);
+                    else
+                        err = new IgniteCheckedException(msg, cause);
                 }
                 catch (IgniteCheckedException e) {
                     f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java
index bc7fbf3..0648fb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlStateCode.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.odbc;
 
+import org.apache.ignite.IgniteCluster;
+
 /**
  * SQL state codes.
  */
@@ -73,6 +75,12 @@ public final class SqlStateCode {
     /** Internal error. */
     public static final String INTERNAL_ERROR = "50000";  // Generic value for custom "50" class.
 
+    /**
+     * Read-only mode is enabled on the cluster. See {@link IgniteCluster#readOnly()}.
+     * Value is equal to {@code org.h2.api.ErrorCode#DATABASE_IS_READ_ONLY} code.
+     */
+    public static final String CLUSTER_READ_ONLY_MODE_ENABLED = "90097";
+
     /** Query canceled. */
     public static final String QUERY_CANCELLED = "57014";
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
index 6864cad..7531917 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
@@ -177,6 +177,15 @@ public enum GridRestCommand {
     CLUSTER_CURRENT_STATE("currentstate"),
 
     /** */
+    CLUSTER_CURRENT_READ_ONLY_MODE("currentreadonlymode"),
+
+    /** */
+    CLUSTER_READ_ONLY_ENABLE("readonlyenable"),
+
+    /** */
+    CLUSTER_READ_ONLY_DISABLE("readonlydisable"),
+
+    /** */
     BASELINE_CURRENT_STATE("baseline"),
 
     /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 2780e8b..82faa86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandle
 import org.apache.ignite.internal.processors.rest.handlers.auth.AuthenticationCommandHandler;
 import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheCommandHandler;
 import org.apache.ignite.internal.processors.rest.handlers.cluster.GridBaselineCommandHandler;
+import org.apache.ignite.internal.processors.rest.handlers.cluster.GridChangeReadOnlyModeCommandHandler;
 import org.apache.ignite.internal.processors.rest.handlers.cluster.GridChangeStateCommandHandler;
 import org.apache.ignite.internal.processors.rest.handlers.memory.MemoryMetricsCommandHandler;
 import org.apache.ignite.internal.processors.rest.handlers.datastructures.DataStructuresCommandHandler;
@@ -542,6 +543,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
             addHandler(new QueryCommandHandler(ctx));
             addHandler(new GridLogCommandHandler(ctx));
             addHandler(new GridChangeStateCommandHandler(ctx));
+            addHandler(new GridChangeReadOnlyModeCommandHandler(ctx));
             addHandler(new AuthenticationCommandHandler(ctx));
             addHandler(new UserActionCommandHandler(ctx));
             addHandler(new GridBaselineCommandHandler(ctx));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientReadOnlyModeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientReadOnlyModeRequest.java
new file mode 100644
index 0000000..093be35
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientReadOnlyModeRequest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.client.message;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Client request to get the current cluster read-only mode or to enable/disable it.
+ */
+public class  GridClientReadOnlyModeRequest extends GridClientAbstractMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request current state. */
+    private boolean reqCurrentState;
+
+    /** Read only. */
+    private boolean readOnly;
+
+    /** @return {@code True} if this request asks for the current read-only mode. */
+    public boolean isReqCurrentState() {
+        return reqCurrentState;
+    }
+
+    /** @return Read-only mode to set: {@code true} to enable, {@code false} to disable. */
+    public boolean readOnly() {
+        return readOnly;
+    }
+
+    /**
+     * @return Current read-only mode request.
+     */
+    public static GridClientReadOnlyModeRequest currentReadOnlyMode() {
+        GridClientReadOnlyModeRequest msg = new GridClientReadOnlyModeRequest();
+
+        msg.reqCurrentState = true;
+
+        return msg;
+    }
+
+    /**
+     * @return Enable read-only mode request.
+     */
+    public static GridClientReadOnlyModeRequest enableReadOnly() {
+        GridClientReadOnlyModeRequest msg = new GridClientReadOnlyModeRequest();
+
+        msg.readOnly = true;
+
+        return msg;
+    }
+
+    /**
+     * @return Disable read-only mode request.
+     */
+    public static GridClientReadOnlyModeRequest disableReadOnly() {
+        GridClientReadOnlyModeRequest msg = new GridClientReadOnlyModeRequest();
+
+        msg.readOnly = false;
+
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        out.writeBoolean(reqCurrentState);
+        out.writeBoolean(readOnly);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        reqCurrentState = in.readBoolean();
+        readOnly = in.readBoolean();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeReadOnlyModeCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeReadOnlyModeCommandHandler.java
new file mode 100644
index 0000000..a220361
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeReadOnlyModeCommandHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.handlers.cluster;
+
+import java.util.Collection;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.rest.GridRestCommand;
+import org.apache.ignite.internal.processors.rest.GridRestResponse;
+import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandlerAdapter;
+import org.apache.ignite.internal.processors.rest.request.GridRestReadOnlyChangeModeRequest;
+import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLUSTER_CURRENT_READ_ONLY_MODE;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLUSTER_READ_ONLY_DISABLE;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLUSTER_READ_ONLY_ENABLE;
+
+/**
+ * REST command handler that reports the current cluster read-only mode and enables or disables it.
+ */
+public class GridChangeReadOnlyModeCommandHandler extends GridRestCommandHandlerAdapter {
+    /** Commands. */
+    private static final Collection<GridRestCommand> COMMANDS =
+        U.sealList(CLUSTER_CURRENT_READ_ONLY_MODE, CLUSTER_READ_ONLY_DISABLE, CLUSTER_READ_ONLY_ENABLE);
+
+    /**
+     * @param ctx Context.
+     */
+    public GridChangeReadOnlyModeCommandHandler(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridRestCommand> supportedCommands() {
+        return COMMANDS;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<GridRestResponse> handleAsync(GridRestRequest restReq) {
+        GridRestReadOnlyChangeModeRequest req = (GridRestReadOnlyChangeModeRequest)restReq;
+
+        final GridFutureAdapter<GridRestResponse> fut = new GridFutureAdapter<>();
+
+        final GridRestResponse res = new GridRestResponse();
+
+        try {
+            switch (req.command()) {
+                case CLUSTER_CURRENT_READ_ONLY_MODE:
+                    res.setResponse(ctx.grid().cluster().readOnly());
+
+                    break;
+
+                default:
+                    if (req.readOnly())
+                        U.log(log, "Received enable read-only mode request from client node with ID: " + req.clientId());
+                    else
+                        U.log(log, "Received disable read-only mode request from client node with ID: " + req.clientId());
+
+                    ctx.grid().cluster().readOnly(req.readOnly());
+
+                    res.setResponse(req.command().key() + " done");
+
+                    break;
+            }
+
+            fut.onDone(res);
+        }
+        catch (Exception e) {
+            SB sb = new SB();
+
+            sb.a(e.getMessage()).a("\n").a("suppressed: \n");
+
+            for (Throwable t : X.getSuppressedList(e))
+                sb.a(t.getMessage()).a("\n");
+
+            res.setError(sb.toString());
+
+            fut.onDone(res);
+        }
+        return fut;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index b6c1e19..e6012e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.rest.client.message.GridClientHands
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientPingPacket;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientReadOnlyModeRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientResponse;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientStateRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientTaskRequest;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisM
 import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisNioListener;
 import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
 import org.apache.ignite.internal.processors.rest.request.GridRestChangeStateRequest;
+import org.apache.ignite.internal.processors.rest.request.GridRestReadOnlyChangeModeRequest;
 import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
 import org.apache.ignite.internal.processors.rest.request.GridRestTaskRequest;
 import org.apache.ignite.internal.processors.rest.request.GridRestTopologyRequest;
@@ -69,6 +71,9 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_P
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REMOVE;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REMOVE_ALL;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REPLACE;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLUSTER_CURRENT_READ_ONLY_MODE;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLUSTER_READ_ONLY_DISABLE;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLUSTER_READ_ONLY_ENABLE;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXE;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.NODE;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.NOOP;
@@ -378,6 +383,22 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
 
             restReq = restChangeReq;
         }
+        else if (msg instanceof GridClientReadOnlyModeRequest) {
+            GridClientReadOnlyModeRequest req = (GridClientReadOnlyModeRequest)msg;
+
+            GridRestReadOnlyChangeModeRequest restChangeReq = new GridRestReadOnlyChangeModeRequest();
+
+            if (req.isReqCurrentState()) {
+                restChangeReq.reqCurrentMode();
+                restChangeReq.command(CLUSTER_CURRENT_READ_ONLY_MODE);
+            }
+            else {
+                restChangeReq.readOnly(req.readOnly());
+                restChangeReq.command(req.readOnly() ? CLUSTER_READ_ONLY_ENABLE : CLUSTER_READ_ONLY_DISABLE);
+            }
+
+            restReq = restChangeReq;
+        }
 
         if (restReq != null) {
             restReq.destinationId(msg.destinationId());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestReadOnlyChangeModeRequest.java
similarity index 57%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestReadOnlyChangeModeRequest.java
index 4fa25ce..6d1e7f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestReadOnlyChangeModeRequest.java
@@ -15,19 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.internal.processors.rest.request;
 
 /**
- *  Interface for manage state of grid cluster.
+ * REST request used to change the cluster read-only mode or to ask for its current value.
  */
-public interface GridClientClusterState {
-    /**
-     * @param active {@code True} activate, {@code False} deactivate.
-     */
-    public void active(boolean active) throws GridClientException;
+public class GridRestReadOnlyChangeModeRequest extends GridRestRequest {
+    /** Flag indicating that the current read-only mode is requested. */
+    private boolean reqCurrentMode;
+
+    /** Read-only mode flag carried by the request. */
+    private boolean readOnly;
+
+    /** */
+    public void reqCurrentMode() {
+        reqCurrentMode = true;
+    }
+
+    /** */
+    public boolean isReqCurrentMode() {
+        return reqCurrentMode;
+    }
+
+    /** */
+    public void readOnly(boolean readOnly) {
+        this.readOnly = readOnly;
+    }
 
-    /**
-     * @return {@code Boolean} - Current cluster state. {@code True} active, {@code False} inactive.
-     */
-    public boolean active() throws GridClientException;
+    /** */
+    public boolean readOnly() {
+        return readOnly;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
index 84fcbea..e83b0c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
@@ -505,4 +505,33 @@ public interface IgniteMXBean {
     @MXBeanParametersNames("registry")
     @MXBeanParametersDescriptions("Metrics registry.")
     public void resetMetrics(String registry);
+
+
+    /**
+     * Gets cluster read-only mode status.
+     *
+     * @return {@code true} if the cluster is active and read-only mode is enabled, {@code false} otherwise.
+     */
+    @MXBeanDescription("Cluster read-only mode status.")
+    boolean readOnlyMode();
+
+    /**
+     * Enable or disable cluster read-only mode. If {@code readOnly} flag is {@code true} read-only mode will be
+     * enabled. If {@code readOnly} flag is {@code false} read-only mode will be disabled.
+     *
+     * @param readOnly enable/disable cluster read-only mode flag.
+     */
+    @MXBeanDescription("Enable or disable cluster read-only mode.")
+    @MXBeanParametersNames("readOnly")
+    @MXBeanParametersDescriptions("True - enable read-only mode, false - disable read-only mode.")
+    void readOnlyMode(boolean readOnly);
+
+    /**
+     * Gets duration of read-only mode enabled on cluster.
+     *
+     * @return {@code 0} if cluster read-only mode is disabled, otherwise the time in milliseconds since read-only
+     * mode was enabled.
+     */
+    @MXBeanDescription("Duration of read-only mode enabled on cluster.")
+    long getReadOnlyModeDuration();
 }
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index bb2747b..9e58951 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1658,6 +1658,7 @@ org.apache.ignite.internal.processors.rest.client.message.GridClientMessage
 org.apache.ignite.internal.processors.rest.client.message.GridClientNodeBean
 org.apache.ignite.internal.processors.rest.client.message.GridClientNodeMetricsBean
 org.apache.ignite.internal.processors.rest.client.message.GridClientPingPacket
+org.apache.ignite.internal.processors.rest.client.message.GridClientReadOnlyModeRequest
 org.apache.ignite.internal.processors.rest.client.message.GridClientResponse
 org.apache.ignite.internal.processors.rest.client.message.GridClientStateRequest
 org.apache.ignite.internal.processors.rest.client.message.GridClientTaskRequest
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index 04782d9..468672a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.commandline;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -39,6 +38,7 @@ import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
 import org.apache.ignite.testframework.junits.SystemPropertiesRule;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.Nullable;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -56,9 +56,9 @@ import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.FIND
 import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
 import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_FIRST;
 import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_THROUGH;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -85,7 +85,7 @@ public class CommandHandlerParsingTest {
             int expectedCheckThrough = 11;
             UUID nodeId = UUID.randomUUID();
 
-            ConnectionAndSslParameters args = parseArgs(Arrays.asList(
+            ConnectionAndSslParameters args = parseArgs(asList(
                 CACHE.text(),
                 VALIDATE_INDEXES.text(),
                 "cache1, cache2",
@@ -114,7 +114,7 @@ public class CommandHandlerParsingTest {
             int expectedParam = 11;
             UUID nodeId = UUID.randomUUID();
 
-            ConnectionAndSslParameters args = parseArgs(Arrays.asList(
+            ConnectionAndSslParameters args = parseArgs(asList(
                     CACHE.text(),
                     VALIDATE_INDEXES.text(),
                     nodeId.toString(),
@@ -137,35 +137,11 @@ public class CommandHandlerParsingTest {
             e.printStackTrace();
         }
 
-        try {
-            parseArgs(
-                Arrays.asList(
-                    CACHE.text(),
-                    VALIDATE_INDEXES.text(),
-                    CHECK_FIRST.toString(),
-                    "0"
-                )
-            );
-
-            fail("Expected exception hasn't been thrown");
-        }
-        catch (IllegalArgumentException e) {
-            e.printStackTrace();
-        }
-
-        try {
-            parseArgs(Arrays.asList(CACHE.text(), VALIDATE_INDEXES.text(), CHECK_THROUGH.toString()));
-
-            fail("Expected exception hasn't been thrown");
-        }
-        catch (IllegalArgumentException e) {
-            e.printStackTrace();
-        }
+        assertParseArgsThrows("Value for '--check-first' property should be positive.", CACHE.text(), VALIDATE_INDEXES.text(), CHECK_FIRST.toString(), "0");
+        assertParseArgsThrows("Numeric value for '--check-through' parameter expected.", CACHE.text(), VALIDATE_INDEXES.text(), CHECK_THROUGH.toString());
     }
 
-    /**
-     *
-     */
+    /** */
     @Test
     public void testFindAndDeleteGarbage() {
         String nodeId = UUID.randomUUID().toString();
@@ -188,26 +164,23 @@ public class CommandHandlerParsingTest {
 
             FindAndDeleteGarbage.Arguments arg = (FindAndDeleteGarbage.Arguments)subcommand.subcommand().arg();
 
-            if (list.contains(nodeId)) {
+            if (list.contains(nodeId))
                 assertEquals("nodeId parameter unexpected value", nodeId, arg.nodeId().toString());
-            }
-            else {
+            else
                 assertNull(arg.nodeId());
-            }
 
             assertEquals(list.contains(delete), arg.delete());
 
-            if (list.contains(groups)) {
+            if (list.contains(groups))
                 assertEquals(3, arg.groups().size());
-            }
-            else {
+            else
                 assertNull(arg.groups());
-            }
         }
     }
 
+    /** */
     private List<List<String>> generateArgumentList(String subcommand, T2<String, Boolean>...optional) {
-        List<List<T2<String, Boolean>>> lists = generateAllCombinations(Arrays.asList(optional), (x) -> x.get2());
+        List<List<T2<String, Boolean>>> lists = generateAllCombinations(asList(optional), (x) -> x.get2());
 
         ArrayList<List<String>> res = new ArrayList<>();
 
@@ -229,6 +202,7 @@ public class CommandHandlerParsingTest {
         return res;
     }
 
+    /** */
     private <T> List<List<T>> generateAllCombinations(List<T> source, Predicate<T> stopFunc) {
         List<List<T>> res = new ArrayList<>();
 
@@ -243,12 +217,12 @@ public class CommandHandlerParsingTest {
         return res;
     }
 
+    /** */
     private <T> void generateAllCombinations(List<T> res, List<T> source, Predicate<T> stopFunc, List<List<T>> acc) {
         acc.add(res);
 
-        if (stopFunc != null && stopFunc.test(res.get(res.size() - 1))) {
+        if (stopFunc != null && stopFunc.test(res.get(res.size() - 1)))
             return;
-        }
 
         if (source.size() == 1) {
             ArrayList<T> list = new ArrayList<>(res);
@@ -282,14 +256,7 @@ public class CommandHandlerParsingTest {
             if (cmd == CommandList.CACHE || cmd == CommandList.WAL)
                 continue; // --cache subcommand requires its own specific arguments.
 
-            try {
-                parseArgs(asList("--truststore"));
-
-                fail("expected exception: Expected truststore");
-            }
-            catch (IllegalArgumentException e) {
-                e.printStackTrace();
-            }
+            assertParseArgsThrows("Expected SSL trust store path", "--truststore");
 
             ConnectionAndSslParameters args = parseArgs(asList("--keystore", "testKeystore", "--keystore-password", "testKeystorePassword", "--keystore-type", "testKeystoreType",
                 "--truststore", "testTruststore", "--truststore-password", "testTruststorePassword", "--truststore-type", "testTruststoreType",
@@ -318,23 +285,8 @@ public class CommandHandlerParsingTest {
             if (cmd == CommandList.CACHE || cmd == CommandList.WAL)
                 continue; // --cache subcommand requires its own specific arguments.
 
-            try {
-                parseArgs(asList("--user"));
-
-                fail("expected exception: Expected user name");
-            }
-            catch (IllegalArgumentException e) {
-                e.printStackTrace();
-            }
-
-            try {
-                parseArgs(asList("--password"));
-
-                fail("expected exception: Expected password");
-            }
-            catch (IllegalArgumentException e) {
-                e.printStackTrace();
-            }
+            assertParseArgsThrows("Expected user name", "--user");
+            assertParseArgsThrows("Expected password", "--password");
 
             ConnectionAndSslParameters args = parseArgs(asList("--user", "testUser", "--password", "testPass", cmd.text()));
 
@@ -349,7 +301,7 @@ public class CommandHandlerParsingTest {
      */
     @Test
     public void testParseAndValidateWalActions() {
-        ConnectionAndSslParameters args = parseArgs(Arrays.asList(WAL.text(), WAL_PRINT));
+        ConnectionAndSslParameters args = parseArgs(asList(WAL.text(), WAL_PRINT));
 
         assertEquals(WAL.command(), args.command());
 
@@ -359,7 +311,7 @@ public class CommandHandlerParsingTest {
 
         String nodes = UUID.randomUUID().toString() + "," + UUID.randomUUID().toString();
 
-        args = parseArgs(Arrays.asList(WAL.text(), WAL_DELETE, nodes));
+        args = parseArgs(asList(WAL.text(), WAL_DELETE, nodes));
 
         arg = ((WalCommands)args.command()).arg();
 
@@ -367,23 +319,11 @@ public class CommandHandlerParsingTest {
 
         assertEquals(nodes, arg.get2());
 
-        try {
-            parseArgs(Collections.singletonList(WAL.text()));
-
-            fail("expected exception: invalid arguments for --wal command");
-        }
-        catch (IllegalArgumentException e) {
-            e.printStackTrace();
-        }
+        assertParseArgsThrows("Expected arguments for " + WAL.text(), WAL.text());
 
-        try {
-            parseArgs(Arrays.asList(WAL.text(), UUID.randomUUID().toString()));
+        String rnd = UUID.randomUUID().toString();
 
-            fail("expected exception: invalid arguments for --wal command");
-        }
-        catch (IllegalArgumentException e) {
-            e.printStackTrace();
-        }
+        assertParseArgsThrows("Unexpected action "  + rnd + " for " + WAL.text(), WAL.text(), rnd);
     }
 
     /**
@@ -392,26 +332,20 @@ public class CommandHandlerParsingTest {
     @Test
     public void testParseAutoConfirmationFlag() {
         for (CommandList cmd : CommandList.values()) {
-            if (cmd != CommandList.DEACTIVATE
-                && cmd != CommandList.BASELINE
-                && cmd != CommandList.TX)
+            if (cmd.command().confirmationPrompt() == null)
                 continue;
 
             ConnectionAndSslParameters args = parseArgs(asList(cmd.text()));
 
-            assertEquals(cmd.command(), args.command());
-            assertEquals(DFLT_HOST, args.host());
-            assertEquals(DFLT_PORT, args.port());
-            assertFalse(args.autoConfirmation());
+            checkCommonParametersCorrectlyParsed(cmd, args, false);
 
             switch (cmd) {
-                case DEACTIVATE: {
+                case DEACTIVATE:
+                case READ_ONLY_DISABLE:
+                case READ_ONLY_ENABLE: {
                     args = parseArgs(asList(cmd.text(), "--yes"));
 
-                    assertEquals(cmd.command(), args.command());
-                    assertEquals(DFLT_HOST, args.host());
-                    assertEquals(DFLT_PORT, args.port());
-                    assertTrue(args.autoConfirmation());
+                    checkCommonParametersCorrectlyParsed(cmd, args, true);
 
                     break;
                 }
@@ -419,15 +353,12 @@ public class CommandHandlerParsingTest {
                     for (String baselineAct : asList("add", "remove", "set")) {
                         args = parseArgs(asList(cmd.text(), baselineAct, "c_id1,c_id2", "--yes"));
 
-                        assertEquals(cmd.command(), args.command());
-                        assertEquals(DFLT_HOST, args.host());
-                        assertEquals(DFLT_PORT, args.port());
-                        assertTrue(args.autoConfirmation());
+                        checkCommonParametersCorrectlyParsed(cmd, args, true);
 
                         BaselineArguments arg = ((BaselineCommand)args.command()).arg();
 
                         assertEquals(baselineAct, arg.getCmd().text());
-                        assertEquals(new HashSet<>(Arrays.asList("c_id1","c_id2")), new HashSet<>(arg.getConsistentIds()));
+                        assertEquals(new HashSet<>(asList("c_id1","c_id2")), new HashSet<>(arg.getConsistentIds()));
                     }
 
                     break;
@@ -436,10 +367,7 @@ public class CommandHandlerParsingTest {
                 case TX: {
                     args = parseArgs(asList(cmd.text(), "--xid", "xid1", "--min-duration", "10", "--kill", "--yes"));
 
-                    assertEquals(cmd.command(), args.command());
-                    assertEquals(DFLT_HOST, args.host());
-                    assertEquals(DFLT_PORT, args.port());
-                    assertTrue(args.autoConfirmation());
+                    checkCommonParametersCorrectlyParsed(cmd, args, true);
 
                     VisorTxTaskArg txTaskArg = ((TxCommands)args.command()).arg();
 
@@ -451,6 +379,18 @@ public class CommandHandlerParsingTest {
         }
     }
 
+    /** */
+    private void checkCommonParametersCorrectlyParsed(
+        CommandList cmd,
+        ConnectionAndSslParameters args,
+        boolean autoConfirm
+    ) {
+        assertEquals(cmd.command(), args.command());
+        assertEquals(DFLT_HOST, args.host());
+        assertEquals(DFLT_PORT, args.port());
+        assertEquals(autoConfirm, args.autoConfirmation());
+    }
+
     /**
      * Tests host and port arguments.
      * Tests connection settings arguments.
@@ -476,32 +416,9 @@ public class CommandHandlerParsingTest {
             assertEquals(5000, args.pingInterval());
             assertEquals(40000, args.pingTimeout());
 
-            try {
-                parseArgs(asList("--port", "wrong-port", cmd.text()));
-
-                fail("expected exception: Invalid value for port:");
-            }
-            catch (IllegalArgumentException e) {
-                e.printStackTrace();
-            }
-
-            try {
-                parseArgs(asList("--ping-interval", "-10", cmd.text()));
-
-                fail("expected exception: Ping interval must be specified");
-            }
-            catch (IllegalArgumentException e) {
-                e.printStackTrace();
-            }
-
-            try {
-                parseArgs(asList("--ping-timeout", "-20", cmd.text()));
-
-                fail("expected exception: Ping timeout must be specified");
-            }
-            catch (IllegalArgumentException e) {
-                e.printStackTrace();
-            }
+            assertParseArgsThrows("Invalid value for port: wrong-port", "--port", "wrong-port", cmd.text());
+            assertParseArgsThrows("Invalid value for ping interval: -10", "--ping-interval", "-10", cmd.text());
+            assertParseArgsThrows("Invalid value for ping timeout: -20", "--ping-timeout", "-20", cmd.text());
         }
     }
 
@@ -514,61 +431,13 @@ public class CommandHandlerParsingTest {
 
         parseArgs(asList("--tx"));
 
-        try {
-            parseArgs(asList("--tx", "minDuration"));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
-
-        try {
-            parseArgs(asList("--tx", "minDuration", "-1"));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
-
-        try {
-            parseArgs(asList("--tx", "minSize"));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
-
-        try {
-            parseArgs(asList("--tx", "minSize", "-1"));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
-
-        try {
-            parseArgs(asList("--tx", "label"));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
-
-        try {
-            parseArgs(asList("--tx", "label", "tx123["));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
-
-        try {
-            parseArgs(asList("--tx", "servers", "nodes", "1,2,3"));
-
-            fail("Expected exception");
-        }
-        catch (IllegalArgumentException ignored) {
-        }
+        assertParseArgsThrows("Expecting --min-duration", "--tx", "--min-duration");
+        assertParseArgsThrows("Invalid value for --min-duration: -1", "--tx", "--min-duration", "-1");
+        assertParseArgsThrows("Expecting --min-size", "--tx", "--min-size");
+        assertParseArgsThrows("Invalid value for --min-size: -1", "--tx", "--min-size", "-1");
+        assertParseArgsThrows("--label", "--tx", "--label");
+        assertParseArgsThrows("Illegal regex syntax", "--tx", "--label", "tx123[");
+        assertParseArgsThrows("Projection can't be used together with list of consistent ids.", "--tx", "--servers", "--nodes", "1,2,3");
 
         args = parseArgs(asList("--tx", "--min-duration", "120", "--min-size", "10", "--limit", "100", "--order", "SIZE", "--servers"));
 
@@ -596,7 +465,7 @@ public class CommandHandlerParsingTest {
         arg = ((TxCommands)args.command()).arg();
 
         assertNull(arg.getProjection());
-        assertEquals(Arrays.asList("1", "2", "3"), arg.getConsistentIds());
+        assertEquals(asList("1", "2", "3"), arg.getConsistentIds());
     }
 
     /**
@@ -622,4 +491,14 @@ public class CommandHandlerParsingTest {
 
         return result;
     }
+
+    /**
+     * Checks that argument parsing fails with {@link IllegalArgumentException} and {@code failMsg} message.
+     *
+     * @param failMsg Exception message (optional).
+     * @param args Incoming arguments.
+     */
+    private void assertParseArgsThrows(@Nullable String failMsg, String... args) {
+        assertThrows(null, () -> parseArgs(asList(args)), IllegalArgumentException.class, failMsg);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
index 3134f1c..5e92b33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeAbstractTest.java
@@ -18,22 +18,10 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
-import java.util.Collections;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-
 /**
  *
  */
@@ -41,27 +29,8 @@ public class ClusterReadOnlyModeAbstractTest extends GridCommonAbstractTest {
     /** */
     private static final int SRVS = 3;
 
-    /** Replicated atomic cache. */
-    private static final String REPL_ATOMIC_CACHE = "repl_atomic_cache";
-
-    /** Replicated transactional cache. */
-    private static final String REPL_TX_CACHE = "repl_tx_cache";
-
-    /** Replicated transactional cache. */
-    private static final String REPL_MVCC_CACHE = "repl_mvcc_cache";
-
-    /** Partitioned atomic cache. */
-    private static final String PART_ATOMIC_CACHE = "part_atomic_cache";
-
-    /** Partitioned transactional cache. */
-    private static final String PART_TX_CACHE = "part_tx_cache";
-
-    /** Partitioned mvcc transactional cache. */
-    private static final String PART_MVCC_CACHE = "part_mvcc_cache";
-
     /** Cache names. */
-    protected static final Collection<String> CACHE_NAMES = F.asList(REPL_ATOMIC_CACHE, REPL_TX_CACHE,
-        PART_ATOMIC_CACHE, PART_TX_CACHE);
+    protected static final Collection<String> CACHE_NAMES = ClusterReadOnlyModeTestUtils.cacheNames();
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
@@ -75,37 +44,20 @@ public class ClusterReadOnlyModeAbstractTest extends GridCommonAbstractTest {
         super.afterTest();
 
         changeClusterReadOnlyMode(false);
+
+        for (String cacheName : CACHE_NAMES)
+            grid(0).cache(cacheName).removeAll();
     }
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setCacheConfiguration(
-            cacheConfiguration(REPL_ATOMIC_CACHE, REPLICATED, ATOMIC, null),
-            cacheConfiguration(REPL_TX_CACHE, REPLICATED, TRANSACTIONAL, null),
-            cacheConfiguration(REPL_MVCC_CACHE, REPLICATED, TRANSACTIONAL_SNAPSHOT, "mvcc_repl_grp"),
-            cacheConfiguration(PART_ATOMIC_CACHE, PARTITIONED, ATOMIC, "part_grp"),
-            cacheConfiguration(PART_TX_CACHE, PARTITIONED, TRANSACTIONAL, "part_grp"),
-            cacheConfiguration(PART_MVCC_CACHE, PARTITIONED, TRANSACTIONAL_SNAPSHOT, "mvcc_part_grp")
-        );
+        cfg.setCacheConfiguration(ClusterReadOnlyModeTestUtils.cacheConfigurations());
 
-        return cfg;
-    }
+        cfg.setFailureHandler(new StopNodeOrHaltFailureHandler());
 
-    /**
-     * @param cacheMode Cache mode.
-     * @param atomicityMode Atomicity mode.
-     * @param grpName Cache group name.
-     */
-    private CacheConfiguration<Integer, Integer> cacheConfiguration(String name, CacheMode cacheMode,
-        CacheAtomicityMode atomicityMode, String grpName) {
-        return new CacheConfiguration<Integer, Integer>()
-            .setName(name)
-            .setCacheMode(cacheMode)
-            .setAtomicityMode(atomicityMode)
-            .setGroupName(grpName)
-            .setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Integer.class)));
+        return cfg;
     }
 
     /**
@@ -114,10 +66,6 @@ public class ClusterReadOnlyModeAbstractTest extends GridCommonAbstractTest {
      * @param readOnly Read only.
      */
     protected void changeClusterReadOnlyMode(boolean readOnly) {
-        for (int idx = 0; idx < SRVS; idx++) {
-            IgniteEx ignite = grid(idx);
-
-            ignite.context().cache().context().readOnlyMode(readOnly);
-        }
+        grid(0).cluster().readOnly(readOnly);
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
index b9cd4c7..e826f73 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTest.java
@@ -17,15 +17,23 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Random;
-import javax.cache.CacheException;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
-import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertCachesReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertDataStreamerReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.cacheNames;
+
 /**
  * Tests cache get/put/remove and data streaming in read-only cluster mode.
  */
@@ -35,15 +43,15 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
      */
     @Test
     public void testCacheGetPutRemove() {
-        assertCachesReadOnlyMode(false);
+        assertCachesReadOnlyMode(false, CACHE_NAMES);
 
         changeClusterReadOnlyMode(true);
 
-        assertCachesReadOnlyMode(true);
+        assertCachesReadOnlyMode(true, CACHE_NAMES);
 
         changeClusterReadOnlyMode(false);
 
-        assertCachesReadOnlyMode(false);
+        assertCachesReadOnlyMode(false, CACHE_NAMES);
     }
 
     /**
@@ -51,87 +59,153 @@ public class ClusterReadOnlyModeTest extends ClusterReadOnlyModeAbstractTest {
      */
     @Test
     public void testDataStreamerReadOnly() {
-        assertDataStreamerReadOnlyMode(false);
+        assertDataStreamerReadOnlyMode(false, CACHE_NAMES);
 
         changeClusterReadOnlyMode(true);
 
-        assertDataStreamerReadOnlyMode(true);
+        assertDataStreamerReadOnlyMode(true, CACHE_NAMES);
 
         changeClusterReadOnlyMode(false);
 
-        assertDataStreamerReadOnlyMode(false);
+        assertDataStreamerReadOnlyMode(false, CACHE_NAMES);
+    }
+
+    /**
+     * Tests that concurrent streaming fails with a read-only error (no manual flush, allowOverwrite disabled).
+     */
+    @Test
+    public void testDataStreamerReadOnlyConcurrent() throws Exception {
+        testDataStreamerReadOnlyConcurrent(false, false);
     }
 
     /**
-     * Asserts that all caches in read-only or in read/write mode on all nodes.
+     * Tests that concurrent streaming fails with a read-only error (manual flush, allowOverwrite disabled).
+     */
+    @Test
+    public void testDataStreamerReadOnlyConcurrentWithFlush() throws Exception {
+        testDataStreamerReadOnlyConcurrent(true, false);
+    }
+
+    /**
+     * Tests that concurrent streaming fails with a read-only error (no manual flush, allowOverwrite enabled).
+     */
+    @Test
+    public void testDataStreamerReadOnlyConcurrentAllowOverride() throws Exception {
+        testDataStreamerReadOnlyConcurrent(false, true);
+    }
+
+    /**
+     * Tests that concurrent streaming fails with a read-only error (manual flush, allowOverwrite enabled).
+     */
+    @Test
+    public void testDataStreamerReadOnlyConcurrentWithFlushAllowOverride() throws Exception {
+        testDataStreamerReadOnlyConcurrent(true, true);
+    }
+
+    /**
+     * Common logic for different datastreamers' tests.
      *
-     * @param readOnly If {@code true} then cache must be in read only mode, else in read/write mode.
+     * @param manualFlush If {@code true}, {@link IgniteDataStreamer#flush()} is invoked after each batch load.
+     * @param allowOverride Value passed to {@link IgniteDataStreamer#allowOverwrite(boolean)}.
+     * @throws Exception If something goes wrong.
      */
-    private void assertCachesReadOnlyMode(boolean readOnly) {
-        Random rnd = new Random();
-
-        for (Ignite ignite : G.allGrids()) {
-            for (String cacheName : CACHE_NAMES) {
-                IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
-
-                for (int i = 0; i < 10; i++) {
-                    cache.get(rnd.nextInt(100)); // All gets must succeed.
-
-                    if (readOnly) {
-                        // All puts must fail.
-                        try {
-                            cache.put(rnd.nextInt(100), rnd.nextInt());
-
-                            fail("Put must fail for cache " + cacheName);
-                        }
-                        catch (Exception e) {
-                            // No-op.
-                        }
-
-                        // All removes must fail.
-                        try {
-                            cache.remove(rnd.nextInt(100));
-
-                            fail("Remove must fail for cache " + cacheName);
-                        }
-                        catch (Exception e) {
-                            // No-op.
-                        }
+    private void testDataStreamerReadOnlyConcurrent(boolean manualFlush, boolean allowOverride) throws Exception {
+        final CountDownLatch firstPackLatch = new CountDownLatch(cacheNames().size());
+        final CountDownLatch finishLatch = new CountDownLatch(cacheNames().size());
+
+        final CountDownLatch readOnlyEnabled = new CountDownLatch(1);
+
+        final Map<String, Exception> eMap = new ConcurrentHashMap<>(cacheNames().size());
+
+        Map<String, IgniteInternalFuture<?>> futs = new HashMap<>(cacheNames().size());
+
+        try {
+            for (String cacheName : cacheNames()) {
+                futs.put(cacheName, GridTestUtils.runAsync(() -> {
+                    try (IgniteDataStreamer<Integer, Integer> streamer = grid(0).dataStreamer(cacheName)) {
+                        streamer.allowOverwrite(allowOverride);
+
+                        doLoad(streamer, 0, 100, manualFlush);
+
+                        firstPackLatch.countDown();
+
+                        readOnlyEnabled.await(5, TimeUnit.SECONDS);
+
+                        doLoad(streamer, 100, 1000000, manualFlush);
+
+                        finishLatch.countDown();
                     }
-                    else {
-                        cache.put(rnd.nextInt(100), rnd.nextInt()); // All puts must succeed.
+                    catch (Exception e) {
+                        log.error("Streamer cache exception is thrown for cache " + cacheName, e);
 
-                        cache.remove(rnd.nextInt(100)); // All removes must succeed.
+                        assertNull(cacheName, eMap.put(cacheName, e));
                     }
-                }
+                    finally {
+                        // Avoid hanging the test in case of unexpected behaviour.
+                        firstPackLatch.countDown();
+                        finishLatch.countDown();
+                    }
+                }));
+            }
+
+            assertTrue("First batch was not loaded in time", firstPackLatch.await(5, TimeUnit.SECONDS));
+
+            changeClusterReadOnlyMode(true);
+
+            readOnlyEnabled.countDown();
+
+            assertTrue("Streamers did not finish in time", finishLatch.await(5, TimeUnit.SECONDS));
+
+            assertEquals("exceptions: " + eMap, cacheNames().size(), eMap.size());
+
+            for (String cacheName : cacheNames()) {
+                Exception e = eMap.get(cacheName);
+
+                assertNotNull(cacheName, e);
+                assertTrue(cacheName + " " + e, X.hasCause(e, ClusterReadOnlyModeCheckedException.class));
             }
         }
+        finally {
+            // Avoid hanging the test in case of unexpected behaviour.
+            readOnlyEnabled.countDown();
+
+            awaitThreads(futs);
+        }
     }
 
     /**
-     * @param readOnly If {@code true} then data streamer must fail, else succeed.
+     * Waits up to 5 seconds for every given future; futures that fail or time out are logged and cancelled.
      */
-    private void assertDataStreamerReadOnlyMode(boolean readOnly) {
-        Random rnd = new Random();
-
-        for (Ignite ignite : G.allGrids()) {
-            for (String cacheName : CACHE_NAMES) {
-                boolean failed = false;
+    private void awaitThreads(Map<String, IgniteInternalFuture<?>> futs) {
+        for (String cacheName : futs.keySet()) {
+            IgniteInternalFuture<?> fut = futs.get(cacheName);
 
-                try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
-                    for (int i = 0; i < 10; i++) {
-                        ((DataStreamerImpl)streamer).maxRemapCount(5);
+            try {
+                fut.get(5, TimeUnit.SECONDS);
+            }
+            catch (Exception e) {
+                log.error("Failed to get future " + cacheName, e);
 
-                        streamer.addData(rnd.nextInt(1000), rnd.nextInt());
-                    }
+                try {
+                    fut.cancel();
                 }
-                catch (CacheException ignored) {
-                    failed = true;
+                catch (IgniteCheckedException ce) {
+                    log.error("Failed to cancel future " + cacheName, ce);
                 }
-
-                if (failed != readOnly)
-                    fail("Streaming to " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
             }
         }
     }
+
+    /**
+     * Adds {@code count} entries with keys {@code [from, from + count)} (value == key), optionally flushing.
+     */
+    private void doLoad(IgniteDataStreamer<Integer, Integer> streamer, int from, int count, boolean flush) {
+        assertTrue(count > 0);
+
+        for (int i = from; i < from + count; i++)
+            streamer.addData(i, i);
+
+        if (flush)
+            streamer.flush();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTestUtils.java
new file mode 100644
index 0000000..56a041a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterReadOnlyModeTestUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Random;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.junit.Assert.fail;
+
+/**
+ * Utility class for testing grid read-only mode.
+ */
+public class ClusterReadOnlyModeTestUtils {
+    /** Replicated atomic cache. */
+    private static final String REPL_ATOMIC_CACHE = "repl_atomic_cache";
+
+    /** Replicated transactional cache. */
+    private static final String REPL_TX_CACHE = "repl_tx_cache";
+
+    /** Replicated mvcc transactional cache. */
+    private static final String REPL_MVCC_CACHE = "repl_mvcc_cache";
+
+    /** Partitioned atomic cache. */
+    public static final String PART_ATOMIC_CACHE = "part_atomic_cache";
+
+    /** Partitioned transactional cache. */
+    private static final String PART_TX_CACHE = "part_tx_cache";
+
+    /** Partitioned mvcc transactional cache. */
+    private static final String PART_MVCC_CACHE = "part_mvcc_cache";
+
+    /**
+     * @return Names of the six caches declared by {@link #cacheConfigurations()}.
+     */
+    public static Collection<String> cacheNames() {
+        return F.asList(REPL_ATOMIC_CACHE, REPL_TX_CACHE, REPL_MVCC_CACHE, PART_ATOMIC_CACHE, PART_TX_CACHE, PART_MVCC_CACHE);
+    }
+
+    /**
+     * @return Replicated and partitioned cache configurations in atomic, transactional and mvcc atomicity modes.
+     */
+    public static CacheConfiguration[] cacheConfigurations() {
+        return F.asArray(
+            cacheConfiguration(REPL_ATOMIC_CACHE, REPLICATED, ATOMIC, null),
+            cacheConfiguration(REPL_TX_CACHE, REPLICATED, TRANSACTIONAL, null),
+            cacheConfiguration(REPL_MVCC_CACHE, REPLICATED, TRANSACTIONAL_SNAPSHOT, "mvcc_repl_grp"),
+            cacheConfiguration(PART_ATOMIC_CACHE, PARTITIONED, ATOMIC, "part_grp"),
+            cacheConfiguration(PART_TX_CACHE, PARTITIONED, TRANSACTIONAL, "part_grp"),
+            cacheConfiguration(PART_MVCC_CACHE, PARTITIONED, TRANSACTIONAL_SNAPSHOT, "mvcc_part_grp")
+        );
+    }
+
+    /**
+     * Asserts that all given caches are in read-only or in read/write mode on all started nodes.
+     *
+     * @param readOnly If {@code true} then puts and removes must fail, else they must succeed; gets must always succeed.
+     * @param cacheNames Checked cache names.
+     */
+    public static void assertCachesReadOnlyMode(boolean readOnly, Collection<String> cacheNames) {
+        Random rnd = new Random();
+
+        for (Ignite ignite : G.allGrids()) {
+            for (String cacheName : cacheNames) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                for (int i = 0; i < 10; i++) {
+                    cache.get(rnd.nextInt(100)); // All gets must succeed.
+
+                    if (readOnly) {
+                        // All puts must fail.
+                        try {
+                            cache.put(rnd.nextInt(100), rnd.nextInt());
+
+                            fail("Put must fail for cache " + cacheName);
+                        }
+                        catch (Exception ignored) {
+                            // Expected: the put was rejected (fail() throws an Error, so it is not swallowed here).
+                        }
+
+                        // All removes must fail.
+                        try {
+                            cache.remove(rnd.nextInt(100));
+
+                            fail("Remove must fail for cache " + cacheName);
+                        }
+                        catch (Exception ignored) {
+                            // Expected: the remove was rejected (fail() throws an Error, so it is not swallowed here).
+                        }
+                    }
+                    else {
+                        int key = rnd.nextInt(100);
+
+                        cache.put(key, rnd.nextInt()); // All puts must succeed.
+
+                        cache.remove(key); // All removes must succeed.
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param readOnly If {@code true}, streaming must fail with {@link CacheException} on every node, else succeed.
+     * @param cacheNames Checked cache names.
+     */
+    public static void assertDataStreamerReadOnlyMode(boolean readOnly, Collection<String> cacheNames) {
+        Random rnd = new Random();
+
+        for (Ignite ignite : G.allGrids()) {
+            for (String cacheName : cacheNames) {
+                boolean failed = false;
+
+                try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
+                    ((DataStreamerImpl)streamer).maxRemapCount(5);
+
+                    for (int i = 0; i < 10; i++) {
+                        int key = rnd.nextInt(1000);
+
+                        streamer.addData(key, rnd.nextInt());
+
+                        streamer.removeData(key);
+                    }
+                }
+                catch (CacheException ignored) {
+                    failed = true;
+                }
+
+                if (failed != readOnly)
+                    fail("Streaming to " + cacheName + " must " + (readOnly ? "fail" : "succeed"));
+            }
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param grpName Cache group name ({@code null} is passed for caches declared without a group).
+     */
+    private static CacheConfiguration<Integer, Integer> cacheConfiguration(String name, CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode, String grpName) {
+        return new CacheConfiguration<Integer, Integer>()
+            .setName(name)
+            .setCacheMode(cacheMode)
+            .setAtomicityMode(atomicityMode)
+            .setGroupName(grpName)
+            .setBackups(1)
+            .setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Integer.class)));
+    }
+
+    /** Private constructor: the class only exposes static helpers. */
+    private ClusterReadOnlyModeTestUtils() {
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/ClusterReadOnlyModeNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/ClusterReadOnlyModeNodeJoinTest.java
new file mode 100644
index 0000000..7189710
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/ClusterReadOnlyModeNodeJoinTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertCachesReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertDataStreamerReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.cacheConfigurations;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.cacheNames;
+
+/**
+ * Checks that a newly joining node picks up the read-only mode already enabled on the cluster.
+ */
+public class ClusterReadOnlyModeNodeJoinTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setCacheConfiguration(cacheConfigurations());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** Enables read-only mode on a single node, starts a second node and checks that both report read-only. */
+    @Test
+    public void testJoinNodeToReadOnlyCluster() throws Exception {
+        IgniteEx grid = startGrid(0);
+
+        assertTrue(grid.cluster().active());
+        assertFalse(grid.cluster().readOnly());
+
+        grid.cluster().readOnly(true);
+
+        assertCachesReadOnlyMode(true, cacheNames());
+        assertDataStreamerReadOnlyMode(true, cacheNames());
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < 2; i++)
+            assertTrue(grid(i).configuration().getIgniteInstanceName(), grid(i).cluster().readOnly());
+
+        assertCachesReadOnlyMode(true, cacheNames());
+        assertDataStreamerReadOnlyMode(true, cacheNames());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/ClusterReadOnlyModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/ClusterReadOnlyModeSelfTest.java
new file mode 100644
index 0000000..c56a9fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/ClusterReadOnlyModeSelfTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
+import org.apache.ignite.internal.processors.service.GridServiceAssignmentsKey;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertCachesReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertDataStreamerReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.cacheConfigurations;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.cacheNames;
+
+/**
+ * Checks main functionality of cluster read-only mode. In this mode cluster will be available only for read operations,
+ * all data modification operations in user caches will be rejected with {@link ClusterReadOnlyModeCheckedException}
+ *
+ * 1) Read-only mode can be enabled on an active cluster only.
+ * 2) Read-only mode is not persisted to PDS (i.e. enabled read-only mode is forgotten after a cluster restart).
+ * 3) Updates to ignite-sys-cache remain available while read-only mode is enabled.
+ * 4) Updates to distributed metastorage remain available while read-only mode is enabled.
+ * 5) Read-only mode can't be enabled inside a transaction.
+ * 6) Locks can't be acquired while read-only mode is enabled.
+ *
+ */
+public class ClusterReadOnlyModeSelfTest extends GridCommonAbstractTest {
+    /** Server nodes count. */
+    private static final int SERVER_NODES_COUNT = 2;
+
+    /** {@inheritDoc} Enables persistence and starts the instance named {@code "client"} in client mode. */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setClientMode("client".equals(igniteInstanceName))
+            .setCacheConfiguration(cacheConfigurations())
+            .setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+            );
+    }
+
+    /** {@inheritDoc} Stops all grids and cleans the persistence directory after each test. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} Stops all grids and cleans the persistence directory before each test. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** Checks that distributed metastorage can still be read and modified after read-only mode is enabled. */
+    @Test
+    public void testDistributedMetastorageAvailableForUpdatesOnReadOnlyCluster() throws Exception {
+        IgniteEx node = startGrids(SERVER_NODES_COUNT);
+
+        node.cluster().active(true);
+
+        assertFalse(node.cluster().readOnly());
+
+        String key = "1";
+        String val = "val1";
+
+        node.context().distributedMetastorage().write(key, val);
+
+        node.cluster().readOnly(true);
+
+        assertTrue(node.cluster().readOnly());
+
+        assertEquals(val, node.context().distributedMetastorage().read(key));
+        assertEquals(val, grid(1).context().distributedMetastorage().read(key));
+
+        grid(1).context().distributedMetastorage().remove(key);
+
+        assertNull(node.context().distributedMetastorage().read(key));
+        assertNull(grid(1).context().distributedMetastorage().read(key));
+    }
+
+    /** Expects enabling read-only mode right after node start to fail with a "Cluster not active" error. */
+    @Test
+    public void testChangeReadOnlyModeOnInactiveClusterFails() throws Exception {
+        startGrid(0);
+
+        GridTestUtils.assertThrows(
+            log,
+            () -> {
+                grid(0).cluster().readOnly(true);
+
+                return null;
+            },
+            IgniteException.class,
+            "Cluster not active"
+        );
+    }
+
+    /** Checks that explicit cache locks on transactional caches are rejected while the cluster is read-only. */
+    @Test
+    public void testLocksNotAvaliableOnReadOnlyCluster() throws Exception {
+        // Start SERVER_NODES_COUNT nodes, as the other multi-node tests in this class do.
+        IgniteEx grid = startGrids(SERVER_NODES_COUNT);
+
+        grid.cluster().active(true);
+
+        final int key = 0;
+
+        for (CacheConfiguration cfg : grid.configuration().getCacheConfiguration()) {
+            if (cfg.getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL && !CU.isSystemCache(cfg.getName()))
+                grid.cache(cfg.getName()).put(key, cfg.getName().hashCode());
+        }
+
+        grid.cluster().readOnly(true);
+
+        for (CacheConfiguration cfg : grid.configuration().getCacheConfiguration()) {
+            if (cfg.getAtomicityMode() != CacheAtomicityMode.TRANSACTIONAL || CU.isSystemCache(cfg.getName()))
+                continue;
+
+            GridTestUtils.assertThrows(
+                log,
+                () -> {
+                    grid.cache(cfg.getName()).lock(key).lock();
+
+                    return null;
+                },
+                CacheException.class,
+                "Failed to perform cache operation (cluster is in read-only mode)"
+            );
+        }
+    }
+
+    /** Checks that a transaction started before a concurrent read-only request still commits its update. */
+    @Test
+    public void testEnableReadOnlyModeInsideTransaction() throws Exception {
+        IgniteEx grid = startGrids(SERVER_NODES_COUNT);
+
+        grid.cluster().active(true);
+
+        CacheConfiguration cfg = Stream.of(grid.configuration().getCacheConfiguration())
+            .filter(c -> c.getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL)
+            .filter(c -> !CU.isSystemCache(c.getName())).findAny().get();
+
+        final int key = 1;
+
+        IgniteCache<Integer, Integer> cache = grid.cache(cfg.getName());
+
+        cache.put(key, 0);
+
+        CountDownLatch startTxLatch = new CountDownLatch(1);
+
+        Thread t = new Thread(() -> {
+            try {
+                startTxLatch.await();
+
+                grid(1).cluster().readOnly(true);
+
+                assertTrue(grid(1).cluster().readOnly());
+            }
+            catch (InterruptedException e) {
+                log.error("Thread interrupted", e);
+
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        t.start();
+
+        Transaction tx = grid(0).transactions().txStart();
+
+        try {
+            cache.put(key, 1);
+
+            startTxLatch.countDown();
+
+            tx.commit();
+        }
+        catch (Exception e) {
+            log.error("TX Failed", e);
+
+            tx.rollback();
+        }
+        finally {
+            startTxLatch.countDown();
+        }
+
+        t.join();
+
+        assertEquals(1, (int) cache.get(key));
+    }
+
+    /** Checks that enabling read-only mode inside an active transaction throws {@link IgniteException}. */
+    @Test
+    public void testEnableReadOnlyModeInsideTransactionFailed() throws Exception {
+        IgniteEx grid = startGrid(0);
+
+        grid.cluster().active(true);
+
+        GridTestUtils.assertThrows(
+            log,
+            () -> {
+                Transaction tx = grid.transactions().txStart();
+
+                try {
+                    grid.cluster().readOnly(true);
+
+                    return null;
+                }
+                finally {
+                    tx.commit();
+                }
+            },
+            IgniteException.class,
+            "Failed to activate read-only mode (must invoke the method outside of an active transaction)."
+        );
+
+    }
+
+    /** Checks that the utility cache accepts a put and serves the value back while the cluster is read-only. */
+    @Test
+    public void testIgniteUtilityCacheAvailableForUpdatesOnReadOnlyCluster() throws Exception {
+        IgniteEx grid = startGrid(0);
+
+        grid.cluster().active(true);
+        grid.cluster().readOnly(true);
+
+        checkClusterInReadOnlyMode(true, grid);
+
+        grid.utilityCache().put(new GridServiceAssignmentsKey("test"), "test");
+
+        assertEquals("test", grid.utilityCache().get(new GridServiceAssignmentsKey("test")));
+    }
+
+    /** Checks that read-only mode can be switched on and off through a client node. */
+    @Test
+    public void testReadOnlyFromClient() throws Exception {
+        startGrids(1);
+        startGrid("client");
+
+        grid(0).cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        IgniteEx client = grid("client");
+
+        assertTrue("Should be client", client.configuration().isClientMode());
+
+        checkClusterInReadOnlyMode(false, client);
+
+        client.cluster().readOnly(true);
+
+        checkClusterInReadOnlyMode(true, client);
+
+        client.cluster().readOnly(false);
+
+        checkClusterInReadOnlyMode(false, client);
+    }
+
+    /** Checks that read-only mode is off again after all nodes are stopped and started anew. */
+    @Test
+    public void testReadOnlyModeForgottenAfterClusterRestart() throws Exception {
+        IgniteEx grid = startGrids(2);
+
+        grid.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        grid.cluster().readOnly(true);
+
+        checkClusterInReadOnlyMode(true, grid);
+
+        stopAllGrids();
+
+        grid = startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        assertTrue("Cluster must be activate", grid.cluster().active());
+
+        checkClusterInReadOnlyMode(false, grid);
+    }
+
+    /** Asserts the flag reported by {@code node} plus cache and data streamer behaviour for all test caches. */
+    private void checkClusterInReadOnlyMode(boolean readOnly, IgniteEx node) {
+        assertEquals("Unexpected read-only mode", readOnly, node.cluster().readOnly());
+
+        assertCachesReadOnlyMode(readOnly, cacheNames());
+
+        assertDataStreamerReadOnlyMode(readOnly, cacheNames());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 735396c..5fc9f50 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -185,7 +185,7 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
     @Override public boolean disableWal(String cacheName) throws IgniteException {
         throw new UnsupportedOperationException("Operation is not supported yet.");
     }
-    
+
     /** {@inheritDoc} */
     @Override public boolean isWalEnabled(String cacheName) {
         throw new UnsupportedOperationException("Operation is not supported yet.");
@@ -391,6 +391,16 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
         throw new UnsupportedOperationException("Operation is not supported yet.");
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean readOnly() {
+        throw new UnsupportedOperationException("Operation is not supported yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readOnly(boolean readOnly) throws IgniteException {
+        throw new UnsupportedOperationException("Operation is not supported yet.");
+    }
+
     /**
      *
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index ac01b0f..7e2f734 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -49,6 +49,8 @@ import org.apache.ignite.internal.managers.discovery.IgniteTopologyPrintFormatSe
 import org.apache.ignite.internal.managers.events.GridEventStorageManagerSelfTest;
 import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustInMemoryTest;
 import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustTest;
+import org.apache.ignite.internal.processors.cluster.ClusterReadOnlyModeNodeJoinTest;
+import org.apache.ignite.internal.processors.cluster.ClusterReadOnlyModeSelfTest;
 import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest;
 import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
 import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;
@@ -62,6 +64,8 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
+    ClusterReadOnlyModeSelfTest.class,
+    ClusterReadOnlyModeNodeJoinTest.class,
     GridGetOrStartSelfTest.class,
     GridSameVmStartupSelfTest.class,
     GridSpiExceptionSelfTest.class,
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerSslTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerSslTest.java
index ca96409..06c27a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerSslTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerSslTest.java
@@ -84,7 +84,6 @@ public class GridCommandHandlerSslTest extends GridCommandHandlerAbstractTest {
         final CommandHandler cmd = new CommandHandler();
 
         List<String> params = new ArrayList<>();
-        params.add("--activate");
         params.add("--keystore");
         params.add(GridTestUtils.keyStorePath("node01"));
         params.add("--keystore-password");
@@ -95,6 +94,8 @@ public class GridCommandHandlerSslTest extends GridCommandHandlerAbstractTest {
             params.add(utilityCipherSuite);
         }
 
+        params.add("--activate");
+
         assertEquals(expRes, execute(params));
 
         if (expRes == EXIT_CODE_OK)
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 9833950..ef3a88a 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -206,6 +206,29 @@ public class GridCommandHandlerTest extends GridCommandHandlerAbstractTest {
         assertEquals(EXIT_CODE_OK, execute("--activate"));
 
         assertTrue(ignite.cluster().active());
+        assertFalse(ignite.cluster().readOnly());
+    }
+
+    /**
+     * Tests that read-only mode can be enabled and then disabled via control.sh commands.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReadOnlyEnableDisable() throws Exception {
+        Ignite ignite = startGrids(1);
+
+        ignite.cluster().active(true);
+
+        assertFalse(ignite.cluster().readOnly());
+
+        assertEquals(EXIT_CODE_OK, execute("--read-only-on"));
+
+        assertTrue(ignite.cluster().readOnly());
+
+        assertEquals(EXIT_CODE_OK, execute("--read-only-off"));
+
+        assertFalse(ignite.cluster().readOnly());
     }
 
     /**
@@ -237,13 +260,35 @@ public class GridCommandHandlerTest extends GridCommandHandlerAbstractTest {
     public void testState() throws Exception {
         Ignite ignite = startGrids(1);
 
+        injectTestSystemOut();
+
         assertFalse(ignite.cluster().active());
 
         assertEquals(EXIT_CODE_OK, execute("--state"));
 
+        assertTrue(testOut.toString(), testOut.toString().contains("Cluster is inactive"));
+
+        testOut.reset();
+
         ignite.cluster().active(true);
 
+        assertTrue(ignite.cluster().active());
+
+        assertEquals(EXIT_CODE_OK, execute("--state"));
+
+        assertTrue(testOut.toString(), testOut.toString().contains("Cluster is active"));
+
+        testOut.reset();
+
+        ignite.cluster().readOnly(true);
+
+        awaitPartitionMapExchange();
+
+        assertTrue(ignite.cluster().readOnly());
+
         assertEquals(EXIT_CODE_OK, execute("--state"));
+
+        assertTrue(testOut.toString(), testOut.toString().contains("Cluster is active (read-only)"));
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ac519e8..92656aa 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
 import org.apache.ignite.internal.managers.IgniteMBeansManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -1172,6 +1173,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         catch (IgniteCheckedException e) {
             fail = true;
 
+            ClusterReadOnlyModeCheckedException roEx = X.cause(e, ClusterReadOnlyModeCheckedException.class);
+
+            if (roEx != null) {
+                throw new IgniteSQLException(
+                    "Failed to execute DML statement. Cluster in read-only mode [stmt=" + qryDesc.sql() +
+                    ", params=" + Arrays.deepToString(qryParams.arguments()) + "]",
+                    IgniteQueryErrorCode.CLUSTER_READ_ONLY_MODE_ENABLED,
+                    e
+                );
+            }
+
             throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qryDesc.sql() +
                 ", params=" + Arrays.deepToString(qryParams.arguments()) + "]", e);
         }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
index 2015965..e9b0986 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
@@ -36,12 +36,14 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
+import org.apache.ignite.internal.cluster.ClusterReadOnlyModeCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 
 import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
 
@@ -231,6 +233,17 @@ public class DmlBatchSender {
                 cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
             }
 
+            if (X.hasCause(e, ClusterReadOnlyModeCheckedException.class)) {
+                SQLException sqlEx = new SQLException(
+                    e.getMessage(),
+                    SqlStateCode.CLUSTER_READ_ONLY_MODE_ENABLED,
+                    IgniteQueryErrorCode.CLUSTER_READ_ONLY_MODE_ENABLED,
+                    e
+                );
+
+                return new DmlPageProcessingResult(0, null, sqlEx);
+            }
+
             return new DmlPageProcessingResult(0, null,
                 new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR, IgniteQueryErrorCode.UNKNOWN, e));
         }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlReadOnlyModeSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlReadOnlyModeSelfTest.java
new file mode 100644
index 0000000..03dd238
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlReadOnlyModeSelfTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.ttl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertCachesReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.assertDataStreamerReadOnlyMode;
+import static org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTestUtils.cacheConfigurations;
+
+/**
+ * Checks that cache entries still expire by TTL while the cluster is in read-only mode.
+ */
+public class CacheTtlReadOnlyModeSelfTest extends GridCommonAbstractTest {
+    /** Expiry duration applied to cache entries, in seconds. */
+    private static final int EXPIRATION_TIMEOUT = 10;
+
+    /** Cache configurations with expiry policy and eager TTL set; TRANSACTIONAL_SNAPSHOT caches are excluded. */
+    private static final CacheConfiguration[] CACHE_CONFIGURATIONS = getCacheConfigurations();
+
+    /** Names of the caches described by {@link #CACHE_CONFIGURATIONS}. */
+    private static final Collection<String> CACHE_NAMES =
+        Stream.of(CACHE_CONFIGURATIONS).map(CacheConfiguration::getName).collect(Collectors.toList());
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setCacheConfiguration(CACHE_CONFIGURATIONS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** Fills the caches, enables read-only mode and checks that all entries are gone once the timeout passes. */
+    @Test
+    public void testTtlExpirationWorksInReadOnlyMode() throws Exception {
+        Ignite grid = startGrid();
+
+        assertTrue(grid.cluster().active());
+        assertFalse(grid.cluster().readOnly());
+
+        assertCachesReadOnlyMode(grid.cluster().readOnly(), CACHE_NAMES);
+
+        for (String cacheName : CACHE_NAMES) {
+            assertEquals(cacheName, 0, grid.cache(cacheName).size());
+
+            for (int i = 0; i < 10; i++)
+                grid.cache(cacheName).put(i, i);
+
+            assertEquals(cacheName, 10, grid.cache(cacheName).size());
+        }
+
+        grid.cluster().readOnly(true);
+        assertTrue(grid.cluster().readOnly());
+
+        assertCachesReadOnlyMode(grid.cluster().readOnly(), CACHE_NAMES);
+        assertDataStreamerReadOnlyMode(grid.cluster().readOnly(), CACHE_NAMES);
+
+        SECONDS.sleep(EXPIRATION_TIMEOUT + 1);
+
+        for (String cacheName : CACHE_NAMES)
+            assertEquals(cacheName, 0, grid.cache(cacheName).size());
+    }
+
+    /** Builds the test cache configurations: skips TRANSACTIONAL_SNAPSHOT caches, sets expiry policy and eager TTL. */
+    private static CacheConfiguration[] getCacheConfigurations() {
+        CacheConfiguration[] cfgs = cacheConfigurations();
+
+        List<CacheConfiguration> newCfgs = new ArrayList<>(cfgs.length);
+
+        for (CacheConfiguration cfg : cfgs) {
+            if (cfg.getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) {
+                // Expiry policy cannot be used with TRANSACTIONAL_SNAPSHOT.
+                continue;
+            }
+
+            cfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(SECONDS, EXPIRATION_TIMEOUT)));
+            cfg.setEagerTtl(true);
+
+            newCfgs.add(cfg);
+        }
+
+        return newCfgs.toArray(new CacheConfiguration[0]);
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 7580346..dfcce97 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.IgniteClientReconnectQueriesT
 import org.apache.ignite.internal.processors.cache.persistence.RebuildIndexLogMessageTest;
 import org.apache.ignite.internal.processors.cache.ttl.CacheTtlAtomicLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.ttl.CacheTtlAtomicPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.ttl.CacheTtlReadOnlyModeSelfTest;
 import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalPartitionedSelfTest;
 import org.apache.ignite.internal.processors.client.IgniteDataStreamerTest;
@@ -61,6 +62,7 @@ import org.junit.runners.Suite;
     CacheTtlTransactionalPartitionedSelfTest.class,
     CacheTtlAtomicLocalSelfTest.class,
     CacheTtlAtomicPartitionedSelfTest.class,
+    CacheTtlReadOnlyModeSelfTest.class,
 
     GridCacheOffheapIndexGetSelfTest.class,
     GridCacheOffheapIndexEntryEvictTest.class,
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterMetricsParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterMetricsParityTest.cs
index 21160c1..bbab78c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterMetricsParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterMetricsParityTest.cs
@@ -45,4 +45,4 @@ namespace Apache.Ignite.Core.Tests.ApiParity
                 knownMissingMembers: MissingProperties);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs
index 568aa46..9714dab 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
  namespace Apache.Ignite.Core.Tests.ApiParity
 {
     using Apache.Ignite.Core.Cluster;
@@ -36,7 +36,8 @@
             "isBaselineAutoAdjustEnabled",
             "baselineAutoAdjustEnabled",
             "baselineAutoAdjustTimeout",
-            "baselineAutoAdjustStatus"
+            "baselineAutoAdjustStatus",
+            "readOnly"
         };
 
         /** Members that are missing on .NET side and should be added in future. */
@@ -58,4 +59,4 @@
                 UnneededMembers, MissingMembers);
         }
     }
-}
\ No newline at end of file
+}