You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/18 05:20:43 UTC
helix git commit: Fix GroupCommit issue for adding back current state
[Forced Update!]
Repository: helix
Updated Branches:
refs/heads/helix-0.8.1-hotfix a274340b2 -> 6d402143c (forced update)
Fix GroupCommit issue for adding back current state
There was an issue that GroupCommit will add DROPPED current state back if there are double sent OFFLINE -> DROPPED message.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6d402143
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6d402143
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6d402143
Branch: refs/heads/helix-0.8.1-hotfix
Commit: 6d402143c348c4499e6ed826bfcbff876d875f9f
Parents: 2a770b2
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Jul 17 22:06:59 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Jul 17 22:20:37 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/GroupCommit.java | 4 +-
.../messaging/TestGroupCommitAddBackData.java | 138 +++++++++++++++++++
2 files changed, 140 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/6d402143/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
index b6c0ff1..e775d2d 100644
--- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -117,9 +117,9 @@ public class GroupCommit {
*/
if (merged == null) {
merged = new ZNRecord(first._record);
- } else {
- merged.merge(first._record);
}
+ merged.merge(first._record);
+
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
http://git-wip-us.apache.org/repos/asf/helix/blob/6d402143/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
new file mode 100644
index 0000000..70d4425
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
@@ -0,0 +1,138 @@
+package org.apache.helix.integration.messaging;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGroupCommitAddBackData extends ZkTestBase {
+ private static Logger LOG = LoggerFactory.getLogger(TestGroupCommitAddBackData.class);
+ private static final int START_PORT = 12918;
+ private static final int DEFAULT_TIMEOUT = 30 * 1000;
+
+ private HelixManager _manager;
+ private final String CLASS_NAME = getShortClassName();
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ private MockParticipantManager _participant;
+
+ private int _replica = 3;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ // setup storage cluster
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + START_PORT;
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+ _participant.syncStart();
+
+ // create cluster manager
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ if (_participant != null && _participant.isConnected()) {
+ _participant.syncStop();
+ }
+
+ if (_manager != null && _manager.isConnected()) {
+ _manager.disconnect();
+ }
+
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ try {
+ _gSetupTool.deleteCluster(CLUSTER_NAME);
+ } catch (Exception ex) {
+ System.err.println(
+ "Failed to delete cluster " + CLUSTER_NAME + ", error: " + ex.getLocalizedMessage());
+ }
+ }
+
+ System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testGroupCommitAddCurrentStateBack() throws InterruptedException {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Message initMessage = generateMessage("OFFLINE", "ONLINE");
+ accessor.setProperty(
+ accessor.keyBuilder().message(_participant.getInstanceName(), initMessage.getMsgId()),
+ initMessage);
+ Assert.assertTrue(waitForMessageProcessed(accessor, initMessage.getMsgId()));
+ Message toOffline = generateMessage("ONLINE", "OFFLINE");
+ accessor.setProperty(
+ accessor.keyBuilder().message(_participant.getInstanceName(), toOffline.getMsgId()),
+ toOffline);
+ Assert.assertTrue(waitForMessageProcessed(accessor, toOffline.getMsgId()));
+
+ // Consequential 10 messages
+ for (int i = 0; i < 10; i++) {
+ Message dropped = generateMessage("OFFLINE", "DROPPED");
+ accessor.setProperty(
+ accessor.keyBuilder().message(_participant.getInstanceName(), dropped.getMsgId()),
+ dropped);
+ Assert.assertTrue(waitForMessageProcessed(accessor, dropped.getMsgId()));
+ Assert.assertFalse(accessor.getBaseDataAccessor().exists(accessor.keyBuilder()
+ .currentState(_participant.getInstanceName(), _participant.getSessionId(),
+ WorkflowGenerator.DEFAULT_TGT_DB).getPath(), 0));
+ }
+ }
+
+ private Message generateMessage(String from, String to) {
+ String uuid = UUID.randomUUID().toString();
+ Message message = new Message(Message.MessageType.STATE_TRANSITION, uuid);
+ message.setSrcName("ADMIN");
+ message.setTgtName(_participant.getInstanceName());
+ message.setMsgState(Message.MessageState.NEW);
+ message.setPartitionName("P");
+ message.setResourceName(WorkflowGenerator.DEFAULT_TGT_DB);
+ message.setFromState(from);
+ message.setToState(to);
+ message.setTgtSessionId(_participant.getSessionId());
+ message.setSrcSessionId(_manager.getSessionId());
+ message.setStateModelDef("OnlineOffline");
+ message.setStateModelFactoryName("DEFAULT");
+ return message;
+ }
+
+ private boolean waitForMessageProcessed(HelixDataAccessor accessor, String messageId)
+ throws InterruptedException {
+ String path =
+ accessor.keyBuilder().message(_participant.getInstanceName(), messageId).getPath();
+ long startTime = System.currentTimeMillis();
+ while (accessor.getBaseDataAccessor().exists(path, 0)) {
+ if (System.currentTimeMillis() - startTime > DEFAULT_TIMEOUT) {
+ return false;
+ }
+ Thread.sleep(200);
+ }
+ return true;
+ }
+}