You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/01/29 22:44:36 UTC

[helix] branch helix-0.9.x updated: Fix the ConcurrentModificationException in ClusterEvent.java (#785)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-0.9.x by this push:
     new bcad177  Fix the ConcurrentModificationException in ClusterEvent.java (#785)
bcad177 is described below

commit bcad17768a5d84f47ba2d4b47cf77b7c83f452d7
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Tue Feb 25 13:49:57 2020 -0800

    Fix the ConcurrentModificationException in ClusterEvent.java (#785)
---
 .../helix/controller/stages/ClusterEvent.java      |  8 ++--
 .../helix/controller/stages/TestClusterEvent.java  | 44 +++++++++++++++++++++-
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
index b09c297..ac67335 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -19,10 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.HashMap;
 import java.util.Map;
-
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +37,7 @@ public class ClusterEvent {
   @Deprecated
   public ClusterEvent(ClusterEventType eventType) {
     _eventType = eventType;
-    _eventAttributeMap = new HashMap<>();
+    _eventAttributeMap = new ConcurrentHashMap<>();
     _creationTime = System.currentTimeMillis();
     _eventId = UUID.randomUUID().toString();
   }
@@ -50,7 +50,7 @@ public class ClusterEvent {
     _clusterName = clusterName;
     _eventType = eventType;
 
-    _eventAttributeMap = new HashMap<>();
+    _eventAttributeMap = new ConcurrentHashMap<>();
     _creationTime = System.currentTimeMillis();
     _eventId = eventId;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java
index e31c12b..953a78e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEvent.java
@@ -19,16 +19,56 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.CountDownLatch;
+
+import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-@Test
+
 public class TestClusterEvent {
+
   @Test
-  public void testSimplePutandGet() {
+  public void testSimplePutAndGet() {
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     AssertJUnit.assertEquals(event.getEventType(), ClusterEventType.Unknown);
     event.addAttribute("attr1", "value");
     AssertJUnit.assertEquals(event.getAttribute("attr1"), "value");
   }
+
+  @Test
+  public void testThreadSafeClone() throws InterruptedException {
+    String clusterName = "TestCluster";
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown, "testId");
+    for (int i = 0; i < 100; i++) {
+      event.addAttribute(String.valueOf(i), i);
+    }
+    final CountDownLatch wait = new CountDownLatch(1);
+    Thread thread = new Thread(() -> {
+      String threadName = Thread.currentThread().getName();
+      try {
+        wait.await();
+      } catch (InterruptedException e) {
+        // Interrupted before the latch opened; fall through and perform the writes anyway.
+      }
+      // Add attributes to the original event; this may overlap with clone() on the main thread.
+      for (int i = 0; i < 100; i++) {
+        event.addAttribute(threadName + i, threadName);
+      }
+    });
+
+    thread.start();
+
+    try {
+      wait.countDown();
+      ClusterEvent clonedEvent = event.clone("cloneId");
+      Assert.assertEquals(clonedEvent.getClusterName(), clusterName);
+      Assert.assertEquals(clonedEvent.getEventId(), "cloneId");
+    } catch (ConcurrentModificationException e) {
+      Assert.fail("Didn't expect any ConcurrentModificationException to occur", e);
+    } finally {
+      thread.join();
+    }
+  }
 }