You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC

[10/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java
deleted file mode 100644
index a7555ec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-import org.restlet.Component;
-import org.restlet.Context;
-import org.restlet.data.Protocol;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-/**
- * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is 
- * to optimize the concurrency level of zookeeper access for ZNRecord updates 
- * that does not require real-time, like message handling status updates and 
- * healthcheck reports. 
- * 
- * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init() 
- * and shutdown() on the getInstance().
- * */
-public class ZKPropertyTransferServer
-{
-  public static final String PORT = "port";
-  public static String RESTRESOURCENAME = "ZNRecordUpdates";
-  public static final String SERVER = "ZKPropertyTransferServer";
-  
-  // Frequency period for the ZNRecords are batch written to zookeeper 
-  public static int PERIOD = 10 * 1000;
-  // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
-  public static int MAX_UPDATE_LIMIT = 10000;
-  private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-  
-  int _localWebservicePort;
-  String _webserviceUrl;
-  ZkBaseDataAccessor<ZNRecord> _accessor;
-  String _zkAddress;
-  
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
-    = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-  
-  boolean _initialized = false;
-  boolean _shutdownFlag = false;
-  Component _component = null;
-  Timer _timer = null;
-  
-  /**
-   * Timertask for zookeeper batched writes
-   * */
-  class ZKPropertyTransferTask extends TimerTask
-  {
-    @Override
-    public void run()
-    {
-      try
-      {
-        sendData();
-      }
-      catch(Throwable t)
-      {
-        LOG.error("", t);
-      }
-    
-    }
-  }
-  
-  void sendData()
-  {
-    LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
-    ConcurrentHashMap<String, ZNRecordUpdate> updateCache  = null;
-    
-    synchronized(_dataBufferRef)
-    {
-      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-    
-    if(updateCache != null)
-    {
-      List<String> paths = new ArrayList<String>();
-      List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
-      List<ZNRecord> vals = new ArrayList<ZNRecord>();
-      // BUGBUG : what if the instance is dropped? 
-      for(ZNRecordUpdate holder : updateCache.values())
-      {
-        paths.add(holder.getPath());
-        updaters.add(holder.getZNRecordUpdater());
-        vals.add(holder.getRecord());
-      }
-      // Batch write the accumulated updates into zookeeper
-      long timeStart = System.currentTimeMillis();
-      if(paths.size() > 0)
-      {
-        _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      }
-      LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
-    }
-    else
-    {
-      LOG.warn("null _dataQueueRef. Should be in the beginning only");
-    }
-  }
-  
-  static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-  
-  private ZKPropertyTransferServer()
-  {
-    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-  }
-  
-  public static ZKPropertyTransferServer getInstance()
-  {
-    return _instance;
-  }
-  
-  public boolean isInitialized()
-  {
-    return _initialized;
-  }
-  
-  public void init(int localWebservicePort, String zkAddress)
-  {
-    if(!_initialized && !_shutdownFlag)
-    {
-      LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
-      _localWebservicePort = localWebservicePort;
-      ZkClient zkClient = new ZkClient(zkAddress);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
-      _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-      _zkAddress = zkAddress;
-      startServer();
-    }
-    else
-    {
-      LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
-    }
-  }
-  
-  public String getWebserviceUrl()
-  {
-    if(!_initialized || _shutdownFlag)
-    {
-      LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
-      return null;
-    }
-    return _webserviceUrl;
-  }
-  
-  /** Add an ZNRecordUpdate to the change queue. 
-   *  Called by the webservice front-end.
-   *
-   */
-  void enqueueData(ZNRecordUpdate e)
-  {
-    if(!_initialized || _shutdownFlag)
-    {
-      LOG.error("zkDataTransferServer inited:" + _initialized 
-          + " shutdownFlag:"+_shutdownFlag+" , return");
-      return;
-    }
-    // Do local merge if receive multiple update on the same path
-    synchronized(_dataBufferRef)
-    {
-      e.getRecord().setSimpleField(SERVER, _webserviceUrl);
-      if(_dataBufferRef.get().containsKey(e.getPath()))
-      {
-        ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
-        oldVal = e.getZNRecordUpdater().update(oldVal);
-        _dataBufferRef.get().get(e.getPath())._record = oldVal;
-      }
-      else
-      {
-        _dataBufferRef.get().put(e.getPath(), e);
-      }
-    }
-    if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
-    {
-      sendData();
-    }
-  }
-  
-  void startServer()
-  {
-    LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
-    
-    _component = new Component();
-    
-    _component.getServers().add(Protocol.HTTP, _localWebservicePort);
-    Context applicationContext = _component.getContext().createChildContext();
-    applicationContext.getAttributes().put(SERVER, this);
-    applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
-    ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
-        applicationContext);
-    // Attach the application to the component and start it
-    _component.getDefaultHost().attach(application);
-    _timer = new Timer(true);
-    _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
-    
-    try
-    {
-      _webserviceUrl 
-        = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort 
-              + "/" + RESTRESOURCENAME;
-      _component.start();
-      _initialized = true;
-    }
-    catch (Exception e)
-    {
-      LOG.error("", e);
-    }
-    LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
-  }
-  
-  public void shutdown()
-  {
-    if(_shutdownFlag)
-    {
-      LOG.error("ZKPropertyTransferServer already has been shutdown...");
-      return;
-    }
-    LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
-    if(_timer != null)
-    {
-      _timer.cancel();
-    }
-    if(_component != null)
-    {
-      try
-      {
-        _component.stop();
-      }
-      catch (Exception e)
-      {
-        LOG.error("", e);
-      }
-    }
-    _shutdownFlag = true;
-  }
-
-  public void reset()
-  {
-    if(_shutdownFlag == true)
-    {
-      _shutdownFlag = false;
-      _initialized = false;
-      _component = null;
-      _timer = null;
-      _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java
deleted file mode 100644
index 727ab8a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordUpdater;
-/**
- * Unit of transfered ZNRecord updates. Contains the ZNRecord Value, zkPath
- * to store the update value, and the property type (used to merge the ZNRecord)
- * For ZNRecord subtraction, it is currently not supported yet. 
- * */
-public class ZNRecordUpdate
-{
-  public enum OpCode
-  {
-    // TODO: create is not supported; but update will create if not exist
-    CREATE,
-    UPDATE,
-    SET
-  }
-  final String _path;
-  ZNRecord _record;
-  final OpCode _code;
-
-  @JsonCreator
-  public ZNRecordUpdate(@JsonProperty("path")String path, 
-                        @JsonProperty("opcode")OpCode code, 
-                        @JsonProperty("record")ZNRecord record)
-  {
-    _path = path;
-    _record = record;
-    _code = code;
-  }
-  
-  public String getPath()
-  {
-    return _path;
-  }
-  
-  public ZNRecord getRecord()
-  {
-    return _record;
-  }
-  
-  public OpCode getOpcode()
-  {
-    return _code;
-  }
-
-  @JsonIgnore(true)
-  public DataUpdater<ZNRecord> getZNRecordUpdater()
-  {
-    if(_code == OpCode.SET)
-
-    {
-      return new ZNRecordUpdater(_record)
-      {
-        @Override
-        public ZNRecord update(ZNRecord current)
-        {
-          return _record;
-        }
-      };
-    }
-    else if ((_code == OpCode.UPDATE))
-    {
-      return new ZNRecordUpdater(_record);
-    }
-    else
-    {
-      throw new UnsupportedOperationException("Not supported : " + _code);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java
deleted file mode 100644
index ed955a7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import java.io.StringReader;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-import org.restlet.resource.Representation;
-import org.restlet.resource.Resource;
-
-/**
- * REST resource for ZkPropertyTransfer server to receive PUT requests 
- * that submits ZNRecordUpdates
- * */
-public class ZNRecordUpdateResource  extends Resource
-{
-  public static final String UPDATEKEY = "ZNRecordUpdate";
-  private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
-  @Override
-  public boolean allowGet()
-  {
-    return false;
-  }
-
-  @Override
-  public boolean allowPost()
-  {
-    return false;
-  }
-
-  @Override
-  public boolean allowPut()
-  {
-    return true;
-  }
-
-  @Override
-  public boolean allowDelete()
-  {
-    return false;
-  }
-  
-  @Override
-  public void storeRepresentation(Representation entity)
-  {
-    try
-    {
-      ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
-      
-      Form form = new Form(entity);
-      String jsonPayload = form.getFirstValue(UPDATEKEY, true);
-      
-      // Parse the map from zkPath --> ZNRecordUpdate from the payload
-      StringReader sr = new StringReader(jsonPayload);
-      ObjectMapper mapper = new ObjectMapper();
-      TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
-          new TypeReference<TreeMap<String, ZNRecordUpdate>>()
-          {
-          };
-      Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
-      // Enqueue the ZNRecordUpdate for sending
-      for(ZNRecordUpdate holder : holderMap.values())
-      {
-        server.enqueueData(holder);
-        LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
-      }
-      getResponse().setStatus(Status.SUCCESS_OK);
-    }
-    catch(Exception e)
-    {
-      LOG.error("", e);
-      getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java
deleted file mode 100644
index 3ac88ed..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import org.restlet.Application;
-import org.restlet.Context;
-import org.restlet.Restlet;
-import org.restlet.Router;
-
-/**
- * Restlet application for ZkPropertyTransfer server
- * */
-public class ZkPropertyTransferApplication extends Application
-{
-  public ZkPropertyTransferApplication()
-  {
-    super();
-  }
-
-  public ZkPropertyTransferApplication(Context context)
-  {
-    super(context);
-  }
-  
-  @Override
-  public Restlet createRoot()
-  {
-    Router router = new Router(getContext());
-    router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
-    return router;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java
deleted file mode 100644
index 9cb3a25..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.restlet.Client;
-import org.restlet.data.MediaType;
-import org.restlet.data.Method;
-import org.restlet.data.Protocol;
-import org.restlet.data.Reference;
-import org.restlet.data.Request;
-import org.restlet.data.Response;
-import org.restlet.data.Status;
-import com.linkedin.helix.ZNRecord;
-
-public class ZkPropertyTransferClient
-{
-  private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
-  public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
-  public static int SEND_PERIOD = 10 * 1000;
-  
-  public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
-  
-  int _maxConcurrentTasks;
-  ExecutorService _executorService;
-  Client[] _clients;
-  AtomicInteger _requestCount = new AtomicInteger(0);
-  
-  // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
-    = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-  Timer _timer;
-  volatile String _webServiceUrl = "";
-  
-  public ZkPropertyTransferClient(int maxConcurrentTasks)
-  {
-    _maxConcurrentTasks = maxConcurrentTasks;
-    _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
-    _clients = new Client[_maxConcurrentTasks];
-    for(int i = 0; i< _clients.length; i++)
-    {
-      _clients[i] = new Client(Protocol.HTTP);
-    }
-    _timer = new Timer(true);
-    _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
-    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-  }
-  
-  class SendZNRecordTimerTask extends TimerTask
-  {
-    @Override
-    public void run()
-    { 
-      sendUpdateBatch();
-    }
-  }
-  
-  public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl)
-  {
-    try
-    {
-      LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to " + webserviceUrl);
-      _webServiceUrl = webserviceUrl;
-      update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
-      synchronized(_dataBufferRef)
-      {
-        if(_dataBufferRef.get().containsKey(update._path))
-        {
-          ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
-          oldVal = update.getZNRecordUpdater().update(oldVal);
-          _dataBufferRef.get().get(update.getPath())._record = oldVal;
-        }
-        else
-        {
-          _dataBufferRef.get().put(update.getPath(), update);
-        }
-      }
-    }
-    catch(Exception e)
-    {
-      LOG.error("", e);
-    }
-  }
-  
-  void sendUpdateBatch()
-  {
-    LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to " + _webServiceUrl);
-    Map<String, ZNRecordUpdate> updateCache  = null;
-    
-    synchronized(_dataBufferRef)
-    {
-      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-    
-    if(updateCache != null && updateCache.size() > 0)
-    {
-      ZNRecordUpdateUploadTask task = new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl, _clients[_requestCount.intValue() % _maxConcurrentTasks]);
-      _requestCount.incrementAndGet();
-      _executorService.submit(task);
-      LOG.trace("Queue size :" + ((ThreadPoolExecutor)_executorService).getQueue().size());
-    }
-  }
-  
-  public void shutdown()
-  {
-    LOG.info("Shutting down ZkPropertyTransferClient");
-    _executorService.shutdown();
-    _timer.cancel();
-    for(Client client: _clients)
-    {
-      try
-      {
-        client.stop();
-      }
-      catch (Exception e)
-      {
-        LOG.error("", e);
-      }
-    }
-  }
-  
-  class ZNRecordUpdateUploadTask implements Callable<Void>
-  {
-    Map<String, ZNRecordUpdate> _updateMap;
-    String _webServiceUrl;
-    Client _client;
-    
-    ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client)
-    {
-      _updateMap = update;
-      _webServiceUrl = webserviceUrl;
-      _client = client;
-    }
-    
-    @Override
-    public Void call() throws Exception
-    {
-      LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
-      long time = System.currentTimeMillis();
-      Reference resourceRef = new Reference(_webServiceUrl);
-      Request request = new Request(Method.PUT, resourceRef);
-      
-      ObjectMapper mapper = new ObjectMapper();
-      StringWriter sw = new StringWriter();
-      try
-      {
-        mapper.writeValue(sw, _updateMap);
-      }
-      catch (Exception e)
-      {
-        LOG.error("",e);
-      }
-
-      request.setEntity(
-          ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
-      // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
-      Response response = _client.handle(request);
-      
-      if(response.getStatus().getCode() != Status.SUCCESS_OK.getCode())
-      {
-        LOG.error("Status : " + response.getStatus());
-      }
-      LOG.info("Using time : " + (System.currentTimeMillis() - time));
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java
deleted file mode 100644
index 777a09f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Restlet server for Helix controller
- * 
- */
-package com.linkedin.helix.controller.restlet;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java
deleted file mode 100644
index 7cd9383..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-public enum AttributeName
-{
-  RESOURCES,
-  BEST_POSSIBLE_STATE,
-  CURRENT_STATE,
-  MESSAGES_ALL,
-  MESSAGES_SELECTED,
-  MESSAGES_THROTTLE,
-  LOCAL_STATE
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java
deleted file mode 100644
index 64f7f91..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java
+++ /dev/null
@@ -1,609 +0,0 @@
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixConstants.StateModelToken;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.josql.JsqlQueryListProcessor;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.StateModelDefinition;
-
-/**
- * For partition compute best possible (instance,state) pair based on
- * IdealState,StateModel,LiveInstance
- * 
- * @author kgopalak
- * 
- */
-// TODO: refactor this
-public class BestPossibleStateCalcStage extends AbstractBaseStage
-{
-  private static final Logger logger =
-      Logger.getLogger(BestPossibleStateCalcStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    long startTime = System.currentTimeMillis();
-    logger.info("START BestPossibleStateCalcStage.process()");
-
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-
-    if (currentStateOutput == null || resourceMap == null || cache == null)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires CURRENT_STATE|RESOURCES|DataCache");
-    }
-
-    BestPossibleStateOutput bestPossibleStateOutput =
-        compute(event, resourceMap, currentStateOutput);
-    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(),
-                       bestPossibleStateOutput);
-
-    long endTime = System.currentTimeMillis();
-    logger.info("END BestPossibleStateCalcStage.process(). took: "
-        + (endTime - startTime) + " ms");
-  }
-
-  private BestPossibleStateOutput compute(ClusterEvent event,
-                                          Map<String, Resource> resourceMap,
-                                          CurrentStateOutput currentStateOutput)
-  {
-    // for each ideal state
-    // read the state model def
-    // for each resource
-    // get the preference list
-    // for each instanceName check if its alive then assign a state
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    HelixManager manager = event.getAttribute("helixmanager");
-
-    BestPossibleStateOutput output = new BestPossibleStateOutput();
-
-    for (String resourceName : resourceMap.keySet())
-    {
-      logger.debug("Processing resource:" + resourceName);
-
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
-
-      String stateModelDefName;
-
-      if (idealState == null)
-      {
-        // if ideal state is deleted, use an empty one
-        logger.info("resource:" + resourceName + " does not exist anymore");
-        stateModelDefName = currentStateOutput.getResourceStateModelDef(resourceName);
-        idealState = new IdealState(resourceName);
-      }
-      else
-      {
-        stateModelDefName = idealState.getStateModelDefRef();
-      }
-
-      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
-      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
-      {
-        calculateAutoBalancedIdealState(cache,
-                                        idealState,
-                                        stateModelDef,
-                                        currentStateOutput);
-      }
-
-      // For idealstate that has rebalancing timer and is in AUTO mode, we will run jsql
-      // queries to calculate the
-      // preference list
-
-      Map<String, List<String>> queryPartitionPriorityLists = null;
-      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO
-          && idealState.getRebalanceTimerPeriod() > 0)
-      {
-        if (manager != null)
-        {
-          queryPartitionPriorityLists =
-              calculatePartitionPriorityListWithQuery(manager, idealState);
-        }
-      }
-
-      for (Partition partition : resource.getPartitions())
-      {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-
-        Map<String, String> bestStateForPartition;
-        Set<String> disabledInstancesForPartition =
-            cache.getDisabledInstancesForPartition(partition.toString());
-
-        if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
-        {
-          Map<String, String> idealStateMap =
-              idealState.getInstanceStateMap(partition.getPartitionName());
-          bestStateForPartition =
-              computeCustomizedBestStateForPartition(cache,
-                                                     stateModelDef,
-                                                     idealStateMap,
-                                                     currentStateMap,
-                                                     disabledInstancesForPartition);
-        }
-        else
-        // both AUTO and AUTO_REBALANCE mode
-        {
-          List<String> instancePreferenceList =
-              getPreferenceList(cache, partition, idealState, stateModelDef);
-          if (queryPartitionPriorityLists != null)
-          {
-            String partitionName = partition.getPartitionName();
-            if (queryPartitionPriorityLists.containsKey(partitionName))
-            {
-              List<String> queryInstancePreferenceList =
-                  queryPartitionPriorityLists.get(partitionName);
-              // For instances that is not included in the queryInstancePreferenceList,
-              // add them to the end of the list
-              for (String instanceName : instancePreferenceList)
-              {
-                if (!queryInstancePreferenceList.contains(instanceName))
-                {
-                  queryInstancePreferenceList.add(instanceName);
-                }
-              }
-              instancePreferenceList = queryInstancePreferenceList;
-            }
-          }
-          bestStateForPartition =
-              computeAutoBestStateForPartition(cache,
-                                               stateModelDef,
-                                               instancePreferenceList,
-                                               currentStateMap,
-                                               disabledInstancesForPartition);
-        }
-        output.setState(resourceName, partition, bestStateForPartition);
-      }
-    }
-    return output;
-  }
-
-  Map<String, List<String>> calculatePartitionPriorityListWithQuery(HelixManager manager,
-                                                                    IdealState idealState)
-  {
-    // Read queries from the resource config
-    String querys = idealState.getRecord().getSimpleField(IdealState.QUERY_LIST);
-    if (querys == null)
-    {
-      logger.warn("IdealState " + idealState.getResourceName()
-          + " does not have query list");
-      return null;
-    }
-    try
-    {
-      List<String> queryList = Arrays.asList(querys.split(";"));
-      List<ZNRecord> resultList =
-          JsqlQueryListProcessor.executeQueryList(manager.getHelixDataAccessor(),
-                                                  manager.getClusterName(),
-                                                  queryList);
-      Map<String, List<String>> priorityLists = new TreeMap<String, List<String>>();
-      for (ZNRecord result : resultList)
-      {
-        String partition = result.getSimpleField("partition");
-        String instance = result.getSimpleField("instance");
-        if (instance.equals("") || partition.equals(""))
-        {
-          continue;
-        }
-
-        if (!priorityLists.containsKey(partition))
-        {
-          priorityLists.put(partition, new ArrayList<String>());
-        }
-        priorityLists.get(partition).add(instance);
-      }
-      return priorityLists;
-    }
-    catch (Exception e)
-    {
-      logger.error("", e);
-      return null;
-    }
-
-  }
-
-  /**
-   * Compute best state for resource in AUTO_REBALANCE ideal state mode. the algorithm
-   * will make sure that the master partition are evenly distributed; Also when instances
-   * are added / removed, the amount of diff in master partitions are minimized
-   * 
-   * @param cache
-   * @param idealState
-   * @param instancePreferenceList
-   * @param stateModelDef
-   * @param currentStateOutput
-   * @return
-   */
-  private void calculateAutoBalancedIdealState(ClusterDataCache cache,
-                                               IdealState idealState,
-                                               StateModelDefinition stateModelDef,
-                                               CurrentStateOutput currentStateOutput)
-  {
-    String topStateValue = stateModelDef.getStatesPriorityList().get(0);
-    Set<String> liveInstances = cache._liveInstanceMap.keySet();
-    // Obtain replica number
-    int replicas = 1;
-    try
-    {
-      replicas = Integer.parseInt(idealState.getReplicas());
-    }
-    catch (Exception e)
-    {
-      logger.error("", e);
-    }
-    // Init for all partitions with empty list
-    Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
-    List<String> emptyList = new ArrayList<String>(0);
-    for (String partition : idealState.getPartitionSet())
-    {
-      defaultListFields.put(partition, emptyList);
-    }
-    idealState.getRecord().setListFields(defaultListFields);
-    // Return if no live instance
-    if (liveInstances.size() == 0)
-    {
-      logger.info("No live instances, return. Idealstate : "
-          + idealState.getResourceName());
-      return;
-    }
-    Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
-    for (String instanceName : liveInstances)
-    {
-      masterAssignmentMap.put(instanceName, new ArrayList<String>());
-    }
-    Set<String> orphanedPartitions = new HashSet<String>();
-    orphanedPartitions.addAll(idealState.getPartitionSet());
-    // Go through all current states and fill the assignments
-    for (String liveInstanceName : liveInstances)
-    {
-      CurrentState currentState =
-          cache.getCurrentState(liveInstanceName,
-                                cache.getLiveInstances()
-                                     .get(liveInstanceName)
-                                     .getSessionId()).get(idealState.getId());
-      if (currentState != null)
-      {
-        Map<String, String> partitionStates = currentState.getPartitionStateMap();
-        for (String partitionName : partitionStates.keySet())
-        {
-          String state = partitionStates.get(partitionName);
-          if (state.equals(topStateValue))
-          {
-            masterAssignmentMap.get(liveInstanceName).add(partitionName);
-            orphanedPartitions.remove(partitionName);
-          }
-        }
-      }
-    }
-    List<String> orphanedPartitionsList = new ArrayList<String>();
-    orphanedPartitionsList.addAll(orphanedPartitions);
-    normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList);
-    idealState.getRecord()
-              .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
-                                                                   replicas));
-
-  }
-
-  /**
-   * Given the current master assignment map and the partitions not hosted, generate an
-   * evenly distributed partition assignment map
-   * 
-   * @param masterAssignmentMap
-   *          current master assignment map
-   * @param orphanPartitions
-   *          partitions not hosted by any instance
-   * @return
-   */
-  private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
-                                      List<String> orphanPartitions)
-  {
-    int totalPartitions = 0;
-    String[] instanceNames = new String[masterAssignmentMap.size()];
-    masterAssignmentMap.keySet().toArray(instanceNames);
-    Arrays.sort(instanceNames);
-    // Find out total partition number
-    for (String key : masterAssignmentMap.keySet())
-    {
-      totalPartitions += masterAssignmentMap.get(key).size();
-      Collections.sort(masterAssignmentMap.get(key));
-    }
-    totalPartitions += orphanPartitions.size();
-
-    // Find out how many partitions an instance should host
-    int partitionNumber = totalPartitions / masterAssignmentMap.size();
-    int leave = totalPartitions % masterAssignmentMap.size();
-
-    for (int i = 0; i < instanceNames.length; i++)
-    {
-      int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
-      leave--;
-      // For hosts that has more partitions, move those partitions to "orphaned"
-      while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo)
-      {
-        int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
-        orphanPartitions.add(masterAssignmentMap.get(instanceNames[i])
-                                                .get(lastElementIndex));
-        masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
-      }
-    }
-    leave = totalPartitions % masterAssignmentMap.size();
-    Collections.sort(orphanPartitions);
-    // Assign "orphaned" partitions to hosts that do not have enough partitions
-    for (int i = 0; i < instanceNames.length; i++)
-    {
-      int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
-      leave--;
-      while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
-      {
-        int lastElementIndex = orphanPartitions.size() - 1;
-        masterAssignmentMap.get(instanceNames[i])
-                           .add(orphanPartitions.get(lastElementIndex));
-        orphanPartitions.remove(lastElementIndex);
-      }
-    }
-    if (orphanPartitions.size() > 0)
-    {
-      logger.error("orphanPartitions still contains elements");
-    }
-  }
-
-  /**
-   * Generate full preference list from the master assignment map evenly distribute the
-   * slave partitions mastered on a host to other hosts
-   * 
-   * @param masterAssignmentMap
-   *          current master assignment map
-   * @param orphanPartitions
-   *          partitions not hosted by any instance
-   * @return
-   */
-  Map<String, List<String>> generateListFieldFromMasterAssignment(Map<String, List<String>> masterAssignmentMap,
-                                                                  int replicas)
-  {
-    Map<String, List<String>> listFields = new HashMap<String, List<String>>();
-    int slaves = replicas - 1;
-    String[] instanceNames = new String[masterAssignmentMap.size()];
-    masterAssignmentMap.keySet().toArray(instanceNames);
-    Arrays.sort(instanceNames);
-
-    for (int i = 0; i < instanceNames.length; i++)
-    {
-      String instanceName = instanceNames[i];
-      List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
-      for (int x = 0; x < instanceNames.length - 1; x++)
-      {
-        int index = (x + i + 1) % instanceNames.length;
-        otherInstances.add(instanceNames[index]);
-      }
-
-      List<String> partitionList = masterAssignmentMap.get(instanceName);
-      for (int j = 0; j < partitionList.size(); j++)
-      {
-        String partitionName = partitionList.get(j);
-        listFields.put(partitionName, new ArrayList<String>());
-        listFields.get(partitionName).add(instanceName);
-
-        int slavesCanAssign = Math.min(slaves, otherInstances.size());
-        for (int k = 0; k < slavesCanAssign; k++)
-        {
-          int index = (j + k + 1) % otherInstances.size();
-          listFields.get(partitionName).add(otherInstances.get(index));
-        }
-      }
-    }
-    return listFields;
-  }
-
-  /**
-   * compute best state for resource in AUTO ideal state mode
-   * 
-   * @param cache
-   * @param stateModelDef
-   * @param instancePreferenceList
-   * @param currentStateMap
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
-   * @return
-   */
-  private Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
-                                                               StateModelDefinition stateModelDef,
-                                                               List<String> instancePreferenceList,
-                                                               Map<String, String> currentStateMap,
-                                                               Set<String> disabledInstancesForPartition)
-  {
-    Map<String, String> instanceStateMap = new HashMap<String, String>();
-
-    // if the ideal state is deleted, instancePreferenceList will be empty and
-    // we should drop all resources.
-    if (currentStateMap != null)
-    {
-      for (String instance : currentStateMap.keySet())
-      {
-        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !"ERROR".equals(currentStateMap.get(instance)))
-        {
-          // move to DROPPED state only if not in ERROR state
-          instanceStateMap.put(instance, "DROPPED");
-        }
-        else if (!"ERROR".equals(currentStateMap.get(instance))
-            && disabledInstancesForPartition.contains(instance))
-        {
-          // if a non-error node is disabled, put it into initial state (OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
-        }
-      }
-    }
-
-    // ideal state is deleted
-    if (instancePreferenceList == null)
-    {
-      return instanceStateMap;
-    }
-
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-    boolean assigned[] = new boolean[instancePreferenceList.size()];
-
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
-
-    for (String state : statesPriorityList)
-    {
-      String num = stateModelDef.getNumInstancesPerState(state);
-      int stateCount = -1;
-      if ("N".equals(num))
-      {
-        Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
-        liveAndEnabled.removeAll(disabledInstancesForPartition);
-        stateCount = liveAndEnabled.size();
-      }
-      else if ("R".equals(num))
-      {
-        stateCount = instancePreferenceList.size();
-      }
-      else
-      {
-        try
-        {
-          stateCount = Integer.parseInt(num);
-        }
-        catch (Exception e)
-        {
-          logger.error("Invalid count for state:" + state + " ,count=" + num);
-        }
-      }
-      if (stateCount > -1)
-      {
-        int count = 0;
-        for (int i = 0; i < instancePreferenceList.size(); i++)
-        {
-          String instanceName = instancePreferenceList.get(i);
-
-          boolean notInErrorState =
-              currentStateMap == null
-                  || !"ERROR".equals(currentStateMap.get(instanceName));
-
-          if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
-              && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
-          {
-            instanceStateMap.put(instanceName, state);
-            count = count + 1;
-            assigned[i] = true;
-            if (count == stateCount)
-            {
-              break;
-            }
-          }
-        }
-      }
-    }
-    return instanceStateMap;
-  }
-
-  /**
-   * compute best state for resource in CUSTOMIZED ideal state mode
-   * 
-   * @param cache
-   * @param stateModelDef
-   * @param idealStateMap
-   * @param currentStateMap
-   * @param disabledInstancesForPartition
-   * @return
-   */
-  private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
-                                                                     StateModelDefinition stateModelDef,
-                                                                     Map<String, String> idealStateMap,
-                                                                     Map<String, String> currentStateMap,
-                                                                     Set<String> disabledInstancesForPartition)
-  {
-    Map<String, String> instanceStateMap = new HashMap<String, String>();
-
-    // if the ideal state is deleted, idealStateMap will be null/empty and
-    // we should drop all resources.
-    if (currentStateMap != null)
-    {
-      for (String instance : currentStateMap.keySet())
-      {
-        if ( (idealStateMap == null || !idealStateMap.containsKey(instance))
-            && !"ERROR".equals(currentStateMap.get(instance)))
-        {
-          // move to DROPPED state only if not in ERROR state
-          instanceStateMap.put(instance, "DROPPED");
-        }
-        else if (!"ERROR".equals(currentStateMap.get(instance))
-            && disabledInstancesForPartition.contains(instance))
-        {
-          // if a non-error node is disabled, put it into initial state (OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
-        }
-      }
-    }
-
-    // ideal state is deleted
-    if (idealStateMap == null)
-    {
-      return instanceStateMap;
-    }
-
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
-    for (String instance : idealStateMap.keySet())
-    {
-      boolean notInErrorState =
-          currentStateMap == null || !"ERROR".equals(currentStateMap.get(instance));
-
-      if (liveInstancesMap.containsKey(instance) && notInErrorState
-          && !disabledInstancesForPartition.contains(instance))
-      {
-        instanceStateMap.put(instance, idealStateMap.get(instance));
-      }
-    }
-
-    return instanceStateMap;
-  }
-
-  private List<String> getPreferenceList(ClusterDataCache cache,
-                                         Partition resource,
-                                         IdealState idealState,
-                                         StateModelDefinition stateModelDef)
-  {
-    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
-
-    if (listField != null && listField.size() == 1
-        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0)))
-    {
-      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-      List<String> prefList = new ArrayList<String>(liveInstances.keySet());
-      Collections.sort(prefList);
-      return prefList;
-    }
-    else
-    {
-      return listField;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java
deleted file mode 100644
index 1334b47..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.model.Partition;
-
-public class BestPossibleStateOutput
-{
-  // resource->partition->instance->state
-  Map<String, Map<Partition, Map<String, String>>> _dataMap;
-
-  public BestPossibleStateOutput()
-  {
-    _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
-  }
-
-  public void setState(String resourceName, Partition resource,
-      Map<String, String> bestInstanceStateMappingForResource)
-  {
-    if (!_dataMap.containsKey(resourceName))
-    {
-      _dataMap.put(resourceName,
-          new HashMap<Partition, Map<String, String>>());
-    }
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    map.put(resource, bestInstanceStateMappingForResource);
-  }
-
-  public Map<String, String> getInstanceStateMap(String resourceName,
-      Partition resource)
-  {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    if (map != null)
-    {
-      return map.get(resource);
-    }
-    return Collections.emptyMap();
-  }
-
-  public Map<Partition, Map<String, String>> getResourceMap(String resourceName)
-  {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    if (map != null)
-    {
-      return map;
-    }
-    return Collections.emptyMap();
-  }
-
-  @Override
-  public String toString()
-  {
-    return _dataMap.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java
deleted file mode 100644
index 34e06e6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixConstants.StateModelToken;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.model.ClusterConstraints;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.StateModelDefinition;
-
-/**
- * Reads the data from the cluster using data accessor. This output ClusterData which
- * provides useful methods to search/lookup properties
- * 
- * @author kgopalak
- * 
- */
-public class ClusterDataCache
-{
-
-  Map<String, LiveInstance>                           _liveInstanceMap;
-  Map<String, IdealState>                             _idealStateMap;
-  Map<String, StateModelDefinition>                   _stateModelDefMap;
-  Map<String, InstanceConfig>                         _instanceConfigMap;
-  Map<String, ClusterConstraints>                     _constraintMap;
-  Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
-  Map<String, Map<String, Message>>                   _messageMap;
-
-  // Map<String, Map<String, HealthStat>> _healthStatMap;
-  // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
-  // private PersistentStats _persistentStats;
-  // private Alerts _alerts;
-  // private AlertStatus _alertStatus;
-
-  private static final Logger                         LOG =
-                                                              Logger.getLogger(ClusterDataCache.class.getName());
-
-  public boolean refresh(HelixDataAccessor accessor)
-  {
-    Builder keyBuilder = accessor.keyBuilder();
-    _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
-    for (LiveInstance instance : _liveInstanceMap.values())
-    {
-      LOG.trace("live instance: " + instance.getInstanceName() + " "
-          + instance.getSessionId());
-    }
-
-    _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-    _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
-    _constraintMap =
-        accessor.getChildValuesMap(keyBuilder.constraints());
-
-    Map<String, Map<String, Message>> msgMap =
-        new HashMap<String, Map<String, Message>>();
-    for (String instanceName : _liveInstanceMap.keySet())
-    {
-      Map<String, Message> map =
-          accessor.getChildValuesMap(keyBuilder.messages(instanceName));
-      msgMap.put(instanceName, map);
-    }
-    _messageMap = Collections.unmodifiableMap(msgMap);
-
-    Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
-        new HashMap<String, Map<String, Map<String, CurrentState>>>();
-    for (String instanceName : _liveInstanceMap.keySet())
-    {
-      LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
-      String sessionId = liveInstance.getSessionId();
-      if (!allCurStateMap.containsKey(instanceName))
-      {
-        allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
-      }
-      Map<String, Map<String, CurrentState>> curStateMap =
-          allCurStateMap.get(instanceName);
-      Map<String, CurrentState> map =
-          accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
-      curStateMap.put(sessionId, map);
-    }
-
-    for (String instance : allCurStateMap.keySet())
-    {
-      allCurStateMap.put(instance,
-                         Collections.unmodifiableMap(allCurStateMap.get(instance)));
-    }
-    _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
-
-    return true;
-  }
-
-  public Map<String, IdealState> getIdealStates()
-  {
-    return _idealStateMap;
-  }
-
-  public Map<String, LiveInstance> getLiveInstances()
-  {
-    return _liveInstanceMap;
-  }
-
-  public Map<String, CurrentState> getCurrentState(String instanceName,
-                                                   String clientSessionId)
-  {
-    return _currentStateMap.get(instanceName).get(clientSessionId);
-  }
-
-  public Map<String, Message> getMessages(String instanceName)
-  {
-    Map<String, Message> map = _messageMap.get(instanceName);
-    if (map != null)
-    {
-      return map;
-    }
-    else
-    {
-      return Collections.emptyMap();
-    }
-  }
-
-  // public HealthStat getGlobalStats()
-  // {
-  // return _globalStats;
-  // }
-  //
-  // public PersistentStats getPersistentStats()
-  // {
-  // return _persistentStats;
-  // }
-  //
-  // public Alerts getAlerts()
-  // {
-  // return _alerts;
-  // }
-  //
-  // public AlertStatus getAlertStatus()
-  // {
-  // return _alertStatus;
-  // }
-  //
-  // public Map<String, HealthStat> getHealthStats(String instanceName)
-  // {
-  // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-  // if (map != null)
-  // {
-  // return map;
-  // } else
-  // {
-  // return Collections.emptyMap();
-  // }
-  // }
-
-  public StateModelDefinition getStateModelDef(String stateModelDefRef)
-  {
-
-    return _stateModelDefMap.get(stateModelDefRef);
-  }
-
-  public IdealState getIdealState(String resourceName)
-  {
-    return _idealStateMap.get(resourceName);
-  }
-
-  public Map<String, InstanceConfig> getInstanceConfigMap()
-  {
-    return _instanceConfigMap;
-  }
-
-  public Set<String> getDisabledInstancesForPartition(String partition)
-  {
-    Set<String> disabledInstancesSet = new HashSet<String>();
-    for (String instance : _instanceConfigMap.keySet())
-    {
-      InstanceConfig config = _instanceConfigMap.get(instance);
-      if (config.getInstanceEnabled() == false
-          || config.getInstanceEnabledForPartition(partition) == false)
-      {
-        disabledInstancesSet.add(instance);
-      }
-    }
-    return disabledInstancesSet;
-  }
-
-  public int getReplicas(String resourceName)
-  {
-    int replicas = -1;
-
-    if (_idealStateMap.containsKey(resourceName))
-    {
-      String replicasStr = _idealStateMap.get(resourceName).getReplicas();
-
-      if (replicasStr != null)
-      {
-        if (replicasStr.equals(StateModelToken.ANY_LIVEINSTANCE.toString()))
-        {
-          replicas = _liveInstanceMap.size();
-        }
-        else
-        {
-          try
-          {
-            replicas = Integer.parseInt(replicasStr);
-          }
-          catch (Exception e)
-          {
-            LOG.error("invalid replicas string: " + replicasStr);
-          }
-        }
-      }
-      else
-      {
-        LOG.error("idealState for resource: " + resourceName + " does NOT have replicas");
-      }
-    }
-    return replicas;
-  }
-
-  public ClusterConstraints getConstraint(ConstraintType type)
-  {
-    if (_constraintMap != null)
-    {
-      return _constraintMap.get(type.toString());
-    }
-    return null;
-  }
-
-  @Override
-  public String toString()
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
-    sb.append("idealStateMap:" + _idealStateMap).append("\n");
-    sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
-    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
-    sb.append("messageMap:" + _messageMap).append("\n");
-
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java
deleted file mode 100644
index 3a3049a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ClusterEvent
-{
-  private static final Logger logger = Logger.getLogger(ClusterEvent.class
-      .getName());
-  private final String _eventName;
-  private final Map<String, Object> _eventAttributeMap;
-
-  public ClusterEvent(String name)
-  {
-    _eventName = name;
-    _eventAttributeMap = new HashMap<String, Object>();
-  }
-
-  public void addAttribute(String attrName, Object attrValue)
-  {
-    if (logger.isTraceEnabled())
-    {
-      logger.trace("Adding attribute:" + attrName);
-      logger.trace(" attribute value:" + attrValue);
-    }
-   
-    _eventAttributeMap.put(attrName, attrValue);
-  }
-
-  public String getName()
-  {
-    return _eventName;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T extends Object> T getAttribute(String attrName)
-  {
-    Object ret = _eventAttributeMap.get(attrName);
-    if (ret != null)
-    {
-      return (T) ret;
-    }
-    return null;
-  }
-  
-  @Override
-  public String toString()
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("name:"+ _eventName).append("\n");
-    for(String key:_eventAttributeMap.keySet()){
-      sb.append(key).append(":").append(_eventAttributeMap.get(key)).append("\n");
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java
deleted file mode 100644
index bf350b1..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.LiveInstance;
-
-public class CompatibilityCheckStage extends AbstractBaseStage
-{
-  private static final Logger LOG = Logger
-      .getLogger(CompatibilityCheckStage.class.getName());
-
-  /**
-   * INCOMPATIBLE_MAP stores primary version pairs:
-   *  {controllerPrimaryVersion, participantPrimaryVersion}
-   *  that are incompatible
-   */
-  private static final Map<String, Boolean> INCOMPATIBLE_MAP;
-  static
-  {
-      Map<String, Boolean> map = new HashMap<String, Boolean>();
-      /**
-       * {controllerPrimaryVersion,participantPrimaryVersion} -> false
-       */
-      map.put("0.4,0.3", false);
-      INCOMPATIBLE_MAP = Collections.unmodifiableMap(map);
-  }
-
-  private String getPrimaryVersion(String version)
-  {
-    String[] splits = version.split("\\.");
-    if (splits == null || splits.length != 3)
-    {
-      return null;
-    }
-    return version.substring(0, version.lastIndexOf('.'));
-  }
-
-  private boolean isCompatible(String controllerVersion, String participantVersion)
-  {
-    if (participantVersion == null)
-    {
-      LOG.warn("Missing version of participant. Skip version check.");
-      return true;
-    }
-
-    // compare primary version
-    String controllerPrimaryVersion = getPrimaryVersion(controllerVersion);
-    String participantPrimaryVersion = getPrimaryVersion(participantVersion);
-    if (controllerPrimaryVersion != null && participantPrimaryVersion != null)
-    {
-      if (controllerPrimaryVersion.compareTo(participantPrimaryVersion) < 0)
-      {
-        LOG.info("Controller primary version is less than participant primary version.");
-        return false;
-      }
-      else
-      {
-        if (INCOMPATIBLE_MAP.containsKey(controllerPrimaryVersion + "," + participantPrimaryVersion))
-        {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (manager == null || cache == null)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager | DataCache");
-    }
-
-    String controllerVersion = manager.getVersion();
-    if (controllerVersion == null)
-    {
-      String errorMsg = "Missing version of controller: " + manager.getInstanceName()
-          + ". Pipeline will not continue.";
-      LOG.error(errorMsg);
-      throw new StageException(errorMsg);
-    }
-
-    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-    for (LiveInstance liveInstance : liveInstanceMap.values())
-    {
-      String participantVersion = liveInstance.getHelixVersion();
-      if (!isCompatible(controllerVersion, participantVersion))
-      {
-        String errorMsg = "cluster manager versions are incompatible; pipeline will not continue. "
-                        + "controller:" + manager.getInstanceName() + ", controllerVersion:" + controllerVersion
-                        + "; participant:" + liveInstance.getInstanceName() + ", participantVersion:" + participantVersion;
-        LOG.error(errorMsg);
-        throw new StageException(errorMsg);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java
deleted file mode 100644
index 833caba..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- * 
- * @author kgopalak
- * 
- */
-public class CurrentStateComputationStage extends AbstractBaseStage
-{
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-
-    if (cache == null || resourceMap == null)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCE");
-    }
-
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-
-    for (LiveInstance instance : liveInstances.values())
-    {
-      String instanceName = instance.getInstanceName();
-      Map<String, Message> instanceMessages = cache.getMessages(instanceName);
-      for (Message message : instanceMessages.values())
-      {
-        if (!MessageType.STATE_TRANSITION.toString()
-                                         .equalsIgnoreCase(message.getMsgType()))
-        {
-          continue;
-        }
-        if (!instance.getSessionId().equals(message.getTgtSessionId()))
-        {
-          continue;
-        }
-        String resourceName = message.getResourceName();
-        Resource resource = resourceMap.get(resourceName);
-        if (resource == null)
-        {
-          continue;
-        }
-
-        if (!message.getGroupMessageMode())
-        {
-          String partitionName = message.getPartitionName();
-          Partition partition = resource.getPartition(partitionName);
-          if (partition != null)
-          {
-            currentStateOutput.setPendingState(resourceName,
-                                               partition,
-                                               instanceName,
-                                               message.getToState());
-          }
-          else
-          {
-            // log
-          }
-        }
-        else
-        {
-          List<String> partitionNames = message.getPartitionNames();
-          if (!partitionNames.isEmpty())
-          {
-            for (String partitionName : partitionNames)
-            {
-              Partition partition = resource.getPartition(partitionName);
-              if (partition != null)
-              {
-                currentStateOutput.setPendingState(resourceName,
-                                                   partition,
-                                                   instanceName,
-                                                   message.getToState());
-              }
-              else
-              {
-                // log
-              }
-            }
-          }
-        }
-      }
-    }
-    for (LiveInstance instance : liveInstances.values())
-    {
-      String instanceName = instance.getInstanceName();
-
-      String clientSessionId = instance.getSessionId();
-      Map<String, CurrentState> currentStateMap =
-          cache.getCurrentState(instanceName, clientSessionId);
-      for (CurrentState currentState : currentStateMap.values())
-      {
-
-        if (!instance.getSessionId().equals(currentState.getSessionId()))
-        {
-          continue;
-        }
-        String resourceName = currentState.getResourceName();
-        String stateModelDefName = currentState.getStateModelDefRef();
-        Resource resource = resourceMap.get(resourceName);
-        if (resource == null)
-        {
-          continue;
-        }
-        if (stateModelDefName != null)
-        {
-          currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
-        }
-
-        currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
-
-        Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
-        for (String partitionName : partitionStateMap.keySet())
-        {
-          Partition partition = resource.getPartition(partitionName);
-          if (partition != null)
-          {
-            currentStateOutput.setCurrentState(resourceName,
-                                               partition,
-                                               instanceName,
-                                               currentState.getState(partitionName));
-
-          }
-          else
-          {
-            // log
-          }
-        }
-      }
-    }
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java
deleted file mode 100644
index 9f531ba..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Partition;
-
-public class CurrentStateOutput
-{
-  private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
-  private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
-  private final Map<String, String>                              _resourceStateModelMap;
-  private final Map<String, CurrentState>                        _curStateMetaMap;
-
-  public CurrentStateOutput()
-  {
-    _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
-    _pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
-    _resourceStateModelMap = new HashMap<String, String>();
-    _curStateMetaMap = new HashMap<String, CurrentState>();
-
-  }
-
-  public void setResourceStateModelDef(String resourceName, String stateModelDefName)
-  {
-    _resourceStateModelMap.put(resourceName, stateModelDefName);
-  }
-
-  public String getResourceStateModelDef(String resourceName)
-  {
-    return _resourceStateModelMap.get(resourceName);
-  }
-
-  public void setBucketSize(String resource, int bucketSize)
-  {
-    CurrentState curStateMeta = _curStateMetaMap.get(resource);
-    if (curStateMeta == null)
-    {
-      curStateMeta = new CurrentState(resource);
-      _curStateMetaMap.put(resource, curStateMeta);
-    }
-    curStateMeta.setBucketSize(bucketSize);
-  }
-  
-  public int getBucketSize(String resource)
-  {
-    int bucketSize = 0;
-    CurrentState curStateMeta = _curStateMetaMap.get(resource);
-    if (curStateMeta != null)
-    {
-      bucketSize = curStateMeta.getBucketSize();  
-    }
-    
-    return bucketSize;
-  }
-  
-  public void setCurrentState(String resourceName,
-                              Partition partition,
-                              String instanceName,
-                              String state)
-  {
-    if (!_currentStateMap.containsKey(resourceName))
-    {
-      _currentStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
-    }
-    if (!_currentStateMap.get(resourceName).containsKey(partition))
-    {
-      _currentStateMap.get(resourceName).put(partition, new HashMap<String, String>());
-    }
-    _currentStateMap.get(resourceName).get(partition).put(instanceName, state);
-  }
-
-  public void setPendingState(String resourceName,
-                              Partition partition,
-                              String instanceName,
-                              String state)
-  {
-    if (!_pendingStateMap.containsKey(resourceName))
-    {
-      _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
-    }
-    if (!_pendingStateMap.get(resourceName).containsKey(partition))
-    {
-      _pendingStateMap.get(resourceName).put(partition, new HashMap<String, String>());
-    }
-    _pendingStateMap.get(resourceName).get(partition).put(instanceName, state);
-  }
-
-  /**
-   * given (resource, partition, instance), returns currentState
-   * 
-   * @param resourceName
-   * @param partition
-   * @param instanceName
-   * @return
-   */
-  public String getCurrentState(String resourceName,
-                                Partition partition,
-                                String instanceName)
-  {
-    Map<Partition, Map<String, String>> map = _currentStateMap.get(resourceName);
-    if (map != null)
-    {
-      Map<String, String> instanceStateMap = map.get(partition);
-      if (instanceStateMap != null)
-      {
-        return instanceStateMap.get(instanceName);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * given (resource, partition, instance), returns toState
-   * 
-   * @param resourceName
-   * @param partition
-   * @param instanceName
-   * @return
-   */
-  public String getPendingState(String resourceName,
-                                Partition partition,
-                                String instanceName)
-  {
-    Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName);
-    if (map != null)
-    {
-      Map<String, String> instanceStateMap = map.get(partition);
-      if (instanceStateMap != null)
-      {
-        return instanceStateMap.get(instanceName);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * given (resource, partition), returns (instance->currentState) map
-   * 
-   * @param resourceName
-   * @param partition
-   * @return
-   */
-  public Map<String, String> getCurrentStateMap(String resourceName, Partition partition)
-  {
-    if (_currentStateMap.containsKey(resourceName))
-    {
-      Map<Partition, Map<String, String>> map = _currentStateMap.get(resourceName);
-      if (map.containsKey(partition))
-      {
-        return map.get(partition);
-      }
-    }
-    return Collections.emptyMap();
-  }
-
-  /**
-   * given (resource, partition), returns (instance->toState) map
-   * 
-   * @param resourceName
-   * @param partition
-   * @return
-   */
-  public Map<String, String> getPendingStateMap(String resourceName, Partition partition)
-  {
-    if (_pendingStateMap.containsKey(resourceName))
-    {
-      Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName);
-      if (map.containsKey(partition))
-      {
-        return map.get(partition);
-      }
-    }
-    return Collections.emptyMap();
-  }
-
-  @Override
-  public String toString()
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("current state= ").append(_currentStateMap);
-    sb.append(", pending state= ").append(_pendingStateMap);
-    return sb.toString();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java
deleted file mode 100644
index 6bad69f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.monitoring.mbeans.ClusterStatusMonitor;
-
-public class ExternalViewComputeStage extends AbstractBaseStage
-{
-  private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    long startTime = System.currentTimeMillis();
-    log.info("START ExternalViewComputeStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-
-    if (manager == null || resourceMap == null || cache == null)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires ClusterManager|RESOURCES|DataCache");
-    }
-
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
-    List<ExternalView> newExtViews = new ArrayList<ExternalView>();
-    List<PropertyKey> keys = new ArrayList<PropertyKey>();
-    
-    for (String resourceName : resourceMap.keySet())
-    {
-      ExternalView view = new ExternalView(resourceName);
-      view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
-      
-      Resource resource = resourceMap.get(resourceName);
-      for (Partition partition : resource.getPartitions())
-      {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        if (currentStateMap != null && currentStateMap.size() > 0)
-        {
-          // Set<String> disabledInstances
-          // = cache.getDisabledInstancesForResource(resource.toString());
-          for (String instance : currentStateMap.keySet())
-          {
-            // if (!disabledInstances.contains(instance))
-            // {
-            view.setState(partition.getPartitionName(),
-                          instance,
-                          currentStateMap.get(instance));
-            // }
-          }
-        }
-      }
-      // Update cluster status monitor mbean
-      ClusterStatusMonitor clusterStatusMonitor =
-          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      if (clusterStatusMonitor != null)
-      {
-        clusterStatusMonitor.onExternalViewChange(view,
-                                                  cache._idealStateMap.get(view.getResourceName()));
-      }
-      
-      // compare the new external view with current one, set only on different
-      Map<String, ExternalView> curExtViews =
-          dataAccessor.getChildValuesMap(manager.getHelixDataAccessor()
-                                             .keyBuilder()
-                                             .externalViews());
-
-      ExternalView curExtView = curExtViews.get(resourceName);
-      if (curExtView == null || !curExtView.getRecord().equals(view.getRecord()))
-      {
-        keys.add(manager.getHelixDataAccessor().keyBuilder().externalView(resourceName));
-        newExtViews.add(view);
-        // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view,
-        // resourceName);
-      }
-    }
-
-    if (newExtViews.size() > 0)
-    {
-      dataAccessor.setChildren(keys, newExtViews);
-    }
-    
-    long endTime = System.currentTimeMillis();
-    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
-  }
-
-}