You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/14 00:16:43 UTC
helix git commit: Fix GroupCommit issue for adding back current state
Repository: helix
Updated Branches:
refs/heads/master 4385d01a3 -> 5e4e26cc8
Fix GroupCommit issue for adding back current state
Previously, GroupCommit would add a DROPPED current state back if the same OFFLINE -> DROPPED message was sent twice.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5e4e26cc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5e4e26cc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5e4e26cc
Branch: refs/heads/master
Commit: 5e4e26cc8bfcc04236e87a721a39070f7be238c8
Parents: 4385d01
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Jun 13 18:35:48 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Jul 13 17:16:21 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/GroupCommit.java | 4 +-
.../controller/TestTargetExternalView.java | 2 +-
.../messaging/TestGroupCommitAddBackData.java | 138 +++++++++++++++++++
3 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/5e4e26cc/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
index 8165f93..b23fc2e 100644
--- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -117,9 +117,9 @@ public class GroupCommit {
*/
if (merged == null) {
merged = new ZNRecord(first._record);
- } else {
- merged.merge(first._record);
}
+ merged.merge(first._record);
+
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
http://git-wip-us.apache.org/repos/asf/helix/blob/5e4e26cc/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
index b5c828f..650456b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
@@ -81,7 +81,7 @@ public class TestTargetExternalView extends TaskTestBase {
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false);
Assert.assertTrue(verifier.verifyByPolling());
-
+ Thread.sleep(1000);
targetExternalViews = _accessor.getChildValues(_accessor.keyBuilder().externalViews());
idealStates = _accessor.getChildValues(_accessor.keyBuilder().idealStates());
http://git-wip-us.apache.org/repos/asf/helix/blob/5e4e26cc/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
new file mode 100644
index 0000000..70d4425
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
@@ -0,0 +1,138 @@
+package org.apache.helix.integration.messaging;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGroupCommitAddBackData extends ZkTestBase {
+ private static Logger LOG = LoggerFactory.getLogger(TestGroupCommitAddBackData.class);
+ private static final int START_PORT = 12918;
+ private static final int DEFAULT_TIMEOUT = 30 * 1000;
+
+ private HelixManager _manager;
+ private final String CLASS_NAME = getShortClassName();
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ private MockParticipantManager _participant;
+
+ private int _replica = 3;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ // setup storage cluster
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + START_PORT;
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+ _participant.syncStart();
+
+ // create cluster manager
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ if (_participant != null && _participant.isConnected()) {
+ _participant.syncStop();
+ }
+
+ if (_manager != null && _manager.isConnected()) {
+ _manager.disconnect();
+ }
+
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ try {
+ _gSetupTool.deleteCluster(CLUSTER_NAME);
+ } catch (Exception ex) {
+ System.err.println(
+ "Failed to delete cluster " + CLUSTER_NAME + ", error: " + ex.getLocalizedMessage());
+ }
+ }
+
+ System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testGroupCommitAddCurrentStateBack() throws InterruptedException {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Message initMessage = generateMessage("OFFLINE", "ONLINE");
+ accessor.setProperty(
+ accessor.keyBuilder().message(_participant.getInstanceName(), initMessage.getMsgId()),
+ initMessage);
+ Assert.assertTrue(waitForMessageProcessed(accessor, initMessage.getMsgId()));
+ Message toOffline = generateMessage("ONLINE", "OFFLINE");
+ accessor.setProperty(
+ accessor.keyBuilder().message(_participant.getInstanceName(), toOffline.getMsgId()),
+ toOffline);
+ Assert.assertTrue(waitForMessageProcessed(accessor, toOffline.getMsgId()));
+
+ // Send 10 consecutive OFFLINE -> DROPPED messages
+ for (int i = 0; i < 10; i++) {
+ Message dropped = generateMessage("OFFLINE", "DROPPED");
+ accessor.setProperty(
+ accessor.keyBuilder().message(_participant.getInstanceName(), dropped.getMsgId()),
+ dropped);
+ Assert.assertTrue(waitForMessageProcessed(accessor, dropped.getMsgId()));
+ Assert.assertFalse(accessor.getBaseDataAccessor().exists(accessor.keyBuilder()
+ .currentState(_participant.getInstanceName(), _participant.getSessionId(),
+ WorkflowGenerator.DEFAULT_TGT_DB).getPath(), 0));
+ }
+ }
+
+ private Message generateMessage(String from, String to) {
+ String uuid = UUID.randomUUID().toString();
+ Message message = new Message(Message.MessageType.STATE_TRANSITION, uuid);
+ message.setSrcName("ADMIN");
+ message.setTgtName(_participant.getInstanceName());
+ message.setMsgState(Message.MessageState.NEW);
+ message.setPartitionName("P");
+ message.setResourceName(WorkflowGenerator.DEFAULT_TGT_DB);
+ message.setFromState(from);
+ message.setToState(to);
+ message.setTgtSessionId(_participant.getSessionId());
+ message.setSrcSessionId(_manager.getSessionId());
+ message.setStateModelDef("OnlineOffline");
+ message.setStateModelFactoryName("DEFAULT");
+ return message;
+ }
+
+ private boolean waitForMessageProcessed(HelixDataAccessor accessor, String messageId)
+ throws InterruptedException {
+ String path =
+ accessor.keyBuilder().message(_participant.getInstanceName(), messageId).getPath();
+ long startTime = System.currentTimeMillis();
+ while (accessor.getBaseDataAccessor().exists(path, 0)) {
+ if (System.currentTimeMillis() - startTime > DEFAULT_TIMEOUT) {
+ return false;
+ }
+ Thread.sleep(200);
+ }
+ return true;
+ }
+}