You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2020/07/15 12:47:56 UTC
[ignite] branch master updated: IGNITE-13246 New baseline event
types added. - Fixes #8034.
This is an automated email from the ASF dual-hosted git repository.
akuznetsov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fe77a25 IGNITE-13246 New baseline event types added. - Fixes #8034.
fe77a25 is described below
commit fe77a25f3c7aa4d10a6c6a56550d501995624bc0
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Jul 15 19:47:12 2020 +0700
IGNITE-13246 New baseline event types added. - Fixes #8034.
Signed-off-by: Alexey Kuznetsov <ak...@apache.org>
---
.../ignite/events/BaselineEventsLocalTest.java | 29 +++
.../ignite/events/BaselineEventsRemoteTest.java | 29 +++
.../apache/ignite/events/BaselineEventsTest.java | 287 +++++++++++++++++++++
.../testsuites/IgniteControlUtilityTestSuite.java | 5 +
.../apache/ignite/events/BaselineChangedEvent.java | 91 +++++++
.../events/BaselineConfigurationChangedEvent.java | 100 +++++++
.../java/org/apache/ignite/events/EventType.java | 44 ++++
.../cluster/DistributedBaselineConfiguration.java | 14 +-
.../cache/GridCachePartitionExchangeManager.java | 41 ++-
.../cluster/DiscoveryDataClusterState.java | 10 +
.../cluster/GridClusterStateProcessor.java | 37 ++-
.../distributed/DistributedProperty.java | 2 +-
.../distributed/SimpleDistributedProperty.java | 4 +-
13 files changed, 683 insertions(+), 10 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsLocalTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsLocalTest.java
new file mode 100644
index 0000000..017240d
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsLocalTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/** */
+public class BaselineEventsLocalTest extends BaselineEventsTest {
+ /** {@inheritDoc} */
+ @Override protected void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types) {
+ ignite.events().localListen(lsnr, types);
+ }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsRemoteTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsRemoteTest.java
new file mode 100644
index 0000000..e6ac8a1
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsRemoteTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/** */
+public class BaselineEventsRemoteTest extends BaselineEventsTest {
+ /** {@inheritDoc} */
+ @Override protected void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types) {
+ ignite.events(ignite.cluster().forRemotes()).remoteListen((uuid, t) -> lsnr.apply(t), t -> true, types);
+ }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java
new file mode 100644
index 0000000..28b7b9e
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.commandline.CommandHandler;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/** */
+public abstract class BaselineEventsTest extends GridCommonAbstractTest {
+ /** */
+ private int[] includedEvtTypes = EventType.EVTS_ALL;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConnectorConfiguration(new ConnectorConfiguration())
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ )
+ .setWalSegments(3)
+ .setWalSegmentSize(512 * 1024)
+ )
+ .setConsistentId(igniteInstanceName)
+ .setIncludeEventTypes(includedEvtTypes);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ protected abstract void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types);
+
+ /** */
+ @Test
+ public void testChangeBltWithControlUtility() throws Exception {
+ startGrid(0).cluster().active(true);
+
+ AtomicBoolean baselineChanged = new AtomicBoolean();
+
+ startGrid(1);
+
+ String consistentIds = grid(0).localNode().consistentId() + "," + grid(1).localNode().consistentId();
+
+ listen(
+ grid(1),
+ event -> {
+ baselineChanged.set(true);
+
+ BaselineChangedEvent baselineChangedEvt = (BaselineChangedEvent)event;
+
+ assertEquals(2, baselineChangedEvt.baselineNodes().size());
+
+ return true;
+ },
+ EventType.EVT_BASELINE_CHANGED
+ );
+
+ assertEquals(
+ CommandHandler.EXIT_CODE_OK,
+ new CommandHandler().execute(Arrays.asList("--baseline", "set", consistentIds, "--yes"))
+ );
+
+ assertTrue(GridTestUtils.waitForCondition(baselineChanged::get, 3_000));
+ }
+
+ /** */
+ @Test
+ public void testChangeBltWithPublicApi() throws Exception {
+ startGrid(0).cluster().active(true);
+
+ AtomicBoolean baselineChanged = new AtomicBoolean();
+
+ listen(
+ startGrid(1),
+ event -> {
+ baselineChanged.set(true);
+
+ BaselineChangedEvent baselineChangedEvt = (BaselineChangedEvent)event;
+
+ assertEquals(2, baselineChangedEvt.baselineNodes().size());
+
+ return true;
+ },
+ EventType.EVT_BASELINE_CHANGED
+ );
+
+ grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+ assertTrue(GridTestUtils.waitForCondition(baselineChanged::get, 3_000));
+ }
+
+ /** */
+ @Test
+ public void testDeactivateActivate() throws Exception {
+ IgniteEx ignite = startGrids(2);
+
+ AtomicBoolean baselineChanged = new AtomicBoolean();
+
+ listen(
+ ignite,
+ event -> {
+ baselineChanged.set(true);
+
+ return true;
+ },
+ EventType.EVT_BASELINE_CHANGED
+ );
+
+ ignite.cluster().active(true);
+
+ assertTrue(GridTestUtils.waitForCondition(baselineChanged::get, 3_000));
+ baselineChanged.set(false);
+
+ ignite.cluster().active(false);
+ ignite.cluster().active(true);
+
+ assertFalse(GridTestUtils.waitForCondition(baselineChanged::get, 3_000));
+ }
+
+ /** */
+ @Test
+ public void testChangeAutoAdjustEnabled() throws Exception {
+ IgniteClusterEx cluster = startGrids(2).cluster();
+
+ cluster.active(true);
+
+ assertFalse(cluster.isBaselineAutoAdjustEnabled());
+
+ AtomicBoolean autoAdjustEnabled = new AtomicBoolean();
+
+ listen(
+ grid(0),
+ event -> {
+ BaselineConfigurationChangedEvent bltCfgChangedEvt = (BaselineConfigurationChangedEvent)event;
+
+ autoAdjustEnabled.set(bltCfgChangedEvt.isAutoAdjustEnabled());
+
+ return true;
+ },
+ EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED
+ );
+
+ assertEquals(
+ CommandHandler.EXIT_CODE_OK,
+ new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "enable", "timeout", "10", "--yes"))
+ );
+ assertTrue(GridTestUtils.waitForCondition(autoAdjustEnabled::get, 3_000));
+
+ assertEquals(
+ CommandHandler.EXIT_CODE_OK,
+ new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "disable", "--yes"))
+ );
+ assertFalse(autoAdjustEnabled.get());
+
+ cluster.baselineAutoAdjustEnabled(true);
+ assertTrue(GridTestUtils.waitForCondition(autoAdjustEnabled::get, 3_000));
+
+ cluster.baselineAutoAdjustEnabled(false);
+ assertTrue(GridTestUtils.waitForCondition(() -> !autoAdjustEnabled.get(), 3_000));
+ }
+
+ /** */
+ @Test
+ public void testChangeAutoAdjustTimeout() throws Exception {
+ IgniteClusterEx cluster = startGrids(2).cluster();
+
+ cluster.active(true);
+
+ AtomicLong autoAdjustTimeout = new AtomicLong();
+
+ listen(
+ grid(0),
+ event -> {
+ BaselineConfigurationChangedEvent bltCfgChangedEvt = (BaselineConfigurationChangedEvent)event;
+
+ autoAdjustTimeout.set(bltCfgChangedEvt.autoAdjustTimeout());
+
+ return true;
+ },
+ EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED
+ );
+
+ assertEquals(
+ CommandHandler.EXIT_CODE_OK,
+ new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "enable", "timeout", "10", "--yes"))
+ );
+ assertTrue(GridTestUtils.waitForCondition(() -> autoAdjustTimeout.get() == 10L, 3_000));
+
+ cluster.baselineAutoAdjustTimeout(50);
+ assertTrue(GridTestUtils.waitForCondition(() -> autoAdjustTimeout.get() == 50L, 3_000));
+ }
+
+ /** */
+ @Test
+ public void testEventsDisabledByDefault() throws Exception {
+ //noinspection ZeroLengthArrayAllocation
+ includedEvtTypes = new int[0];
+
+ IgniteClusterEx cluster = startGrid(0).cluster();
+ cluster.active(true);
+
+ AtomicInteger evtsTriggered = new AtomicInteger();
+
+ listen(
+ grid(0),
+ event -> {
+ evtsTriggered.incrementAndGet();
+
+ return true;
+ },
+ EventType.EVT_BASELINE_CHANGED,
+ EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED,
+ EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED
+ );
+
+ startGrid(1);
+
+ String consistentIds = grid(0).localNode().consistentId() + "," + grid(1).localNode().consistentId();
+
+ assertEquals(
+ CommandHandler.EXIT_CODE_OK,
+ new CommandHandler().execute(Arrays.asList("--baseline", "set", consistentIds, "--yes"))
+ );
+
+ awaitPartitionMapExchange();
+
+ startGrid(2);
+
+ cluster.setBaselineTopology(cluster.topologyVersion());
+
+ awaitPartitionMapExchange();
+
+ assertEquals(
+ CommandHandler.EXIT_CODE_OK,
+ new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "enable", "timeout", "10", "--yes"))
+ );
+
+ cluster.baselineAutoAdjustEnabled(false);
+ cluster.baselineAutoAdjustTimeout(50);
+
+ assertFalse(GridTestUtils.waitForCondition(() -> evtsTriggered.get() > 0, 3_000L));
+ }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index 7a57771..cc748e5 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -17,6 +17,8 @@
package org.apache.ignite.testsuites;
+import org.apache.ignite.events.BaselineEventsLocalTest;
+import org.apache.ignite.events.BaselineEventsRemoteTest;
import org.apache.ignite.internal.commandline.CommandHandlerParsingTest;
import org.apache.ignite.internal.processors.security.GridCommandHandlerSslWithSecurityTest;
import org.apache.ignite.util.GridCommandHandlerBrokenIndexTest;
@@ -61,6 +63,9 @@ import org.junit.runners.Suite;
GridCommandHandlerMetadataTest.class,
KillCommandsCommandShTest.class,
+
+ BaselineEventsLocalTest.class,
+ BaselineEventsRemoteTest.class,
})
public class IgniteControlUtilityTestSuite {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/BaselineChangedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/BaselineChangedEvent.java
new file mode 100644
index 0000000..bbbeb74
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/BaselineChangedEvent.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Baseline changed event.
+ * <p>
+ * Grid events are used for notification about what happens within the grid. Note that by
+ * design Ignite keeps all events generated on the local node locally and it provides
+ * APIs for performing a distributed queries across multiple nodes:
+ * <ul>
+ * <li>
+ * {@link IgniteEvents#remoteQuery(IgnitePredicate, long, int...)} -
+ * asynchronously querying events occurred on the nodes specified, including remote nodes.
+ * </li>
+ * <li>
+ * {@link IgniteEvents#localQuery(IgnitePredicate, int...)} -
+ * querying only local events stored on this local node.
+ * </li>
+ * <li>
+ * {@link IgniteEvents#localListen(IgnitePredicate, int...)} -
+ * listening to local grid events (events from remote nodes not included).
+ * </li>
+ * </ul>
+ * User can also wait for events using method {@link IgniteEvents#waitForLocal(IgnitePredicate, int...)}.
+ * <h1 class="header">Events and Performance</h1>
+ * Note that by default all events in Ignite are enabled and therefore generated and stored
+ * by whatever event storage SPI is configured. Ignite can and often does generate thousands events per seconds
+ * under the load and therefore it creates a significant additional load on the system. If these events are
+ * not needed by the application this load is unnecessary and leads to significant performance degradation.
+ * <p>
+ * It is <b>highly recommended</b> to enable only those events that your application logic requires
+ * by using {@link IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that certain
+ * events are required for Ignite's internal operations and such events will still be generated but not stored by
+ * event storage SPI if they are disabled in Ignite configuration.
+ * @see EventType#EVT_BASELINE_CHANGED
+ */
+public class BaselineChangedEvent extends EventAdapter {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Baseline nodes. */
+ private final Collection<BaselineNode> baselineNodes;
+
+ /**
+ * Creates baseline changed event with given parameters.
+ * @param node Node.
+ * @param msg Optional event message.
+ * @param type Event type.
+ * @param baselineNodes Collection of new baseline nodes.
+ */
+ public BaselineChangedEvent(
+ ClusterNode node,
+ String msg,
+ int type,
+ Collection<BaselineNode> baselineNodes
+ ) {
+ super(node, msg, type);
+
+ //noinspection AssignmentOrReturnOfFieldWithMutableType
+ this.baselineNodes = baselineNodes;
+ }
+
+ /** New baseline nodes. */
+ public Collection<BaselineNode> baselineNodes() {
+ //noinspection AssignmentOrReturnOfFieldWithMutableType
+ return baselineNodes;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/BaselineConfigurationChangedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/BaselineConfigurationChangedEvent.java
new file mode 100644
index 0000000..c623014
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/BaselineConfigurationChangedEvent.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Baseline configuration changed event.
+ * <p>
+ * Grid events are used for notification about what happens within the grid. Note that by
+ * design Ignite keeps all events generated on the local node locally and it provides
+ * APIs for performing a distributed queries across multiple nodes:
+ * <ul>
+ * <li>
+ * {@link IgniteEvents#remoteQuery(IgnitePredicate, long, int...)} -
+ * asynchronously querying events occurred on the nodes specified, including remote nodes.
+ * </li>
+ * <li>
+ * {@link IgniteEvents#localQuery(IgnitePredicate, int...)} -
+ * querying only local events stored on this local node.
+ * </li>
+ * <li>
+ * {@link IgniteEvents#localListen(IgnitePredicate, int...)} -
+ * listening to local grid events (events from remote nodes not included).
+ * </li>
+ * </ul>
+ * User can also wait for events using method {@link IgniteEvents#waitForLocal(IgnitePredicate, int...)}.
+ * <h1 class="header">Events and Performance</h1>
+ * Note that by default all events in Ignite are enabled and therefore generated and stored
+ * by whatever event storage SPI is configured. Ignite can and often does generate thousands events per seconds
+ * under the load and therefore it creates a significant additional load on the system. If these events are
+ * not needed by the application this load is unnecessary and leads to significant performance degradation.
+ * <p>
+ * It is <b>highly recommended</b> to enable only those events that your application logic requires
+ * by using {@link IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that certain
+ * events are required for Ignite's internal operations and such events will still be generated but not stored by
+ * event storage SPI if they are disabled in Ignite configuration.
+ * @see EventType#EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED
+ * @see EventType#EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED
+ */
+public class BaselineConfigurationChangedEvent extends EventAdapter {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** @see IgniteCluster#isBaselineAutoAdjustEnabled() */
+ private final boolean autoAdjustEnabled;
+
+ /** @see IgniteCluster#baselineAutoAdjustTimeout() */
+ private final long autoAdjustTimeout;
+
+ /**
+ * Creates baseline configuration changed event with given parameters.
+ * @param node Node.
+ * @param msg Optional event message.
+ * @param type Event type.
+ * @param autoAdjustEnabled Auto-adjust "enabled" flag value.
+ * @param autoAdjustTimeout Auto-adjust timeout value in milliseconds.
+ */
+ public BaselineConfigurationChangedEvent(
+ ClusterNode node,
+ String msg,
+ int type,
+ boolean autoAdjustEnabled,
+ long autoAdjustTimeout
+ ) {
+ super(node, msg, type);
+
+ this.autoAdjustEnabled = autoAdjustEnabled;
+ this.autoAdjustTimeout = autoAdjustTimeout;
+ }
+
+ /** Auto-adjust "enabled" flag value. */
+ public boolean isAutoAdjustEnabled() {
+ return autoAdjustEnabled;
+ }
+
+ /** Auto-adjust timeout value in milliseconds. */
+ public long autoAdjustTimeout() {
+ return autoAdjustTimeout;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index f6ba4da..fca1d02 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -17,6 +17,7 @@
package org.apache.ignite.events;
+import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
@@ -981,6 +982,49 @@ public interface EventType {
public static final int EVT_CLUSTER_STATE_CHANGE_STARTED = 145;
/**
+ * Built-in event type: baseline topology has been changed by either user request or auto-adjust timeout event.
+ * Event includes the following information: new baseline nodes.
+ *
+ * <p>
+ * Fired when new tag is successfully set on all nodes.
+ * </p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see IgniteCluster#setBaselineTopology(long)
+ * @see IgniteCluster#setBaselineTopology(Collection)
+ */
+ public static final int EVT_BASELINE_CHANGED = 146;
+
+ /**
+ * Built-in event type: baseline auto-adjust "enabled" flag has been changed by user request.
+ * Event includes the following information: auto-adjust enabled flag, auto-adjust timeout.
+ *
+ * <p>
+ * Fired when new tag is successfully set on all nodes.
+ * </p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see IgniteCluster#baselineAutoAdjustEnabled(boolean)
+ */
+ public static final int EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED = 147;
+
+ /**
+ * Built-in event type: baseline auto-adjust timeout has been changed by user request.
+ * Event includes the following information: auto-adjust "enabled" flag, auto-adjust timeout.
+ *
+ * <p>
+ * Fired when new tag is successfully set on all nodes.
+ * </p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see IgniteCluster#baselineAutoAdjustTimeout(long)
+ */
+ public static final int EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED = 148;
+
+ /**
* All checkpoint events. This array can be directly passed into
* {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
* subscribe to all checkpoint events.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
index aa0a978..855efe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.cluster;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
@@ -76,7 +77,8 @@ public class DistributedBaselineConfiguration {
public DistributedBaselineConfiguration(
GridInternalSubscriptionProcessor isp,
GridKernalContext ctx,
- IgniteLogger log) {
+ IgniteLogger log
+ ) {
this.log = log;
boolean persistenceEnabled = ctx.config() != null && CU.isPersistenceEnabled(ctx.config());
@@ -101,6 +103,16 @@ public class DistributedBaselineConfiguration {
);
}
+ /** */
+ public void listenAutoAdjustEnabled(DistributePropertyListener<? super Boolean> lsnr) {
+ baselineAutoAdjustEnabled.addListener(lsnr);
+ }
+
+ /** */
+ public void listenAutoAdjustTimeout(DistributePropertyListener<? super Long> lsnr) {
+ baselineAutoAdjustTimeout.addListener(lsnr);
+ }
+
/**
* Called when cluster performing activation.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index fc8bef2..d8f95db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -56,13 +56,17 @@ import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.BaselineChangedEvent;
import org.apache.ignite.events.ClusterActivationEvent;
import org.apache.ignite.events.ClusterStateChangeEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
@@ -111,6 +115,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
@@ -581,7 +586,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
- exchFut.listen(f -> onClusterStateChangeFinish(f, exchActions));
+ boolean baselineChanging;
+ if (stateChangeMsg.forceChangeBaselineTopology())
+ baselineChanging = true;
+ else {
+ DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState();
+
+ assert state.transition() : state;
+
+ baselineChanging = exchActions.changedBaseline()
+ // Or it is the first activation.
+ || state.state() != ClusterState.INACTIVE && !state.previouslyActive() && state.previousBaselineTopology() == null;
+ }
+
+ exchFut.listen(f -> onClusterStateChangeFinish(f, exchActions, baselineChanging));
}
}
else if (customMsg instanceof DynamicCacheChangeBatch) {
@@ -664,7 +682,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/** */
- private void onClusterStateChangeFinish(IgniteInternalFuture<AffinityTopologyVersion> fut, ExchangeActions exchActions) {
+ private void onClusterStateChangeFinish(IgniteInternalFuture<AffinityTopologyVersion> fut,
+ ExchangeActions exchActions, boolean baselineChanging) {
A.notNull(exchActions, "exchActions");
GridEventStorageManager evtMngr = cctx.kernalContext().event();
@@ -711,6 +730,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.kernalContext().getSystemExecutorService()
.submit(() -> evts.forEach(e -> cctx.kernalContext().event().record(e)));
}
+
+ GridKernalContext ctx = cctx.kernalContext();
+ DiscoveryDataClusterState state = ctx.state().clusterState();
+
+ if (baselineChanging) {
+ ctx.getStripedExecutorService().execute(new Runnable() {
+ @Override public void run() {
+ if (ctx.event().isRecordable(EventType.EVT_BASELINE_CHANGED)) {
+ ctx.event().record(new BaselineChangedEvent(
+ ctx.discovery().localNode(),
+ "Baseline changed.",
+ EventType.EVT_BASELINE_CHANGED,
+ ctx.cluster().get().currentBaselineTopology()
+ ));
+ }
+ }
+ });
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
index 8536dd2..73e771e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -211,6 +211,16 @@ public class DiscoveryDataClusterState implements Serializable {
}
/**
+ * @return Previous "active" flag value during transition.
+ */
+ public boolean previouslyActive() {
+ if (prevState != null)
+ return prevState.state != INACTIVE;
+
+ return state == INACTIVE;
+ }
+
+ /**
* @return State change exchange version.
*/
public AffinityTopologyVersion transitionTopologyVersion() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 57b6790..888ba7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.BaselineConfigurationChangedEvent;
import org.apache.ignite.events.ClusterStateChangeStartedEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadO
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustStatus;
import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.ChangeTopologyWatcher;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -95,6 +97,8 @@ import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_STATE_ON_START;
+import static org.apache.ignite.events.EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED;
+import static org.apache.ignite.events.EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -110,9 +114,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersi
*
*/
public class GridClusterStateProcessor extends GridProcessorAdapter implements IGridClusterStateProcessor, MetastorageLifecycleListener {
- /** Stripe id for cluster activation event. */
- public static final int CLUSTER_ACTIVATION_EVT_STRIPE_ID = Integer.MAX_VALUE;
-
/** */
private static final String METASTORE_CURR_BLT_KEY = "metastoreBltKey";
@@ -202,8 +203,37 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
ctx,
ctx.log(DistributedBaselineConfiguration.class)
);
+
+ distributedBaselineConfiguration.listenAutoAdjustEnabled(makeEventListener(
+ EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED
+ ));
+
+ distributedBaselineConfiguration.listenAutoAdjustTimeout(makeEventListener(
+ EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED
+ ));
+ }
+
+ /** */
+ private DistributePropertyListener<Object> makeEventListener(int evtType) {
+ //noinspection CodeBlock2Expr
+ return (name, oldVal, newVal) -> {
+ ctx.getStripedExecutorService().execute(() -> {
+ if (ctx.event().isRecordable(evtType)) {
+ ctx.event().record(new BaselineConfigurationChangedEvent(
+ ctx.discovery().localNode(),
+ evtType == EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED
+ ? "Baseline auto-adjust \"enabled\" flag has been changed"
+ : "Baseline auto-adjust timeout has been changed",
+ evtType,
+ distributedBaselineConfiguration.isBaselineAutoAdjustEnabled(),
+ distributedBaselineConfiguration.getBaselineAutoAdjustTimeout()
+ ));
+ }
+ });
+ };
}
+
/** {@inheritDoc} */
@Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) {
if (!isBaselineAutoAdjustEnabled() || baselineAutoAdjustTimeout() != 0)
@@ -700,7 +730,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
if (newState.state() != state.state()) {
if (ctx.event().isRecordable(EventType.EVT_CLUSTER_STATE_CHANGE_STARTED)) {
ctx.getStripedExecutorService().execute(
- CLUSTER_ACTIVATION_EVT_STRIPE_ID,
() -> ctx.event().record(new ClusterStateChangeStartedEvent(
state.state(),
newState.state(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java
index f15da49..b47062c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java
@@ -85,5 +85,5 @@ public interface DistributedProperty<T extends Serializable> {
/**
* @param listener Update listener.
*/
- void addListener(DistributePropertyListener<T> listener);
+ void addListener(DistributePropertyListener<? super T> listener);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java
index c549968..9819197 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java
@@ -39,7 +39,7 @@ public class SimpleDistributedProperty<T extends Serializable> implements Distri
private volatile boolean attached = false;
/** Listeners of property update. */
- private final ConcurrentLinkedQueue<DistributePropertyListener<T>> updateListeners = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<DistributePropertyListener<? super T>> updateListeners = new ConcurrentLinkedQueue<>();
/**
* Specific consumer for update value in cluster. It is null when property doesn't ready to update value on cluster
@@ -108,7 +108,7 @@ public class SimpleDistributedProperty<T extends Serializable> implements Distri
}
/** {@inheritDoc} */
- @Override public void addListener(DistributePropertyListener<T> listener) {
+ @Override public void addListener(DistributePropertyListener<? super T> listener) {
updateListeners.add(listener);
}