You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/14 00:16:43 UTC

helix git commit: Fix GroupCommit issue for adding back current state

Repository: helix
Updated Branches:
  refs/heads/master 4385d01a3 -> 5e4e26cc8


Fix GroupCommit issue for adding back current state

There was an issue where GroupCommit would add a DROPPED current state back if an OFFLINE -> DROPPED message was sent twice.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5e4e26cc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5e4e26cc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5e4e26cc

Branch: refs/heads/master
Commit: 5e4e26cc8bfcc04236e87a721a39070f7be238c8
Parents: 4385d01
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Jun 13 18:35:48 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Jul 13 17:16:21 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/GroupCommit.java |   4 +-
 .../controller/TestTargetExternalView.java      |   2 +-
 .../messaging/TestGroupCommitAddBackData.java   | 138 +++++++++++++++++++
 3 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5e4e26cc/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
index 8165f93..b23fc2e 100644
--- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -117,9 +117,9 @@ public class GroupCommit {
            */
           if (merged == null) {
             merged = new ZNRecord(first._record);
-          } else {
-            merged.merge(first._record);
           }
+          merged.merge(first._record);
+
           Iterator<Entry> it = queue._pending.iterator();
           while (it.hasNext()) {
             Entry ent = it.next();

http://git-wip-us.apache.org/repos/asf/helix/blob/5e4e26cc/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
index b5c828f..650456b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
@@ -81,7 +81,7 @@ public class TestTargetExternalView extends TaskTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false);
 
     Assert.assertTrue(verifier.verifyByPolling());
-
+    Thread.sleep(1000);
     targetExternalViews = _accessor.getChildValues(_accessor.keyBuilder().externalViews());
     idealStates = _accessor.getChildValues(_accessor.keyBuilder().idealStates());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5e4e26cc/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
new file mode 100644
index 0000000..70d4425
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java
@@ -0,0 +1,138 @@
+package org.apache.helix.integration.messaging;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestGroupCommitAddBackData extends ZkTestBase {
+  private static Logger LOG = LoggerFactory.getLogger(TestGroupCommitAddBackData.class);
+  private static final int START_PORT = 12918;
+  private static final int DEFAULT_TIMEOUT = 30 * 1000;
+
+  private HelixManager _manager;
+  private final String CLASS_NAME = getShortClassName();
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  private MockParticipantManager _participant;
+
+  private int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    String storageNodeName = PARTICIPANT_PREFIX + "_" + START_PORT;
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+    _participant.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    if (_participant != null && _participant.isConnected()) {
+      _participant.syncStop();
+    }
+
+    if (_manager != null && _manager.isConnected()) {
+      _manager.disconnect();
+    }
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      try {
+        _gSetupTool.deleteCluster(CLUSTER_NAME);
+      } catch (Exception ex) {
+        System.err.println(
+            "Failed to delete cluster " + CLUSTER_NAME + ", error: " + ex.getLocalizedMessage());
+      }
+    }
+
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testGroupCommitAddCurrentStateBack() throws InterruptedException {
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    Message initMessage = generateMessage("OFFLINE", "ONLINE");
+    accessor.setProperty(
+        accessor.keyBuilder().message(_participant.getInstanceName(), initMessage.getMsgId()),
+        initMessage);
+    Assert.assertTrue(waitForMessageProcessed(accessor, initMessage.getMsgId()));
+    Message toOffline = generateMessage("ONLINE", "OFFLINE");
+    accessor.setProperty(
+        accessor.keyBuilder().message(_participant.getInstanceName(), toOffline.getMsgId()),
+        toOffline);
+    Assert.assertTrue(waitForMessageProcessed(accessor, toOffline.getMsgId()));
+
+    // Consequential 10 messages
+    for (int i = 0; i < 10; i++) {
+      Message dropped = generateMessage("OFFLINE", "DROPPED");
+      accessor.setProperty(
+          accessor.keyBuilder().message(_participant.getInstanceName(), dropped.getMsgId()),
+          dropped);
+      Assert.assertTrue(waitForMessageProcessed(accessor, dropped.getMsgId()));
+      Assert.assertFalse(accessor.getBaseDataAccessor().exists(accessor.keyBuilder()
+          .currentState(_participant.getInstanceName(), _participant.getSessionId(),
+              WorkflowGenerator.DEFAULT_TGT_DB).getPath(), 0));
+    }
+  }
+
+  private Message generateMessage(String from, String to) {
+    String uuid = UUID.randomUUID().toString();
+    Message message = new Message(Message.MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName("ADMIN");
+    message.setTgtName(_participant.getInstanceName());
+    message.setMsgState(Message.MessageState.NEW);
+    message.setPartitionName("P");
+    message.setResourceName(WorkflowGenerator.DEFAULT_TGT_DB);
+    message.setFromState(from);
+    message.setToState(to);
+    message.setTgtSessionId(_participant.getSessionId());
+    message.setSrcSessionId(_manager.getSessionId());
+    message.setStateModelDef("OnlineOffline");
+    message.setStateModelFactoryName("DEFAULT");
+    return message;
+  }
+
+  private boolean waitForMessageProcessed(HelixDataAccessor accessor, String messageId)
+      throws InterruptedException {
+    String path =
+        accessor.keyBuilder().message(_participant.getInstanceName(), messageId).getPath();
+    long startTime = System.currentTimeMillis();
+    while (accessor.getBaseDataAccessor().exists(path, 0)) {
+      if (System.currentTimeMillis() - startTime > DEFAULT_TIMEOUT) {
+        return false;
+      }
+      Thread.sleep(200);
+    }
+    return true;
+  }
+}