You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/01 23:02:15 UTC

[1/8] helix git commit: Allow stopping the periodic rebalancer by resetting the ClusterConfig. Add additional logging for debugging.

Repository: helix
Updated Branches:
  refs/heads/master a09a18ac5 -> 44d7d2eb8


Allow stopping the periodic rebalancer by resetting the ClusterConfig. Add additional logging for debugging.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3721c1fb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3721c1fb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3721c1fb

Branch: refs/heads/master
Commit: 3721c1fbcfbb3ea40be7d485f341b013753210c6
Parents: a09a18a
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Oct 3 19:07:30 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:21 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixProperty.java    |  8 ++++++-
 .../helix/common/caches/AbstractDataCache.java  |  7 ++++++
 .../controller/GenericHelixController.java      | 23 +++++++++++---------
 3 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3721c1fb/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 2f3e68d..02e0594 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -124,6 +124,12 @@ public class HelixProperty {
       result = 31 * result + (int) (_modifiedTime ^ (_modifiedTime >>> 32));
       return result;
     }
+
+    @Override
+    public String toString() {
+      return "Stat {" + "_version=" + _version + ", _creationTime=" + _creationTime
+          + ", _modifiedTime=" + _modifiedTime + '}';
+    }
   }
 
   private Stat _stat;
@@ -180,7 +186,7 @@ public class HelixProperty {
 
   @Override
   public String toString() {
-    return _record.toString();
+    return "ZnRecord=" + _record.toString() + ", Stat=" + _stat.toString() ;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/3721c1fb/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
index 4bee84d..a4808b3 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/AbstractDataCache.java
@@ -77,6 +77,8 @@ public abstract class AbstractDataCache {
       }
     }
 
+
+
     List<T> reloadedProperty = accessor.getProperty(reloadKeys, true);
     Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
     for (T property : reloadedProperty) {
@@ -88,6 +90,11 @@ public abstract class AbstractDataCache {
       }
     }
 
+    LOG.info(reloadKeys.size() + " properties refreshed from zk.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("refreshed keys: " + reloadKeys);
+    }
+
     return refreshedPropertyMap;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/3721c1fb/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index eb75286..bd049f8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -223,14 +223,13 @@ public class GenericHelixController implements IdealStateChangeListener,
             eventType, _clusterName));
   }
 
-  // TODO who should stop this timer
   /**
    * Starts the rebalancing timer with the specified period. Start the timer if necessary; If the
    * period is smaller than the current period, cancel the current timer and use the new period.
    */
-  void startRebalancingTimer(long period, HelixManager manager) {
+  void startPeriodRebalance(long period, HelixManager manager) {
     if (period != _timerPeriod) {
-      logger.info("Controller starting timer at period " + period);
+      logger.info("Controller starting periodical rebalance timer at period " + period);
       if (_periodicalRebalanceTimer != null) {
         _periodicalRebalanceTimer.cancel();
       }
@@ -240,19 +239,21 @@ public class GenericHelixController implements IdealStateChangeListener,
           .scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
               _timerPeriod, _timerPeriod);
     } else {
-      logger.info("Controller already has timer at period " + _timerPeriod);
+      logger.info("Controller already has periodical rebalance timer at period " + _timerPeriod);
     }
   }
 
   /**
-   * Stops the rebalancing timer
+   * Stops the rebalancing timer.
    */
-  void stopRebalancingTimers() {
+  void stopPeriodRebalance() {
+    logger.info("Controller stopping periodical rebalance timer at period " + _timerPeriod);
     if (_periodicalRebalanceTimer != null) {
       _periodicalRebalanceTimer.cancel();
       _periodicalRebalanceTimer = null;
+      _timerPeriod = Long.MAX_VALUE;
+      logger.info("Controller stopped periodical rebalance timer at period " + _timerPeriod);
     }
-    _timerPeriod = Integer.MAX_VALUE;
   }
 
   private static PipelineRegistry createDefaultRegistry(String pipelineName) {
@@ -457,7 +458,7 @@ public class GenericHelixController implements IdealStateChangeListener,
 
     if (context != null) {
       if (context.getType() == Type.FINALIZE) {
-        stopRebalancingTimers();
+        stopPeriodRebalance();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType());
         return;
       } else {
@@ -685,7 +686,9 @@ public class GenericHelixController implements IdealStateChangeListener,
     }
 
     if (minPeriod != Long.MAX_VALUE) {
-      startRebalancingTimer(minPeriod, manager);
+      startPeriodRebalance(minPeriod, manager);
+    } else {
+      stopPeriodRebalance();
     }
   }
 
@@ -885,7 +888,7 @@ public class GenericHelixController implements IdealStateChangeListener,
   }
 
   public void shutdown() throws InterruptedException {
-    stopRebalancingTimers();
+    stopPeriodRebalance();
 
     terminateEventThread(_eventThread);
     terminateEventThread(_taskEventThread);


[7/8] helix git commit: Improve helix message timeout task

Posted by jx...@apache.org.
Improve helix message timeout task

From the logs and code, this appears to be a very rare race condition: the message has actually been processed and completed, but it has not been removed. Once processing completes, the handler should cancel the timeout task running on a separate thread. However, just before it tried to cancel the task, the message timed out and the timeout task thread interrupted the message handling thread, as shown in the log.

As a result, the message handling thread did not catch the InterruptedException at that moment and failed to remove the message (in READ state) from ZK. After I manually removed the message, we got an error log showing that the partition was already LEADER. That confirms the assumption that the message had been successfully processed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1507f016
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1507f016
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1507f016

Branch: refs/heads/master
Commit: 1507f0161df24f4bec3ba3632b2d23a7a9bed5d4
Parents: c783ae7
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Oct 10 16:59:28 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:58 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTask.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1507f016/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2f3d805..fb55e76 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,7 +22,6 @@ package org.apache.helix.messaging.handling;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -96,6 +95,9 @@ public class HelixTask implements MessageTask {
       handlerStart = System.currentTimeMillis();
       taskResult = _handler.handleMessage();
       handlerEnd = System.currentTimeMillis();
+
+      // cancel timeout task
+      _executor.cancelTimeoutTask(this);
     } catch (InterruptedException e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
@@ -116,9 +118,6 @@ public class HelixTask implements MessageTask {
       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
     }
 
-    // cancel timeout task
-    _executor.cancelTimeoutTask(this);
-
     Exception exception = null;
     try {
       if (taskResult.isSuccess()) {
@@ -182,13 +181,9 @@ public class HelixTask implements MessageTask {
         }
       }
 
-      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
-        removeMessageFromZk(accessor, _message);
-        reportMessageStat(_manager, _message, taskResult);
-        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
-        _executor.finishTask(this);
-      }
+      finalCleanup(taskResult);
     } catch (Exception e) {
+      finalCleanup(taskResult);
       exception = e;
       type = ErrorType.FRAMEWORK;
       code = ErrorCode.ERROR;
@@ -377,4 +372,17 @@ public class HelixTask implements MessageTask {
     }
     _isStarted = true;
   }
+
+  private void finalCleanup(HelixTaskResult taskResult) {
+    try {
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
+        removeMessageFromZk(_manager.getHelixDataAccessor(), _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
+        _executor.finishTask(this);
+      }
+    } catch (Exception e) {
+      logger.error(String.format("Error to final clean up for message : %s", _message.getId()));
+    }
+  }
 }


[3/8] helix git commit: Change all Helix default created ZkClients to use ZnRecordSerializer.

Posted by jx...@apache.org.
Change all ZkClients that Helix creates by default to use ZNRecordSerializer.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b549cda9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b549cda9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b549cda9

Branch: refs/heads/master
Commit: b549cda95cb114da78efc4b0458058862bcc6d02
Parents: f9bc9f8
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Sep 19 16:34:46 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:31 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |   2 +-
 .../helix/manager/zk/ZNRecordSerializer.java    |  17 +--
 .../manager/zk/TestZNRecordSerializer.java      | 151 +++++++++++++++++++
 .../zk/TestZNRecordStreamingSerializer.java     |  19 +++
 4 files changed, 177 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index c673f51..c4275df 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -594,7 +594,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   void createClient() throws Exception {
     PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
+        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
 
     HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
     connectionConfig.setSessionTimeout(_sessionTimeout);

http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 95ebc06..0c92224 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -21,12 +21,8 @@ package org.apache.helix.manager.zk;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
@@ -40,6 +36,7 @@ import org.codehaus.jackson.map.SerializationConfig;
 
 public class ZNRecordSerializer implements ZkSerializer {
   private static Logger logger = LoggerFactory.getLogger(ZNRecordSerializer.class);
+  private final ObjectMapper _mapper = new ObjectMapper();
 
   private static int getListFieldBound(ZNRecord record) {
     int max = Integer.MAX_VALUE;
@@ -78,15 +75,14 @@ public class ZNRecordSerializer implements ZkSerializer {
     }
 
     // do serialization
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    SerializationConfig serializationConfig = _mapper.getSerializationConfig();
     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
     serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
     serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    byte[] serializedBytes = null;
+    byte[] serializedBytes;
     try {
-      mapper.writeValue(baos, data);
+      _mapper.writeValue(baos, data);
       serializedBytes = baos.toByteArray();
       // apply compression if needed
       if (record.getBooleanField("enableCompression", false) || serializedBytes.length > ZNRecord.SIZE_LIMIT) {
@@ -113,10 +109,9 @@ public class ZNRecordSerializer implements ZkSerializer {
       return null;
     }
 
-    ObjectMapper mapper = new ObjectMapper();
     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 
-    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    DeserializationConfig deserializationConfig = _mapper.getDeserializationConfig();
     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
     deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
@@ -126,7 +121,7 @@ public class ZNRecordSerializer implements ZkSerializer {
         byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
         bais = new ByteArrayInputStream(uncompressedBytes);
       }
-      ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
+      ZNRecord zn = _mapper.readValue(bais, ZNRecord.class);
 
       return zn;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
index 05df1cd..e46eb4d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
@@ -2,9 +2,19 @@ package org.apache.helix.manager.zk;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -46,6 +56,147 @@ public class TestZNRecordSerializer {
     Assert.assertEquals(result, record);
   }
 
+
+  @Test
+  public void testNullFields() {
+    ZNRecord record = new ZNRecord("testId");
+    record.setMapField("K1", null);
+    record.setListField("k2", null);
+    record.setSimpleField("k3", null);
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    byte [] data = serializer.serialize(record);
+    ZNRecord result = (ZNRecord) serializer.deserialize(data);
+
+    Assert.assertEquals(result, record);
+    Assert.assertNull(result.getMapField("K1"));
+    Assert.assertNull(result.getListField("K2"));
+    Assert.assertNull(result.getSimpleField("K3"));
+    Assert.assertNull(result.getListField("K4"));
+  }
+
+
+  @Test (enabled = false)
+  public void testPerformance() {
+    ZNRecord record = createZnRecord();
+
+    ZNRecordSerializer serializer1 = new ZNRecordSerializer();
+    ZNRecordStreamingSerializer serializer2 = new ZNRecordStreamingSerializer();
+
+    int loop = 100000;
+
+    long start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      serializer1.serialize(record);
+    }
+    System.out.println("ZNRecordSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    byte[] data = serializer1.serialize(record);
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      serializer1.deserialize(data);
+    }
+    System.out.println("ZNRecordSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      data = serializer2.serialize(record);
+    }
+    System.out.println("ZNRecordStreamingSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      ZNRecord result = (ZNRecord) serializer2.deserialize(data);
+    }
+    System.out.println("ZNRecordStreamingSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+  }
+
+
+  ZNRecord createZnRecord() {
+    ZNRecord record = new ZNRecord("testId");
+    for (int i = 0; i < 400; i++) {
+      Map<String, String> map = new HashMap<>();
+      map.put("localhost_" + i, "Master");
+      map.put("localhost_" + (i+1), "Slave");
+      map.put("localhost_" + (i+2), "Slave");
+
+      record.setMapField("partition_" + i, map);
+      record.setListField("partition_" + i, Lists.<String>newArrayList(map.keySet()));
+      record.setSimpleField("partition_" + i,  UUID.randomUUID().toString());
+    }
+
+    return record;
+  }
+
+
+  @Test (enabled = false)
+  public void testParallelPerformance() throws ExecutionException, InterruptedException {
+    final ZNRecord record = createZnRecord();
+
+    final ZNRecordSerializer serializer1 = new ZNRecordSerializer();
+    final ZNRecordStreamingSerializer serializer2 = new ZNRecordStreamingSerializer();
+
+    int loop = 100000;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10000);
+
+    long start = System.currentTimeMillis();
+    batchSerialize(serializer1, executorService, loop, record);
+    System.out.println("ZNRecordSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    byte[] data = serializer1.serialize(record);
+    start = System.currentTimeMillis();
+    batchSerialize(serializer2, executorService, loop, record);
+    System.out.println("ZNRecordSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      data = serializer2.serialize(record);
+    }
+    System.out.println("ZNRecordStreamingSerializer serialize took " + (System.currentTimeMillis() - start) + " ms");
+
+    start = System.currentTimeMillis();
+    for (int i = 0; i < loop; i++) {
+      ZNRecord result = (ZNRecord) serializer2.deserialize(data);
+    }
+    System.out.println("ZNRecordStreamingSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms");
+  }
+
+
+  private void batchSerialize(final ZkSerializer serializer, ExecutorService executorService, int repeatTime, final ZNRecord record)
+      throws ExecutionException, InterruptedException {
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < repeatTime; i++) {
+      Future f = executorService.submit(new Runnable() {
+        @Override public void run() {
+          serializer.serialize(record);
+        }
+      });
+      futures.add(f);
+    }
+    for (Future f : futures) {
+      f.get();
+    }
+  }
+
+
+  private void batchDeSerialize(final ZkSerializer serializer, ExecutorService executorService, int repeatTime, final byte [] data)
+      throws ExecutionException, InterruptedException {
+    List<Future> futures = new ArrayList<>();
+    for (int i = 0; i < repeatTime; i++) {
+      Future f = executorService.submit(new Runnable() {
+        @Override public void run() {
+          serializer.deserialize(data);
+        }
+      });
+      futures.add(f);
+    }
+    for (Future f : futures) {
+      f.get();
+    }
+  }
+
   /**
    * Test that simple, list, and map fields are initialized as empty even when not in json
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
index 567d842..2aea3da 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
@@ -46,6 +46,25 @@ public class TestZNRecordStreamingSerializer {
     Assert.assertEquals(result, record);
   }
 
+
+  // TODO: need to fix ZnRecordStreamingSerializer before enabling this test.
+  @Test (enabled = false)
+  public void testNullFields() {
+    ZNRecord record = new ZNRecord("testId");
+    record.setMapField("K1", null);
+    record.setListField("k2", null);
+    record.setSimpleField("k3", null);
+    ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+    byte [] data = serializer.serialize(record);
+    ZNRecord result = (ZNRecord) serializer.deserialize(data);
+
+    Assert.assertEquals(result, record);
+    Assert.assertNull(result.getMapField("K1"));
+    Assert.assertNull(result.getListField("K2"));
+    Assert.assertNull(result.getSimpleField("K3"));
+    Assert.assertNull(result.getListField("K4"));
+  }
+
   /**
    * Check that the ZNRecord is not constructed if there is no id in the json
    */


[4/8] helix git commit: Emitting per resource rebalance status for possible calculation failure.

Posted by jx...@apache.org.
Emit per-resource rebalance status for possible calculation failures.

The status in the MBean is a string and is intended for debugging purposes only.
The resource rebalance state attribute will be one of the following states:
1. NORMAL
2. BEST_POSSIBLE_STATE_CAL_FAILED: calculation failed or no possible allocation was found.
3. INTERMEDIATE_STATE_CAL_FAILED: intermediate state calculation failed (does not include the throttled case).
4. UNKNOWN: the resource has not been rebalanced yet or was newly created.

Additional related changes:
1. Fix a cluster-level metric bug so that the correct metrics data is generated.
2. Fix a resource monitoring bug where DISABLE_MONITORING did not take effect.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2f39f381
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2f39f381
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2f39f381

Branch: refs/heads/master
Commit: 2f39f381b0981503d7c204aabbeaa09153292e15
Parents: b549cda
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Oct 5 16:26:11 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:36 2018 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  52 ++++----
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/IntermediateStateCalcStage.java      |  19 +++
 .../monitoring/mbeans/ClusterStatusMonitor.java |  66 ++++++----
 .../monitoring/mbeans/ResourceMonitor.java      |  35 ++++--
 .../dynamicMBeans/DynamicMBeanProvider.java     |  18 +--
 .../TestAlertingRebalancerFailure.java          | 123 +++++++++++++------
 ...ceModeWhenReachingOfflineInstancesLimit.java |  61 ++++++---
 .../mbeans/TestClusterStatusMonitor.java        |  10 +-
 .../mbeans/TestDisableResourceMbean.java        |  17 ++-
 .../monitoring/mbeans/TestResourceMonitor.java  |  21 +++-
 11 files changed, 291 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1bbd6a0..b0e453d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.util.HelixUtil;
@@ -75,11 +76,6 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     // Reset current INIT/RUNNING tasks on participants for throttling
     cache.resetActiveTaskCount(currentStateOutput);
 
-    // Check whether the offline/disabled instance count in the cluster reaches the set limit,
-    // if yes, pause the rebalancer.
-    validateOfflineInstancesLimit(cache,
-        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), clusterStatusMonitor);
-
     final BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
@@ -112,6 +108,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     BestPossibleStateOutput output = new BestPossibleStateOutput();
 
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    // Check whether the offline/disabled instance count in the cluster reaches the set limit,
+    // if yes, pause the rebalancer.
+    boolean isValid = validateOfflineInstancesLimit(cache,
+        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
 
     final List<String> failureResources = new ArrayList<>();
     Iterator<Resource> itr = resourceMap.values().iterator();
@@ -125,6 +128,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         LogUtil.logError(logger, _eventId,
             "Exception when calculating best possible states for " + resource.getResourceName(),
             ex);
+
       }
       if (!result) {
         failureResources.add(resource.getResourceName());
@@ -134,31 +138,34 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     }
 
     // Check and report if resource rebalance has failure
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor,
+    updateRebalanceStatus(!isValid || !failureResources.isEmpty(), failureResources, helixManager,
+        cache, clusterStatusMonitor,
         "Failed to calculate best possible states for " + failureResources.size() + " resources.");
 
     return output;
   }
 
-  private void updateRebalanceStatus(final boolean hasFailure, final HelixManager helixManager,
-      final ClusterDataCache cache, final ClusterStatusMonitor clusterStatusMonitor,
-      final String errorMessage) {
+  private void updateRebalanceStatus(final boolean hasFailure, final List<String> failedResources,
+      final HelixManager helixManager, final ClusterDataCache cache,
+      final ClusterStatusMonitor clusterStatusMonitor, final String errorMessage) {
     asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
       @Override
       public Object call() {
         try {
-          // TODO re-enable logging error after ticket HELIX-631 is resolved
-          /*
-          if (hasFailure && _statusUpdateUtil != null) {
-            _statusUpdateUtil
-                .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(),
-                    errorMessage, helixManager);
+          if (hasFailure) {
+            /* TODO Enable this update when we resolve ZK server load issue. This will cause extra write to ZK.
+            if (_statusUpdateUtil != null) {
+              _statusUpdateUtil
+                  .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(),
+                      errorMessage, helixManager);
+            }
+            */
+            LogUtil.logWarn(logger, _eventId, errorMessage);
           }
-          */
           if (clusterStatusMonitor != null) {
             clusterStatusMonitor.setRebalanceFailureGauge(hasFailure);
+            clusterStatusMonitor.setResourceRebalanceStates(failedResources,
+                ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
           }
         } catch (Exception e) {
           LogUtil.logError(logger, _eventId, "Could not update cluster status!", e);
@@ -170,8 +177,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
   // Check whether the offline/disabled instance count in the cluster reaches the set limit,
   // if yes, pause the rebalancer, and throw exception to terminate rebalance cycle.
-  private void validateOfflineInstancesLimit(final ClusterDataCache cache,
-      final HelixManager manager, final ClusterStatusMonitor clusterStatusMonitor) {
+  private boolean validateOfflineInstancesLimit(final ClusterDataCache cache,
+      final HelixManager manager) {
     int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed();
     if (maxOfflineInstancesAllowed >= 0) {
       int offlineCount = cache.getAllInstances().size() - cache.getEnabledLiveInstances().size();
@@ -190,11 +197,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
           LogUtil.logError(logger, _eventId, "Failed to put cluster " + cache.getClusterName()
               + " into maintenance mode, HelixManager is not set!");
         }
-        if (!cache.isTaskCache()) {
-          updateRebalanceStatus(true, manager, cache, clusterStatusMonitor, errMsg);
-        }
+        return false;
       }
     }
+    return true;
   }
 
   private boolean computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache,

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e3a504b..667b254 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -187,9 +187,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
               cache.getStateModelDef(idealState.getStateModelDefRef());
           clusterStatusMonitor
               .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
-                  stateModelDef);
-          clusterStatusMonitor
-              .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
+                  stateModelDef, totalPendingMessageCount);
           monitoringResources.add(resourceName);
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c4d11d6..915a90f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -45,6 +45,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +138,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    List<String> failedResources = new ArrayList<>();
 
     // Priority is applied in assignment computation because higher priority by looping in order of
     // decreasing priority
@@ -170,8 +172,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       } catch (HelixException ex) {
         LogUtil.logInfo(logger, _eventId,
             "Failed to calculate intermediate partition states for resource " + resourceName, ex);
+        failedResources.add(resourceName);
       }
     }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.setResourceRebalanceStates(failedResources,
+          ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+      clusterStatusMonitor
+          .setResourceRebalanceStates(output.resourceSet(), ResourceMonitor.RebalanceStatus.NORMAL);
+    }
+
     return output;
   }
 
@@ -237,6 +248,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
                       + " mode due to an instance being assigned more replicas/partitions than "
                       + "the limit.");
             }
+
+            ClusterStatusMonitor clusterStatusMonitor =
+                event.getAttribute(AttributeName.clusterStatusMonitor.name());
+            if (clusterStatusMonitor != null) {
+              clusterStatusMonitor.setResourceRebalanceStates(Collections.singletonList(resource),
+                  ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+            }
+
             throw new HelixException(errMsg);
           }
           instancePartitionCounts.put(instance, partitionCount);

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index f870ddc..803bd3c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -19,9 +19,10 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,10 +34,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -61,7 +62,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
   public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   public static final String CLUSTER_DN_KEY = "cluster";
-  static final String RESOURCE_DN_KEY = "resourceName";
+  public static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
   static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
   static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
@@ -160,6 +161,16 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     this._rebalanceFailure = isFailure;
   }
 
+  /**
+   * Records the given rebalance status on the monitor of every listed resource. A monitor is
+   * created through getOrCreateResourceMonitor when the resource does not have one yet.
+   *
+   * @param resources names of the resources whose status should be updated
+   * @param state     the rebalance status to record for each of them
+   */
+  public void setResourceRebalanceStates(Collection<String> resources,
+      ResourceMonitor.RebalanceStatus state) {
+    for (String name : resources) {
+      ResourceMonitor monitor = getOrCreateResourceMonitor(name);
+      if (monitor == null) {
+        // getOrCreateResourceMonitor may hand back null; there is nothing to record then.
+        continue;
+      }
+      monitor.setRebalanceState(state);
+    }
+  }
+
   @Override
   public long getMaxMessageQueueSizeGauge() {
     long maxQueueSize = 0;
@@ -421,11 +432,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void retainResourceMonitor(Set<String> resourceNames) {
     Set<String> resourcesToRemove = new HashSet<>();
     synchronized (this) {
+      resourceNames.retainAll(_resourceMbeanMap.keySet());
       resourcesToRemove.addAll(_resourceMbeanMap.keySet());
     }
     resourcesToRemove.removeAll(resourceNames);
 
     try {
+      registerResources(resourceNames);
+    } catch (JMException e) {
+      LOG.error(String.format("Could not register beans for the following resources: %s",
+          Joiner.on(',').join(resourceNames)), e);
+    }
+
+    try {
       unregisterResources(resourcesToRemove);
     } catch (MalformedObjectNameException e) {
       LOG.error(String.format("Could not unregister beans for the following resources: %s",
@@ -433,12 +452,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void setResourceStatus(ExternalView externalView, IdealState idealState, StateModelDefinition stateModelDef) {
+  public void setResourceStatus(ExternalView externalView, IdealState idealState,
+      StateModelDefinition stateModelDef, int messageCount) {
     try {
       ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(externalView.getId());
 
       if (resourceMonitor != null) {
-        resourceMonitor.updateResource(externalView, idealState, stateModelDef);
+        resourceMonitor.updateResourceState(externalView, idealState, stateModelDef);
+        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
       }
     } catch (Exception e) {
       LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e);
@@ -461,24 +482,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions,
+      resourceMonitor.updateRebalancerStats(numPendingRecoveryRebalancePartitions,
           numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions,
           numLoadRebalanceThrottledPartitions);
     }
   }
 
-  public synchronized void updatePendingMessages(String resourceName, int messageCount) {
-    try {
-      ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
-
-      if (resourceMonitor != null) {
-        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
-      }
-    } catch (Exception e) {
-      LOG.error("Fail to update resource pending messages, resource: " + resourceName, e);
-    }
-  }
-
   private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
     try {
       if (!_resourceMbeanMap.containsKey(resourceName)) {
@@ -487,7 +496,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
                 new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
-            bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
         }
@@ -663,6 +671,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _instanceMbeanMap.keySet().removeAll(instances);
   }
 
+  /**
+   * Registers the MBeans of the given resources that already have a monitor in
+   * {@code _resourceMbeanMap}; names without a monitor are skipped.
+   *
+   * <p>A failure for one resource does not stop the remaining resources from being registered.
+   * The first failure is rethrown after every resource has been attempted, with any later
+   * failures attached to it as suppressed exceptions.
+   *
+   * @param resources names of the resources whose monitors should be registered
+   * @throws JMException if registering at least one monitor failed
+   */
+  private synchronized void registerResources(Collection<String> resources) throws JMException {
+    JMException firstFailure = null;
+    for (String resourceName : resources) {
+      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
+      if (monitor == null) {
+        continue;
+      }
+      try {
+        monitor.register();
+      } catch (JMException e) {
+        if (firstFailure == null) {
+          firstFailure = e;
+        } else {
+          firstFailure.addSuppressed(e);
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
   private synchronized void unregisterResources(Collection<String> resources) throws MalformedObjectNameException {
     for (String resourceName : resources) {
       ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
@@ -729,6 +746,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
+  // For test only: returns the monitor tracked in _resourceMbeanMap for the given resource,
+  // or null if no monitor has been created for it.
   protected ResourceMonitor getResourceMonitor(String resourceName) {
     return _resourceMbeanMap.get(resourceName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index c3dd242..fb9a779 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -41,6 +41,13 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 public class ResourceMonitor extends DynamicMBeanProvider {
 
+  /**
+   * Rebalance status of a resource, published (by constant name) through the "RebalanceStatus"
+   * MBean attribute.
+   */
+  public enum RebalanceStatus {
+    // Initial value of the attribute, before any pipeline stage has reported on the resource.
+    UNKNOWN,
+    // Set by the intermediate state calculation for resources present in its output.
+    NORMAL,
+    // Set when the best possible state calculation failed for the resource.
+    BEST_POSSIBLE_STATE_CAL_FAILED,
+    // Set when the intermediate state calculation failed for the resource.
+    INTERMEDIATE_STATE_CAL_FAILED
+  }
+
   // Gauges
   private SimpleDynamicMetric<Long> _numOfPartitions;
   private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
@@ -67,6 +74,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
   private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge;
 
+  private SimpleDynamicMetric<String> _rebalanceState;
+
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
   private final String _resourceName;
@@ -96,6 +105,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
     attributeList.add(_numPendingStateTransitions);
+    attributeList.add(_rebalanceState);
     doRegister(attributeList, _initObjectName);
     return this;
   }
@@ -146,6 +156,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L);
     _successfulTopStateHandoffDurationCounter =
         new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0L);
+
+    _rebalanceState = new SimpleDynamicMetric<>("RebalanceStatus", RebalanceStatus.UNKNOWN.name());
   }
 
   @Override
@@ -214,7 +226,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _clusterName + " " + _resourceName;
   }
 
-  public void updateResource(ExternalView externalView, IdealState idealState,
+  public void updateResourceState(ExternalView externalView, IdealState idealState,
       StateModelDefinition stateModelDef) {
     if (externalView == null) {
       _logger.warn("External view is null");
@@ -229,7 +241,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       }
     }
 
-    resetGauges();
+    resetResourceStateGauges();
 
     if (idealState == null) {
       _logger.warn("ideal state is null for {}", _resourceName);
@@ -319,20 +331,13 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     }
   }
 
-  private void resetGauges() {
+  // Zeroes the gauges that updateResourceState recalculates from the external view and ideal
+  // state on each call. The rebalancer stats (updateRebalancerStats) and the pending state
+  // transition count (updatePendingStateTransitionMessages) are maintained by their own setters
+  // and are therefore not reset here.
+  private void resetResourceStateGauges() {
     _numOfErrorPartitions.updateValue(0L);
     _numNonTopStatePartitions.updateValue(0L);
     _externalViewIdealStateDiff.updateValue(0L);
     _numOfPartitionsInExternalView.updateValue(0L);
-
-    // The following gauges are computed each call to updateResource by way of looping so need to be reset.
     _numLessMinActiveReplicaPartitions.updateValue(0L);
     _numLessReplicaPartitions.updateValue(0L);
-    _numPendingRecoveryRebalancePartitions.updateValue(0L);
-    _numPendingLoadRebalancePartitions.updateValue(0L);
-    _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
-    _numLoadRebalanceThrottledPartitions.updateValue(0L);
-    _numPendingStateTransitions.updateValue(0L);
   }
 
   public void updatePendingStateTransitionMessages(int messageCount) {
@@ -367,7 +372,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     }
   }
 
-  public void updateRebalancerStat(long numPendingRecoveryRebalancePartitions,
+  public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
       long numLoadRebalanceThrottledPartitions) {
     _numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions);
@@ -376,6 +381,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
   }
 
+  /**
+   * Publishes the given rebalance status through the "RebalanceStatus" gauge.
+   *
+   * @param state the new rebalance status of this resource; must not be null
+   */
+  public void setRebalanceState(RebalanceStatus state) {
+    String statusName = state.name();
+    _rebalanceState.updateValue(statusName);
+  }
+
+  // Returns the current value of the gauge counting the partitions present in the external view.
   public long getExternalViewPartitionGauge() {
     return _numOfPartitionsInExternalView.getValue();
   }
@@ -408,6 +417,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _numPendingStateTransitions.getValue();
   }
 
+  /**
+   * @return the current value of the "RebalanceStatus" gauge, i.e. the name of a
+   *         {@link RebalanceStatus} constant
+   */
+  public String getRebalanceState() {
+    String currentStatus = _rebalanceState.getValue();
+    return currentStatus;
+  }
+
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
       _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 988ba9b..fbbb9e6 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -51,14 +51,15 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
    * @param domain         the MBean domain name
    * @param keyValuePairs  the MBean object name components
    */
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description, String domain, String... keyValuePairs) throws JMException {
     if (_objectName != null) {
-      throw new HelixException(
-          "Mbean has been registered before. Please create new object for new registration.");
+      // Registering an already registered MBean is now a no-op signalled by returning false, not
+      // an error. Log at debug level so repeated register calls do not flood the log with warnings.
+      _logger.debug("Mbean " + _objectName
+          + " has already been registered. Ignore register request.");
+      return false;
     }
     updateAttributtInfos(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, domain, keyValuePairs);
+    return true;
   }
 
   /**
@@ -68,19 +69,20 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
    * @param description    the MBean description
    * @param objectName     the proposed MBean ObjectName
    */
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description, ObjectName objectName) throws JMException {
     if (_objectName != null) {
-      throw new HelixException(
-          "Mbean has been registered before. Please create new object for new registration.");
+      // Re-registration is an expected no-op here. ResourceMonitor.register() appears to reach
+      // this overload, and ClusterStatusMonitor.registerResources calls register() on every
+      // tracked resource monitor each time it runs. Log at debug level (returning false) so those
+      // repeated calls do not flood the log with warnings.
+      _logger.debug("Mbean " + _objectName
+          + " has already been registered. Ignore register request.");
+      return false;
     }
     updateAttributtInfos(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, objectName);
+    return true;
   }
 
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  // Registers with no description; returns false if the MBean had already been registered.
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       ObjectName objectName) throws JMException {
-    doRegister(dynamicMetrics, null, objectName);
+    return doRegister(dynamicMetrics, null, objectName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index e732c85..7defef4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -19,18 +19,19 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -42,6 +43,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -50,6 +52,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.RESOURCE_DN_KEY;
 import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
 
 public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
@@ -91,6 +94,9 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
 
     accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
     errorNodeKey = accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
   @BeforeMethod
@@ -99,8 +105,8 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     accessor.removeProperty(errorNodeKey);
   }
 
-  @Test (enabled = false)
-  public void testParticipantUnavailable() {
+  @Test
+  public void testParticipantUnavailable() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
@@ -119,6 +125,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     // Verify there is no rebalance error logged
     Assert.assertNull(accessor.getProperty(errorNodeKey));
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // kill nodes, so rebalance cannot be done
     for (int i = 0; i < NODE_NR; i++) {
@@ -126,8 +133,10 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     }
 
     // Verify the rebalance error caused by no node available
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
@@ -138,10 +147,20 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     }
   }
 
-  @Test (enabled = false)
-  public void testTagSetIncorrect() {
+  @Test (dependsOnMethods = "testParticipantUnavailable")
+  public void testTagSetIncorrect() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
+    ZkHelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(new HashSet<>(Collections.singleton(testDb))).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Verify there is no rebalance error logged
+    Assert.assertNull(accessor.getProperty(errorNodeKey));
+    checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
+
     // set expected instance tag
     IdealState is =
         _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
@@ -150,15 +169,17 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
     // Verify there is rebalance error logged
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
   }
 
-  @Test (enabled = false)
-  public void testWithDomainId() throws InterruptedException {
+  @Test (dependsOnMethods = "testTagSetIncorrect")
+  public void testWithDomainId() throws Exception {
     int replicas = 2;
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
     // 1. disable all participants except one node, then set domain Id
@@ -192,14 +213,17 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     // Verify there is no rebalance error logged
     Assert.assertNull(accessor.getProperty(errorNodeKey));
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // 2. enable the rest nodes with no domain Id
     for (int i = replicas; i < NODE_NR; i++) {
       setInstanceEnable(_participants[i].getInstanceName(), true, configAccessor);
     }
     // Verify there is rebalance error logged
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
 
     // 3. reset all nodes domain Id to be correct setting
     for (int i = replicas; i < NODE_NR; i++) {
@@ -211,6 +235,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
 
     // Verify that rebalance error state is removed
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
@@ -223,6 +248,14 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
         String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName));
   }
 
+  /**
+   * Builds the JMX object name of the per-resource monitor in the given cluster, of the form
+   * {@code ClusterStatus:cluster=<clusterName>,resourceName=<resourceName>}.
+   */
+  private ObjectName getResourceMbeanName(String clusterName, String resourceName)
+      throws MalformedObjectNameException {
+    String name = String.format("%s:%s=%s,%s=%s", MonitorDomainNames.ClusterStatus.name(),
+        CLUSTER_DN_KEY, clusterName, RESOURCE_DN_KEY, resourceName);
+    return new ObjectName(name);
+  }
+
   private void setDomainId(String instanceName, ConfigAccessor configAccessor) {
     String domain = String.format("Rack=%s, Instance=%s", instanceName, instanceName);
     InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
@@ -237,30 +270,50 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
   }
 
-  private void checkRebalanceFailureGauge(boolean expectFailure) {
-    try {
-      Long value = (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
-      Assert.assertNotNull(value);
-      Assert.assertEquals(value == 1, expectFailure);
-    } catch (Exception e) {
-      Assert.fail("Failed to get attribute!");
-    }
+  /**
+   * Polls (via TestHelper.verify with a 5000 timeout) until the cluster's RebalanceFailureGauge
+   * MBean attribute matches the expectation: 1 when a failure is expected, any other value
+   * otherwise. Fails the test if the expectation is never met.
+   */
+  private void checkRebalanceFailureGauge(final boolean expectFailure) throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        try {
+          Long value =
+              (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
+          return value != null && (value == 1) == expectFailure;
+        } catch (Exception e) {
+          // The MBean may not be available yet; report "not reached" for this attempt.
+          return false;
+        }
+      }
+    }, 5000);
+    Assert.assertTrue(result,
+        "RebalanceFailureGauge did not reach the expected state (expectFailure=" + expectFailure
+            + ")");
   }
 
-  private HelixProperty pollForError(HelixDataAccessor accessor, PropertyKey key) {
-    final int POLL_TIMEOUT = 5000;
-    final int POLL_INTERVAL = 100;
-    HelixProperty property = accessor.getProperty(key);
-    int timeWaited = 0;
-    while (property == null && timeWaited < POLL_TIMEOUT) {
-      try {
-        Thread.sleep(POLL_INTERVAL);
-      } catch (InterruptedException e) {
-        return null;
+  /**
+   * Polls until the monitor of {@code resourceName} reports {@code expectedState} through its
+   * "RebalanceStatus" MBean attribute, failing the test if it does not within the timeout.
+   * Despite the name, this works for any {@link ResourceMonitor.RebalanceStatus} value.
+   */
+  private void checkResourceBestPossibleCalFailureState(
+      final ResourceMonitor.RebalanceStatus expectedState, final String resourceName)
+      throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        try {
+          String state = (String) _server
+              .getAttribute(getResourceMbeanName(CLUSTER_NAME, resourceName), "RebalanceStatus");
+          return state != null && state.equals(expectedState.name());
+        } catch (Exception e) {
+          // The resource MBean may not be registered yet; report "not reached" for this attempt.
+          return false;
+        }
       }
-      timeWaited += POLL_INTERVAL;
-      property = accessor.getProperty(key);
-    }
-    return property;
+    }, 5000);
+    Assert.assertTrue(result, "RebalanceStatus of resource " + resourceName
+        + " did not reach " + expectedState.name());
+  }
+
+  /**
+   * Intended to wait for the rebalance error node at {@code key} to appear. The controller's
+   * recording of that error is currently commented out, so this check is a no-op that always
+   * passes. The parameters are kept so callers need no change when the check is re-enabled.
+   */
+  private void pollForError(final HelixDataAccessor accessor, final PropertyKey key)
+      throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        /* TODO re-enable this check when we start recording rebalance error again
+        return accessor.getProperty(key) != null;
+        */
+        return true;
+      }
+    }, 5000);
+    Assert.assertTrue(result);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
index c89505b..0de2fe3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
@@ -19,16 +19,17 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+import javax.management.*;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -42,10 +43,12 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
 
 public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     extends ZkTestBase {
@@ -105,12 +108,19 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
+  @AfterMethod
+  public void afterMethod() {
+    cleanupRebalanceError();
+  }
+
   @Test
   public void testWithDisabledInstancesLimit() throws Exception {
     MaintenanceSignal maintenanceSignal =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance());
     Assert.assertNull(maintenanceSignal);
 
+    checkForRebalanceError(false);
+
     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
 
     // disable instance
@@ -133,6 +143,8 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertNotNull(maintenanceSignal);
     Assert.assertNotNull(maintenanceSignal.getReason());
 
+    checkForRebalanceError(true);
+
     for (i = 2; i < 2 + _maxOfflineInstancesAllowed + 1; i++) {
       instance = _participants.get(i).getInstanceName();
       admin.enableInstance(CLUSTER_NAME, instance, true);
@@ -146,6 +158,9 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     MaintenanceSignal maintenanceSignal =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance());
     Assert.assertNull(maintenanceSignal);
+
+    checkForRebalanceError(false);
+
     int i;
     for (i = 2; i < 2 + _maxOfflineInstancesAllowed; i++) {
       _participants.get(i).syncStop();
@@ -163,19 +178,8 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertNotNull(maintenanceSignal);
     Assert.assertNotNull(maintenanceSignal.getReason());
 
-    // TODO re-enable the check after HELIX-631 is fixed
-    /*
-    // Verify there is no rebalance error logged
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
-    PropertyKey errorNodeKey =
-        accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
-    Assert.assertNotNull(accessor.getProperty(errorNodeKey));
-
-    Long value =
-        (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
-    Assert.assertNotNull(value);
-    Assert.assertTrue(value.longValue() > 0);
-    */
+    // Verify there is rebalance error logged
+    checkForRebalanceError(true);
   }
 
   @AfterClass
@@ -193,12 +197,33 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 
-  private ObjectName getMbeanName(String clusterName)
+  private void checkForRebalanceError(boolean expectError)
+      throws MalformedObjectNameException, AttributeNotFoundException, MBeanException,
+      ReflectionException, InstanceNotFoundException, IOException {
+    /* TODO re-enable this check when we start recording rebalance error again
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    PropertyKey errorNodeKey =
+        accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+    Assert.assertEquals(accessor.getProperty(errorNodeKey) != null, expectError);
+    */
+
+    Long value =
+        (Long) _server.getAttribute(getClusterMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
+    Assert.assertEquals(value != null && value.longValue() > 0, expectError);
+  }
+
+  private void cleanupRebalanceError() {
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    PropertyKey errorNodeKey =
+        accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+    accessor.removeProperty(errorNodeKey);
+  }
+
+  private ObjectName getClusterMbeanName(String clusterName)
       throws MalformedObjectNameException {
     String clusterBeanName =
         String.format("%s=%s", CLUSTER_DN_KEY, clusterName);
     return new ObjectName(
         String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 143d325..b2daba6 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -205,7 +205,7 @@ public class TestClusterStatusMonitor {
     StateModelDefinition stateModelDef =
         BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
 
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getTotalResourceGauge(), 1);
@@ -238,7 +238,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
@@ -266,7 +266,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
@@ -291,7 +291,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
@@ -313,7 +313,7 @@ public class TestClusterStatusMonitor {
 
     // test pending state transition message report and read
     messageCount = new Random().nextInt(numPartition) + 1;
-    monitor.updatePendingMessages(testDB, messageCount);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, messageCount);
     Assert.assertEquals(monitor.getPendingStateTransitionGuage(), messageCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
index fbbf4b8..0c8ebe7 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -88,9 +88,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
     Assert.assertTrue(clusterVerifier.verifyByPolling());
 
     // Verify the bean was created for TestDB0, but not for TestDB1.
-    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName)));
-    Assert.assertFalse(_mbeanServer.isRegistered(getMbeanName("TestDB1", clusterName)));
-    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB2", clusterName)));
+    pollForMBeanExistance(getMbeanName("TestDB0", clusterName), true);
+    pollForMBeanExistance(getMbeanName("TestDB1", clusterName), false);
+    pollForMBeanExistance(getMbeanName("TestDB2", clusterName), true);
 
     controller.syncStop();
     for (MockParticipantManager participant : participants) {
@@ -100,6 +100,17 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  private void pollForMBeanExistance(final ObjectName objectName, boolean expectation)
+      throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return _mbeanServer.isRegistered(objectName);
+      }
+    }, 3000);
+    Assert.assertEquals(result, expectation);
+  }
+
   private ObjectName getMbeanName(String resourceName, String clusterName)
       throws MalformedObjectNameException {
     String clusterBeanName =

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 5310ded..713fd65 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -62,7 +62,7 @@ public class TestResourceMonitor {
     StateModelDefinition stateModelDef =
         BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -88,7 +88,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
@@ -119,7 +119,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -151,7 +151,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -181,7 +181,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -196,6 +196,17 @@ public class TestResourceMonitor {
     int messageCount = new Random().nextInt(_partitions) + 1;
     monitor.updatePendingStateTransitionMessages(messageCount);
     Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
+
+    Assert
+        .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.UNKNOWN.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
+    Assert.assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
+    Assert.assertEquals(monitor.getRebalanceState(),
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+    Assert.assertEquals(monitor.getRebalanceState(),
+        ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
   }
 
   /**


[5/8] helix git commit: Revert the ObjectMapper change in ZNRecordSerializer.

Posted by jx...@apache.org.
Revert the ObjectMapper change in ZNRecordSerializer.

We noticed OOM errors in the tests after this change and verified that the change is the cause, so revert it for now to unblock the tests.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/30c0eff3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/30c0eff3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/30c0eff3

Branch: refs/heads/master
Commit: 30c0eff3ea04cabb6e3c3fe161fe457100f1868e
Parents: 2f39f38
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Oct 11 16:58:36 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:43 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/manager/zk/ZNRecordSerializer.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/30c0eff3/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 0c92224..890bb13 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -36,7 +36,6 @@ import org.codehaus.jackson.map.SerializationConfig;
 
 public class ZNRecordSerializer implements ZkSerializer {
   private static Logger logger = LoggerFactory.getLogger(ZNRecordSerializer.class);
-  private final ObjectMapper _mapper = new ObjectMapper();
 
   private static int getListFieldBound(ZNRecord record) {
     int max = Integer.MAX_VALUE;
@@ -75,14 +74,15 @@ public class ZNRecordSerializer implements ZkSerializer {
     }
 
     // do serialization
-    SerializationConfig serializationConfig = _mapper.getSerializationConfig();
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
     serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
     serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     byte[] serializedBytes;
     try {
-      _mapper.writeValue(baos, data);
+      mapper.writeValue(baos, data);
       serializedBytes = baos.toByteArray();
       // apply compression if needed
       if (record.getBooleanField("enableCompression", false) || serializedBytes.length > ZNRecord.SIZE_LIMIT) {
@@ -111,7 +111,8 @@ public class ZNRecordSerializer implements ZkSerializer {
 
     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 
-    DeserializationConfig deserializationConfig = _mapper.getDeserializationConfig();
+    ObjectMapper mapper = new ObjectMapper();
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
     deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
@@ -121,7 +122,7 @@ public class ZNRecordSerializer implements ZkSerializer {
         byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
         bais = new ByteArrayInputStream(uncompressedBytes);
       }
-      ZNRecord zn = _mapper.readValue(bais, ZNRecord.class);
+      ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
 
       return zn;
     } catch (Exception e) {


[8/8] helix git commit: Fix TestRoutingTableProviderFromTargetEV by using poll&wait check.

Posted by jx...@apache.org.
Fix TestRoutingTableProviderFromTargetEV by using poll&wait check.

Add a 3-second poll-and-wait check to stabilize the test.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/44d7d2eb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/44d7d2eb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/44d7d2eb

Branch: refs/heads/master
Commit: 44d7d2eb8b3da78630ca0d327d88448f8a82672c
Parents: 1507f01
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Oct 12 11:05:24 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:39:05 2018 -0700

----------------------------------------------------------------------
 .../TestRoutingTableProviderFromTargetEV.java   | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/44d7d2eb/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
index da6e2f8..6a24986 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
@@ -3,12 +3,14 @@ package org.apache.helix.integration.spectator;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -111,7 +113,7 @@ public class TestRoutingTableProviderFromTargetEV extends ZkTestBase {
   }
 
   @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
-  public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
+  public void testExternalViewDiffFromTargetExternalView() throws Exception {
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.enableTargetExternalView(true);
     clusterConfig.setPersistBestPossibleAssignment(true);
@@ -120,7 +122,7 @@ public class TestRoutingTableProviderFromTargetEV extends ZkTestBase {
 
     RoutingTableProvider externalViewProvider =
         new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
-    RoutingTableProvider targetExternalViewProvider =
+    final RoutingTableProvider targetExternalViewProvider =
         new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
 
     try {
@@ -129,9 +131,17 @@ public class TestRoutingTableProviderFromTargetEV extends ZkTestBase {
       Set<InstanceConfig> externalViewMasters =
           externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
       Assert.assertEquals(externalViewMasters.size(), 0);
-      Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
-          .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
-      Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
+
+      final Set<InstanceConfig> targetExternalViewMasters = new HashSet<>();
+      Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() {
+          targetExternalViewMasters.clear();
+          targetExternalViewMasters.addAll(targetExternalViewProvider
+              .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER"));
+          return targetExternalViewMasters.size() == NUM_NODES;
+        }
+      }, 3000));
 
       // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
       Map<String, Map<String, String>> stateMap = _gSetupTool.getClusterManagementTool()


[2/8] helix git commit: Fix TestBucketizedResource.

Posted by jx...@apache.org.
Fix TestBucketizedResource.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9bc9f8c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9bc9f8c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9bc9f8c

Branch: refs/heads/master
Commit: f9bc9f8c69a480c87f89435b8b59f3ca04e8aeb0
Parents: 3721c1f
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Oct 10 10:14:25 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:27 2018 -0700

----------------------------------------------------------------------
 .../helix/integration/TestBucketizedResource.java   | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f9bc9f8c/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 27a7729..d10234d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -121,7 +121,7 @@ public class TestBucketizedResource extends ZkTestBase {
   }
 
   @Test
-  public void testBounceDisableAndDrop() {
+  public void testBounceDisableAndDrop() throws Exception {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -154,7 +154,7 @@ public class TestBucketizedResource extends ZkTestBase {
     participants[0].syncStop();
     participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(0));
     participants[0].syncStart();
-    
+
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // make sure participants[0]'s current state is bucketzied correctly during carryover
@@ -176,8 +176,16 @@ public class TestBucketizedResource extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // make sure external-view is cleaned up
-    path = keyBuilder.externalView(dbName).getPath();
-    boolean result = _baseAccessor.exists(path, 0);
+    final String evPath = keyBuilder.externalView(dbName).getPath();
+
+    TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() {
+          return !_baseAccessor.exists(evPath, 0);
+        }
+      }, 3000);
+
+    boolean result = _baseAccessor.exists(evPath, 0);
     Assert.assertFalse(result);
 
     // clean up


[6/8] helix git commit: Fix test TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit

Posted by jx...@apache.org.
Fix test TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit

The test became unstable because the pipeline runtime changed.
Switch to a poll-and-wait style check.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c783ae78
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c783ae78
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c783ae78

Branch: refs/heads/master
Commit: c783ae788f03c55122858eb552963de5c9a264dd
Parents: 30c0eff
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Oct 11 17:37:15 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:48 2018 -0700

----------------------------------------------------------------------
 ...ceModeWhenReachingOfflineInstancesLimit.java | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c783ae78/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
index 0de2fe3..9067bb5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
@@ -30,6 +29,7 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -197,19 +197,22 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 
-  private void checkForRebalanceError(boolean expectError)
-      throws MalformedObjectNameException, AttributeNotFoundException, MBeanException,
-      ReflectionException, InstanceNotFoundException, IOException {
-    /* TODO re-enable this check when we start recording rebalance error again
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
-    PropertyKey errorNodeKey =
-        accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
-    Assert.assertEquals(accessor.getProperty(errorNodeKey) != null, expectError);
-    */
-
-    Long value =
-        (Long) _server.getAttribute(getClusterMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
-    Assert.assertEquals(value != null && value.longValue() > 0, expectError);
+  private void checkForRebalanceError(final boolean expectError) throws Exception {
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        /* TODO re-enable this check when we start recording rebalance error again
+        ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+        PropertyKey errorNodeKey =
+            accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+        Assert.assertEquals(accessor.getProperty(errorNodeKey) != null, expectError);
+        */
+        Long value =
+            (Long) _server.getAttribute(getClusterMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
+        return expectError == (value != null && value.longValue() > 0);
+      }
+    }, 5000);
+    Assert.assertTrue(result);
   }
 
   private void cleanupRebalanceError() {