You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 19:13:18 UTC

[4/7] git commit: [HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core

[HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/85277291
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/85277291
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/85277291

Branch: refs/heads/master
Commit: 85277291607154f6d63ce0a8e401e221e55cc95c
Parents: e914edb
Author: zzhang <zz...@apache.org>
Authored: Tue May 20 17:56:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 11:32:43 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   7 +-
 .../java/org/apache/helix/PropertyType.java     |  12 -
 .../helix/controller/HelixControllerMain.java   |  19 -
 .../restlet/ZKPropertyTransferServer.java       | 247 -----------
 .../controller/restlet/ZNRecordUpdate.java      |  83 ----
 .../restlet/ZNRecordUpdateResource.java         |  79 ----
 .../restlet/ZkPropertyTransferApplication.java  |  45 --
 .../restlet/ZkPropertyTransferClient.java       | 177 --------
 .../helix/controller/restlet/package-info.java  |  23 -
 .../manager/zk/ParticipantManagerHelper.java    |   1 -
 .../helix/manager/zk/ZKHelixDataAccessor.java   |  71 +---
 .../apache/helix/manager/zk/ZKHelixManager.java |   2 -
 .../manager/zk/ZNRecordStreamingSerializer.java |   6 +-
 .../helix/manager/zk/ZkHelixLeaderElection.java |  10 -
 .../helix/manager/zk/ZkHelixParticipant.java    |   2 -
 .../healthcheck/TestAlertActionTriggering.java  | 224 ----------
 .../helix/healthcheck/TestAlertFireHistory.java | 422 -------------------
 .../helix/integration/TestAutoRebalance.java    |   6 +-
 .../TestAutoRebalancePartitionLimit.java        |   2 +-
 .../TestCustomizedIdealStateRebalancer.java     |   2 +-
 .../helix/integration/TestDisableNode.java      |   2 +-
 .../helix/integration/TestDisablePartition.java |   2 +-
 .../helix/integration/TestDropResource.java     |   2 +-
 .../helix/integration/TestMessagingService.java |   4 +-
 .../helix/integration/TestSchedulerMessage.java |   6 +-
 .../TestUserDefRebalancerCompatibility.java     |   3 +-
 .../integration/TestZkCallbackHandlerLeak.java  |  77 ++--
 ...dAloneCMTestBaseWithPropertyServerCheck.java |  88 ----
 .../manager/TestConsecutiveZkSessionExpiry.java |   4 +-
 .../TestDistributedControllerManager.java       |   4 +-
 .../manager/TestZkCallbackHandlerLeak.java      |  76 ++--
 .../manager/zk/TestLiveInstanceBounce.java      |   4 +-
 .../zk/TestZKPropertyTransferServer.java        |  63 ---
 .../manager/zk/TestZkStateChangeListener.java   |   5 +-
 34 files changed, 106 insertions(+), 1674 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b8e7a9a..0d06914 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,6 @@ under the License.
       org.apache.zookeeper.txn*;resolution:=optional,
       org.apache.zookeeper*;version="[3.3,4)",
       org.codehaus.jackson*;version="[1.8,2)",
-      org.restlet;version="[2.2.1,3]",
       *
     </osgi.import>
     <osgi.ignore>
@@ -131,9 +130,9 @@ under the License.
       <version>2.1</version>
     </dependency>
     <dependency>
-      <groupId>org.restlet.jse</groupId>
-      <artifactId>org.restlet</artifactId>
-      <version>2.2.1</version>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.6</version>
     </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 75adb20..579f454 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -85,8 +85,6 @@ public enum PropertyType {
    */
   boolean isCached;
 
-  boolean usePropertyTransferServer;
-
   private PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate) {
     this(type, isPersistent, mergeOnUpdate, false);
   }
@@ -114,7 +112,6 @@ public enum PropertyType {
     this.updateOnlyOnExists = updateOnlyOnExists;
     this.createOnlyIfAbsent = createOnlyIfAbsent;
     this.isCached = isCached;
-    this.usePropertyTransferServer = isAsyncWrite;
   }
 
   /**
@@ -204,13 +201,4 @@ public enum PropertyType {
   public boolean isCached() {
     return isCached;
   }
-
-  /**
-   * Check if this property uses a property transfer server
-   * @return true if a property transfer server is used, false otherwise
-   */
-  public boolean usePropertyTransferServer() {
-    return usePropertyTransferServer;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 62f3b23..b6c16b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,7 +47,6 @@ import org.apache.commons.cli.ParseException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.manager.zk.HelixManagerShutdownHook;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
@@ -58,7 +57,6 @@ public class HelixControllerMain {
   public static final String cluster = "cluster";
   public static final String help = "help";
   public static final String mode = "mode";
-  public static final String propertyTransferServicePort = "propertyTransferPort";
   public static final String name = "controllerName";
   public static final String STANDALONE = "STANDALONE";
   public static final String DISTRIBUTED = "DISTRIBUTED";
@@ -101,19 +99,11 @@ public class HelixControllerMain {
     controllerNameOption.setRequired(false);
     controllerNameOption.setArgName("Cluster controller name (Optional)");
 
-    Option portOption =
-        OptionBuilder.withLongOpt(propertyTransferServicePort)
-            .withDescription("Webservice port for ZkProperty controller transfer").create();
-    portOption.setArgs(1);
-    portOption.setRequired(false);
-    portOption.setArgName("Cluster controller property transfer port (Optional)");
-
     Options options = new Options();
     options.addOption(helpOption);
     options.addOption(zkServerOption);
     options.addOption(clusterOption);
     options.addOption(modeOption);
-    options.addOption(portOption);
     options.addOption(controllerNameOption);
 
     return options;
@@ -208,15 +198,11 @@ public class HelixControllerMain {
     String clusterName = cmd.getOptionValue(cluster);
     String controllerMode = STANDALONE;
     String controllerName = null;
-    int propertyTransServicePort = -1;
 
     if (cmd.hasOption(mode)) {
       controllerMode = cmd.getOptionValue(mode);
     }
 
-    if (cmd.hasOption(propertyTransferServicePort)) {
-      propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
-    }
     if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name)) {
       throw new IllegalArgumentException(
           "A unique cluster controller name is required in DISTRIBUTED mode");
@@ -228,10 +214,6 @@ public class HelixControllerMain {
     logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
         + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
 
-    if (propertyTransServicePort > 0) {
-      ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
-    }
-
     HelixManager manager =
         startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
 
@@ -244,7 +226,6 @@ public class HelixControllerMain {
           + " interrupted");
     } finally {
       manager.disconnect();
-      ZKPropertyTransferServer.getInstance().shutdown();
     }
 
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
deleted file mode 100644
index 66bd257..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
+++ /dev/null
@@ -1,247 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.restlet.Component;
-import org.restlet.Context;
-import org.restlet.data.Protocol;
-
-/**
- * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is
- * to optimize the concurrency level of zookeeper access for ZNRecord updates
- * that does not require real-time, like message handling status updates and
- * healthcheck reports.
- * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init()
- * and shutdown() on the getInstance().
- */
-public class ZKPropertyTransferServer {
-  public static final String PORT = "port";
-  public static String RESTRESOURCENAME = "ZNRecordUpdates";
-  public static final String SERVER = "ZKPropertyTransferServer";
-
-  // Frequency period for the ZNRecords are batch written to zookeeper
-  public static int PERIOD = 10 * 1000;
-  // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
-  public static int MAX_UPDATE_LIMIT = 10000;
-  private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-
-  int _localWebservicePort;
-  String _webserviceUrl;
-  ZkBaseDataAccessor<ZNRecord> _accessor;
-  String _zkAddress;
-
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
-      new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-
-  boolean _initialized = false;
-  boolean _shutdownFlag = false;
-  Component _component = null;
-  Timer _timer = null;
-
-  static {
-    org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
-  }
-
-  /**
-   * Timertask for zookeeper batched writes
-   */
-  class ZKPropertyTransferTask extends TimerTask {
-    @Override
-    public void run() {
-      try {
-        sendData();
-      } catch (Throwable t) {
-        LOG.error("", t);
-      }
-
-    }
-  }
-
-  void sendData() {
-    LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
-    ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
-
-    synchronized (_dataBufferRef) {
-      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-
-    if (updateCache != null) {
-      List<String> paths = new ArrayList<String>();
-      List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
-      List<ZNRecord> vals = new ArrayList<ZNRecord>();
-      // BUGBUG : what if the instance is dropped?
-      for (ZNRecordUpdate holder : updateCache.values()) {
-        paths.add(holder.getPath());
-        updaters.add(holder.getZNRecordUpdater());
-        vals.add(holder.getRecord());
-      }
-      // Batch write the accumulated updates into zookeeper
-      long timeStart = System.currentTimeMillis();
-      if (paths.size() > 0) {
-        _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      }
-      LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in "
-          + (System.currentTimeMillis() - timeStart) + " ms");
-    } else {
-      LOG.warn("null _dataQueueRef. Should be in the beginning only");
-    }
-  }
-
-  static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-
-  private ZKPropertyTransferServer() {
-    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-  }
-
-  public static ZKPropertyTransferServer getInstance() {
-    return _instance;
-  }
-
-  public boolean isInitialized() {
-    return _initialized;
-  }
-
-  public void init(int localWebservicePort, String zkAddress) {
-    if (!_initialized && !_shutdownFlag) {
-      LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
-      _localWebservicePort = localWebservicePort;
-      ZkClient zkClient = new ZkClient(zkAddress);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
-      _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-      _zkAddress = zkAddress;
-      startServer();
-    } else {
-      LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: "
-          + _shutdownFlag);
-    }
-  }
-
-  public String getWebserviceUrl() {
-    if (!_initialized || _shutdownFlag) {
-      LOG.debug("inited:" + _initialized + " shutdownFlag:" + _shutdownFlag + " , return");
-      return null;
-    }
-    return _webserviceUrl;
-  }
-
-  /**
-   * Add an ZNRecordUpdate to the change queue.
-   * Called by the webservice front-end.
-   */
-  void enqueueData(ZNRecordUpdate e) {
-    if (!_initialized || _shutdownFlag) {
-      LOG.error("zkDataTransferServer inited:" + _initialized + " shutdownFlag:" + _shutdownFlag
-          + " , return");
-      return;
-    }
-    // Do local merge if receive multiple update on the same path
-    synchronized (_dataBufferRef) {
-      e.getRecord().setSimpleField(SERVER, _webserviceUrl);
-      if (_dataBufferRef.get().containsKey(e.getPath())) {
-        ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
-        oldVal = e.getZNRecordUpdater().update(oldVal);
-        _dataBufferRef.get().get(e.getPath())._record = oldVal;
-      } else {
-        _dataBufferRef.get().put(e.getPath(), e);
-      }
-    }
-    if (_dataBufferRef.get().size() > MAX_UPDATE_LIMIT) {
-      sendData();
-    }
-  }
-
-  void startServer() {
-    LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress "
-        + _zkAddress);
-
-    _component = new Component();
-
-    _component.getServers().add(Protocol.HTTP, _localWebservicePort);
-    Context applicationContext = _component.getContext().createChildContext();
-    applicationContext.getAttributes().put(SERVER, this);
-    applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
-    ZkPropertyTransferApplication application =
-        new ZkPropertyTransferApplication(applicationContext);
-    // Attach the application to the component and start it
-    _component.getDefaultHost().attach(application);
-    _timer = new Timer(true);
-    _timer.schedule(new ZKPropertyTransferTask(), PERIOD, PERIOD);
-
-    try {
-      _webserviceUrl =
-          "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":"
-              + _localWebservicePort + "/" + RESTRESOURCENAME;
-      _component.start();
-      _initialized = true;
-    } catch (Exception e) {
-      LOG.error("", e);
-    }
-    LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress "
-        + _zkAddress);
-  }
-
-  public void shutdown() {
-    if (_shutdownFlag) {
-      LOG.error("ZKPropertyTransferServer already has been shutdown...");
-      return;
-    }
-    LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress "
-        + _zkAddress);
-    if (_timer != null) {
-      _timer.cancel();
-    }
-    if (_component != null) {
-      try {
-        _component.stop();
-      } catch (Exception e) {
-        LOG.error("", e);
-      }
-    }
-    _shutdownFlag = true;
-  }
-
-  public void reset() {
-    if (_shutdownFlag == true) {
-      _shutdownFlag = false;
-      _initialized = false;
-      _component = null;
-      _timer = null;
-      _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
deleted file mode 100644
index deef748..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordUpdater;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Unit of transfered ZNRecord updates. Contains the ZNRecord Value, zkPath
- * to store the update value, and the property type (used to merge the ZNRecord)
- * For ZNRecord subtraction, it is currently not supported yet.
- */
-public class ZNRecordUpdate {
-  public enum OpCode {
-    // TODO: create is not supported; but update will create if not exist
-    CREATE,
-    UPDATE,
-    SET
-  }
-
-  final String _path;
-  ZNRecord _record;
-  final OpCode _code;
-
-  @JsonCreator
-  public ZNRecordUpdate(@JsonProperty("path") String path, @JsonProperty("opcode") OpCode code,
-      @JsonProperty("record") ZNRecord record) {
-    _path = path;
-    _record = record;
-    _code = code;
-  }
-
-  public String getPath() {
-    return _path;
-  }
-
-  public ZNRecord getRecord() {
-    return _record;
-  }
-
-  public OpCode getOpcode() {
-    return _code;
-  }
-
-  @JsonIgnore(true)
-  public DataUpdater<ZNRecord> getZNRecordUpdater() {
-    if (_code == OpCode.SET)
-
-    {
-      return new ZNRecordUpdater(_record) {
-        @Override
-        public ZNRecord update(ZNRecord current) {
-          return _record;
-        }
-      };
-    } else if ((_code == OpCode.UPDATE)) {
-      return new ZNRecordUpdater(_record);
-    } else {
-      throw new UnsupportedOperationException("Not supported : " + _code);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
deleted file mode 100644
index 3877686..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringReader;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-import org.restlet.representation.Representation;
-import org.restlet.representation.Variant;
-import org.restlet.resource.ServerResource;
-
-/**
- * REST resource for ZkPropertyTransfer server to receive PUT requests
- * that submits ZNRecordUpdates
- */
-public class ZNRecordUpdateResource extends ServerResource {
-  public static final String UPDATEKEY = "ZNRecordUpdate";
-  private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
-
-  public ZNRecordUpdateResource() {
-    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
-    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
-    setNegotiated(false);
-  }
-
-  @Override
-  public Representation put(Representation entity) {
-    try {
-      ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
-
-      Form form = new Form(entity);
-      String jsonPayload = form.getFirstValue(UPDATEKEY, true);
-
-      // Parse the map from zkPath --> ZNRecordUpdate from the payload
-      StringReader sr = new StringReader(jsonPayload);
-      ObjectMapper mapper = new ObjectMapper();
-      TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
-          new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
-          };
-      Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
-      // Enqueue the ZNRecordUpdate for sending
-      for (ZNRecordUpdate holder : holderMap.values()) {
-        server.enqueueData(holder);
-        LOG.info("Received " + holder.getPath() + " from "
-            + getRequest().getClientInfo().getAddress());
-      }
-      getResponse().setStatus(Status.SUCCESS_OK);
-    } catch (Exception e) {
-      LOG.error("", e);
-      getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
-    }
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
deleted file mode 100644
index 68d35cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.restlet.Application;
-import org.restlet.Context;
-import org.restlet.Restlet;
-import org.restlet.routing.Router;
-
-/**
- * Restlet application for ZkPropertyTransfer server
- */
-public class ZkPropertyTransferApplication extends Application {
-  public ZkPropertyTransferApplication() {
-    super();
-  }
-
-  public ZkPropertyTransferApplication(Context context) {
-    super(context);
-  }
-
-  @Override
-  public Restlet createInboundRoot() {
-    Router router = new Router(getContext());
-    router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
-    return router;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
deleted file mode 100644
index 092d845..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.restlet.Client;
-import org.restlet.Request;
-import org.restlet.Response;
-import org.restlet.data.MediaType;
-import org.restlet.data.Method;
-import org.restlet.data.Protocol;
-import org.restlet.data.Reference;
-import org.restlet.data.Status;
-
-public class ZkPropertyTransferClient {
-  private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
-  public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
-  public static int SEND_PERIOD = 10 * 1000;
-
-  public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
-
-  int _maxConcurrentTasks;
-  ExecutorService _executorService;
-  Client[] _clients;
-  AtomicInteger _requestCount = new AtomicInteger(0);
-
-  // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
-      new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-  Timer _timer;
-  volatile String _webServiceUrl = "";
-
-  static {
-    org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
-  }
-
-  public ZkPropertyTransferClient(int maxConcurrentTasks) {
-    _maxConcurrentTasks = maxConcurrentTasks;
-    _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
-    _clients = new Client[_maxConcurrentTasks];
-    for (int i = 0; i < _clients.length; i++) {
-      _clients[i] = new Client(Protocol.HTTP);
-    }
-    _timer = new Timer(true);
-    _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
-    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-  }
-
-  class SendZNRecordTimerTask extends TimerTask {
-    @Override
-    public void run() {
-      sendUpdateBatch();
-    }
-  }
-
-  public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl) {
-    try {
-      LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to "
-          + webserviceUrl);
-      _webServiceUrl = webserviceUrl;
-      update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
-      synchronized (_dataBufferRef) {
-        if (_dataBufferRef.get().containsKey(update._path)) {
-          ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
-          oldVal = update.getZNRecordUpdater().update(oldVal);
-          _dataBufferRef.get().get(update.getPath())._record = oldVal;
-        } else {
-          _dataBufferRef.get().put(update.getPath(), update);
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("", e);
-    }
-  }
-
-  void sendUpdateBatch() {
-    LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to "
-        + _webServiceUrl);
-    Map<String, ZNRecordUpdate> updateCache = null;
-
-    synchronized (_dataBufferRef) {
-      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-
-    if (updateCache != null && updateCache.size() > 0) {
-      ZNRecordUpdateUploadTask task =
-          new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl,
-              _clients[_requestCount.intValue() % _maxConcurrentTasks]);
-      _requestCount.incrementAndGet();
-      _executorService.submit(task);
-      LOG.trace("Queue size :" + ((ThreadPoolExecutor) _executorService).getQueue().size());
-    }
-  }
-
-  public void shutdown() {
-    LOG.info("Shutting down ZkPropertyTransferClient");
-    _executorService.shutdown();
-    _timer.cancel();
-    for (Client client : _clients) {
-      try {
-        client.stop();
-      } catch (Exception e) {
-        LOG.error("", e);
-      }
-    }
-  }
-
-  class ZNRecordUpdateUploadTask implements Callable<Void> {
-    Map<String, ZNRecordUpdate> _updateMap;
-    String _webServiceUrl;
-    Client _client;
-
-    ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client) {
-      _updateMap = update;
-      _webServiceUrl = webserviceUrl;
-      _client = client;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
-      long time = System.currentTimeMillis();
-      Reference resourceRef = new Reference(_webServiceUrl);
-      Request request = new Request(Method.PUT, resourceRef);
-
-      ObjectMapper mapper = new ObjectMapper();
-      StringWriter sw = new StringWriter();
-      try {
-        mapper.writeValue(sw, _updateMap);
-      } catch (Exception e) {
-        LOG.error("", e);
-      }
-
-      request.setEntity(ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
-      // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
-      Response response = _client.handle(request);
-
-      if (response.getStatus().getCode() != Status.SUCCESS_OK.getCode()) {
-        LOG.error("Status : " + response.getStatus());
-      }
-      LOG.info("Using time : " + (System.currentTimeMillis() - time));
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
deleted file mode 100644
index 0ef7a79..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Restlet server for Helix controller
- * 
- */
-package org.apache.helix.controller.restlet;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 42c4f11..925c52f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -269,7 +269,6 @@ public class ParticipantManagerHelper {
     _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
         _stateMachineEngine);
     _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
-    _manager.addControllerListener(_dataAccessor);
 
     ScheduledTaskStateModelFactory stStateModelFactory =
         new ScheduledTaskStateModelFactory(_messagingService.getExecutor());

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 7527751..e411a72 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -18,6 +18,7 @@ package org.apache.helix.manager.zk;
  * specific language governing permissions and limitations
  * under the License.
  */
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,13 +29,11 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.GroupCommit;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
@@ -42,20 +41,15 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordAssembler;
 import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordUpdater;
-import org.apache.helix.controller.restlet.ZNRecordUpdate;
-import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
-public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener {
+public class ZKHelixDataAccessor implements HelixDataAccessor {
   private static Logger LOG = Logger.getLogger(ZKHelixDataAccessor.class);
   private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
   final InstanceType _instanceType;
   private final String _clusterName;
   private final Builder _propertyKeyBuilder;
-  ZkPropertyTransferClient _zkPropertyTransferClient = null;
   private final GroupCommit _groupCommit = new GroupCommit();
   String _zkPropertyTransferSvcUrl = null;
 
@@ -100,14 +94,6 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     String path = key.getPath();
     int options = constructOptions(type);
 
-    if (type.usePropertyTransferServer()) {
-      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
-        ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
-        _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-        return true;
-      }
-    }
-
     boolean success = false;
     switch (type) {
     case RESOURCEASSIGNMENTS:
@@ -155,20 +141,12 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     case CURRENTSTATES:
       success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
       break;
-    default:
-      if (type.usePropertyTransferServer()) {
-        if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
-          ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
-          _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-
-          return true;
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("getPropertyTransferUrl is null, skip updating the value");
-          }
-          return true;
-        }
+    case STATUSUPDATES:
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Update status. path: " + key.getPath() + ", record: " + value.getRecord());
       }
+      break;
+    default:
       success = _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
       break;
     }
@@ -487,39 +465,4 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
       List<DataUpdater<ZNRecord>> updaters, int options) {
     return _baseDataAccessor.updateChildren(paths, updaters, options);
   }
-
-  public void shutdown() {
-    if (_zkPropertyTransferClient != null) {
-      _zkPropertyTransferClient.shutdown();
-    }
-  }
-
-  @Override
-  public void onControllerChange(NotificationContext changeContext) {
-    LOG.info("Controller has changed");
-    refreshZkPropertyTransferUrl();
-    if (_zkPropertyTransferClient == null) {
-      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0) {
-        LOG.info("Creating ZkPropertyTransferClient as we get url " + _zkPropertyTransferSvcUrl);
-        _zkPropertyTransferClient =
-            new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
-      }
-    }
-  }
-
-  void refreshZkPropertyTransferUrl() {
-    try {
-      LiveInstance leader = getProperty(keyBuilder().controllerLeader());
-      if (leader != null) {
-        _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
-        LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl + " Controller "
-            + leader.getInstanceName());
-      } else {
-        _zkPropertyTransferSvcUrl = null;
-      }
-    } catch (Exception e) {
-      // LOG.error("", e);
-      _zkPropertyTransferSvcUrl = null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b71304a..d446430 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -548,8 +548,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       // TODO reset user defined handlers only
       resetHandlers();
 
-      _dataAccessor.shutdown();
-
       if (_leaderElectionHandler != null) {
         _leaderElectionHandler.reset();
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index bd37d42..2d7cb3c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
 
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.log4j.Logger;
@@ -35,7 +36,6 @@ import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
-import org.restlet.engine.util.Base64;
 
 import com.google.common.collect.Maps;
 
@@ -141,7 +141,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       if (rawPayload != null && rawPayload.length > 0) {
         // write rawPayload
         g.writeRaw("\n  ");
-        g.writeStringField("rawPayload", Base64.encode(rawPayload, false));
+        g.writeStringField("rawPayload", new String(Base64.encodeBase64(rawPayload), "UTF-8"));
       }
 
       g.writeRaw("\n");
@@ -226,7 +226,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
           }
 
         } else if ("rawPayload".equals(fieldname)) {
-          rawPayload = Base64.decode(jp.getText());
+          rawPayload = Base64.decodeBase64(jp.getText());
         } else {
           throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 88ca1b0..df8b5c9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -31,7 +31,6 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
@@ -134,15 +133,6 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
       leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
       leader.setSessionId(manager.getSessionId());
       leader.setHelixVersion(manager.getVersion());
-      if (ZKPropertyTransferServer.getInstance() != null) {
-        String zkPropertyTransferServiceUrl =
-            ZKPropertyTransferServer.getInstance().getWebserviceUrl();
-        if (zkPropertyTransferServiceUrl != null) {
-          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
-        }
-      } else {
-        LOG.warn("ZKPropertyTransferServer instnace is null");
-      }
       boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
       if (success) {
         return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index 39ded2c..0c6e772 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -353,7 +353,6 @@ public class ZkHelixParticipant implements HelixParticipant {
      */
     _connection.addMessageListener(this, _messagingService.getExecutor(), _clusterId,
         _participantId);
-    _connection.addControllerListener(this, _accessor, _clusterId);
 
     ScheduledTaskStateModelFactory stStateModelFactory =
         new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
@@ -432,7 +431,6 @@ public class ZkHelixParticipant implements HelixParticipant {
      * transition?
      */
     _messagingService.getExecutor().shutdown();
-    _accessor.shutdown();
 
     /**
      * remove live instance ephemeral znode

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
deleted file mode 100644
index 37f8205..0000000
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
-
-  String _statName = "TestStat@DB=db1";
-  String _stat = "TestStat";
-  String metricName1 = "TestMetric1";
-  String metricName2 = "TestMetric2";
-
-  void setHealthData(int[] val1, int[] val2) {
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixManager manager = _participants[i];
-      ZNRecord record = new ZNRecord(_stat);
-      Map<String, String> valMap = new HashMap<String, String>();
-      valMap.put(metricName1, val1[i] + "");
-      valMap.put(metricName2, val2[i] + "");
-      record.setSimpleField("TimeStamp", new Date().getTime() + "");
-      record.setMapField(_statName, valMap);
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      helixDataAccessor.setProperty(
-          keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
-          new HealthStat(record));
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOG.error("sleep interrupted", e);
-    }
-  }
-
-  void setHealthData2(int[] val1) {
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixManager manager = _participants[i];
-      ZNRecord record = new ZNRecord(_stat);
-      Map<String, String> valMap = new HashMap<String, String>();
-      valMap.put(metricName2, val1[i] + "");
-      record.setSimpleField("TimeStamp", new Date().getTime() + "");
-      record.setMapField("TestStat@DB=TestDB;Partition=TestDB_3", valMap);
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      helixDataAccessor.setProperty(
-          keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
-          new HealthStat(record));
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOG.error("sleep interrupted", e);
-    }
-  }
-
-  @Test
-  public void testAlertActionDisableNode() throws InterruptedException {
-    // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("healthChange.enabled", "true");
-    _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
-    String alertStr1 =
-        "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)ACTION(DISABLE_INSTANCE)";
-    String alertStr2 =
-        "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(120)ACTION(DISABLE_INSTANCE)";
-    String alertStr3 =
-        "EXP(decay(1.0)(localhost_*.TestStat@DB=TestDB;Partition=*.TestMetric2))CMP(GREATER)CON(160)ACTION(DISABLE_PARTITION)";
-
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr1);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr2);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr3);
-
-    int[] metrics1 = {
-        10, 15, 22, 12, 16
-    };
-    int[] metrics2 = {
-        22, 115, 22, 163, 16
-    };
-    int[] metrics3 = {
-        0, 0, 0, 0, 0
-    };
-    setHealthData(metrics1, metrics2);
-
-    HelixManager manager = _controller;
-
-    HealthStatsAggregator task = new HealthStatsAggregator(manager);
-    task.aggregate();
-    Thread.sleep(4000);
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    Builder kb = manager.getHelixDataAccessor().keyBuilder();
-    ExternalView externalView =
-        manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
-    // Test the DISABLE_INSTANCE alerts
-    String participant1 = "localhost_" + (START_PORT + 3);
-    String participant2 = "localhost_" + (START_PORT + 2);
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    String isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertFalse(Boolean.parseBoolean(isEnabled));
-
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertFalse(Boolean.parseBoolean(isEnabled));
-
-    for (String partitionName : externalView.getRecord().getMapFields().keySet()) {
-      for (String hostName : externalView.getRecord().getMapField(partitionName).keySet()) {
-        if (hostName.equals(participant1) || hostName.equals(participant2)) {
-          Assert.assertEquals(externalView.getRecord().getMapField(partitionName).get(hostName),
-              "OFFLINE");
-        }
-      }
-    }
-
-    // enable the disabled instances
-    setHealthData(metrics3, metrics3);
-    task.aggregate();
-    Thread.sleep(1000);
-
-    manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant2, true);
-    manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant1, true);
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    // Test the DISABLE_PARTITION case
-    int[] metrics4 = {
-        22, 115, 22, 16, 163
-    };
-    setHealthData2(metrics4);
-    task.aggregate();
-
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertTrue(Boolean.parseBoolean(isEnabled));
-
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertTrue(Boolean.parseBoolean(isEnabled));
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            CLUSTER_NAME));
-    Assert.assertTrue(result);
-    String participant3 = "localhost_" + (START_PORT + 4);
-    externalView = manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
-    Assert.assertTrue(externalView.getRecord().getMapField("TestDB_3").get(participant3)
-        .equalsIgnoreCase("OFFLINE"));
-
-    InstanceConfig nodeConfig =
-        helixDataAccessor.getProperty(keyBuilder.instanceConfig(participant3));
-    Assert.assertTrue(nodeConfig.getRecord()
-        .getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString())
-        .contains("TestDB_3"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
deleted file mode 100644
index 798ce92..0000000
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ /dev/null
@@ -1,422 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.model.AlertHistory;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
-
-  String _statName = "TestStat@DB=db1";
-  String _stat = "TestStat";
-  String metricName1 = "TestMetric1";
-  String metricName2 = "TestMetric2";
-
-  String _alertStr1 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)";
-  String _alertStr2 =
-      "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(100)";
-
-  void setHealthData(int[] val1, int[] val2) {
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixManager manager = _participants[i];
-      ZNRecord record = new ZNRecord(_stat);
-      Map<String, String> valMap = new HashMap<String, String>();
-      valMap.put(metricName1, val1[i] + "");
-      valMap.put(metricName2, val2[i] + "");
-      record.setSimpleField("TimeStamp", new Date().getTime() + "");
-      record.setMapField(_statName, valMap);
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      helixDataAccessor.setProperty(
-          keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
-          new HealthStat(record));
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted sleep", e);
-    }
-  }
-
-  @Test
-  public void testAlertDisable() throws InterruptedException {
-
-    int[] metrics1 = {
-        10, 15, 22, 24, 16
-    };
-    int[] metrics2 = {
-        22, 115, 22, 141, 16
-    };
-    setHealthData(metrics1, metrics2);
-
-    HelixManager manager = _controller;
-    manager.startTimerTasks();
-
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
-
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("healthChange.enabled", "false");
-    _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
-    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
-
-    task.aggregate();
-    Thread.sleep(100);
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-
-    Assert.assertEquals(history, null);
-
-    properties.put("healthChange.enabled", "true");
-    _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
-    task.aggregate();
-    Thread.sleep(100);
-
-    history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-    //
-    Assert.assertNotNull(history);
-    Assert.assertEquals(history.getRecord().getMapFields().size(), 1);
-  }
-
-  @Test
-  public void testAlertHistory() throws InterruptedException {
-    int[] metrics1 = {
-        10, 15, 22, 24, 16
-    };
-    int[] metrics2 = {
-        22, 115, 22, 141, 16
-    };
-    setHealthData(metrics1, metrics2);
-
-    HelixManager manager = _controller;
-    for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
-      task.stop();
-    }
-
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
-
-    int historySize = 0;
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    HelixProperty property = helixDataAccessor.getProperty(keyBuilder.alertHistory());
-    ZNRecord history = null;
-    if (property != null) {
-      history = property.getRecord();
-      historySize = property.getRecord().getMapFields().size();
-    }
-
-    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
-
-    task.aggregate();
-    Thread.sleep(100);
-
-    history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-    //
-    Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
-    TreeMap<String, Map<String, String>> recordMap = new TreeMap<String, Map<String, String>>();
-    recordMap.putAll(history.getMapFields());
-    Map<String, String> lastRecord = recordMap.firstEntry().getValue();
-    Assert.assertTrue(lastRecord.size() == 4);
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-
-    setHealthData(metrics1, metrics2);
-    task.aggregate();
-    Thread.sleep(100);
-    history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-    // no change
-    Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
-    recordMap = new TreeMap<String, Map<String, String>>();
-    recordMap.putAll(history.getMapFields());
-    lastRecord = recordMap.firstEntry().getValue();
-    Assert.assertTrue(lastRecord.size() == 4);
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-
-    int[] metrics3 = {
-        21, 44, 22, 14, 16
-    };
-    int[] metrics4 = {
-        122, 115, 222, 41, 16
-    };
-    setHealthData(metrics3, metrics4);
-    task.aggregate();
-    Thread.sleep(100);
-    history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-    // new delta should be recorded
-    Assert.assertEquals(history.getMapFields().size(), 2 + historySize);
-    recordMap = new TreeMap<String, Map<String, String>>();
-    recordMap.putAll(history.getMapFields());
-    lastRecord = recordMap.lastEntry().getValue();
-    Assert.assertEquals(lastRecord.size(), 6);
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-
-    int[] metrics5 = {
-        0, 0, 0, 0, 0
-    };
-    int[] metrics6 = {
-        0, 0, 0, 0, 0
-    };
-    setHealthData(metrics5, metrics6);
-    task.aggregate();
-
-    for (int i = 0; i < 10; i++) {
-      Thread.sleep(500);
-      history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-      recordMap = new TreeMap<String, Map<String, String>>();
-      recordMap.putAll(history.getMapFields());
-      lastRecord = recordMap.lastEntry().getValue();
-
-      if (history.getMapFields().size() == 3 + historySize && lastRecord.size() == 6) {
-        break;
-      }
-    }
-
-    // reset everything
-    Assert.assertEquals(history.getMapFields().size(), 3 + historySize,
-        "expect history-map-field size is " + (3 + historySize) + ", but was " + history);
-    Assert
-        .assertTrue(lastRecord.size() == 6, "expect last-record size is 6, but was " + lastRecord);
-
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-
-    // Size of the history should be 30
-    for (int i = 0; i < 27; i++) {
-      int x = i % 2;
-      int y = (i + 1) % 2;
-      int[] metricsx = {
-          19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y
-      };
-      int[] metricsy = {
-          99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y
-      };
-
-      setHealthData(metricsx, metricsy);
-      task.aggregate();
-      Thread.sleep(100);
-      history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-
-      Assert.assertEquals(history.getMapFields().size(), Math.min(3 + i + 1 + historySize, 30));
-      recordMap = new TreeMap<String, Map<String, String>>();
-      recordMap.putAll(history.getMapFields());
-      lastRecord = recordMap.lastEntry().getValue();
-      if (i == 0) {
-        Assert.assertTrue(lastRecord.size() == 6);
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-      } else {
-        System.out.println(lastRecord.size());
-        Assert.assertEquals(lastRecord.size(), 10);
-        if (x == 0) {
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        } else {
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        }
-      }
-    }
-    // size limit is 30
-    for (int i = 0; i < 10; i++) {
-      int x = i % 2;
-      int y = (i + 1) % 2;
-      int[] metricsx = {
-          19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y
-      };
-      int[] metricsy = {
-          99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y
-      };
-
-      setHealthData(metricsx, metricsy);
-      task.aggregate();
-      for (int j = 0; j < 10; j++) {
-        Thread.sleep(100);
-        history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-        recordMap = new TreeMap<String, Map<String, String>>();
-        recordMap.putAll(history.getMapFields());
-        lastRecord = recordMap.lastEntry().getValue();
-
-        if (history.getMapFields().size() == 30 && lastRecord.size() == 10)
-          break;
-      }
-      Assert.assertEquals(history.getMapFields().size(), 30,
-          "expect history.map-field size is 30, but was " + history);
-      Assert.assertEquals(lastRecord.size(), 10, "expect last-record size is 10, but was "
-          + lastRecord);
-
-      if (x == 0) {
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-      } else {
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index cc43ecb..330b906 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -30,8 +30,6 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -46,13 +44,11 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+public class TestAutoRebalance extends ZkStandAloneCMTestBase {
   String db2 = TEST_DB + "2";
   String _tag = "SSDSSD";
 

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 5f9f48c..59e1b27 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -48,7 +48,7 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
   private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
       .getName());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 7bf4f94..33b95c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -57,7 +57,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestCustomizedIdealStateRebalancer extends
-    ZkStandAloneCMTestBaseWithPropertyServerCheck {
+    ZkStandAloneCMTestBase {
   String db2 = TEST_DB + "2";
   static boolean testRebalancerCreated = false;
   static boolean testRebalancerInvoked = false;

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
index be523d0..ca54fb3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -25,7 +25,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisableNode extends ZkStandAloneCMTestBase {
 
   @Test()
   public void testDisableNode() throws Exception {

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index ba7e8e4..c81f6e0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisablePartition extends ZkStandAloneCMTestBase {
   private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
 
   @Test()

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
index 0d02d12..047ea75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -26,7 +26,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDropResource extends ZkStandAloneCMTestBase {
   @Test()
   public void testDropResource() throws Exception {
     // add a resource to be dropped

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 86f1ce4..7df9e8b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -38,7 +38,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestMessagingService extends ZkStandAloneCMTestBase {
   public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
     public static HashSet<String> _processedMsgIds = new HashSet<String>();
 
@@ -168,8 +168,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
               .get("ReplyMessage"));
       if (message.getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString())
           .get("ReplyMessage") == null) {
-        int x = 0;
-        x++;
       }
       _replyedMessageContents.add(message.getRecord()
           .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage"));

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index d78bd9d..e6117d6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -64,7 +64,7 @@ import org.codehaus.jackson.map.SerializationConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
 
   class MockAsyncCallback extends AsyncCallback {
     Message _message;
@@ -386,7 +386,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
             PropertyType.STATUSUPDATES);
 
     subPaths = _gZkClient.getChildren(instanceStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
+    Assert.assertTrue(subPaths.size() == 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
       List<String> subsubPaths = _gZkClient.getChildren(nextPath);
@@ -409,7 +409,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     }
 
     subPaths = _gZkClient.getChildren(instanceStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
+    Assert.assertTrue(subPaths.size() == 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
       List<String> subsubPaths = _gZkClient.getChildren(nextPath);

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
index 2a026c2..e7cef4f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -37,8 +37,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @SuppressWarnings("deprecation")
-public class TestUserDefRebalancerCompatibility extends
-    ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestUserDefRebalancerCompatibility extends ZkStandAloneCMTestBase {
   String db2 = TEST_DB + "2";
   static boolean testRebalancerCreated = false;
   static boolean testRebalancerInvoked = false;