Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[8/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
new file mode 100644
index 0000000..af0d95e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
@@ -0,0 +1,262 @@
+package org.apache.helix.controller.restlet;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.restlet.Component;
+import org.restlet.Context;
+import org.restlet.data.Protocol;
+
+/**
+ * Controller-side restlet server that receives ZNRecordUpdate requests from
+ * clients, batches the updates, and applies them to zookeeper. This reduces
+ * the number of concurrent zookeeper writes for ZNRecord updates that are not
+ * time-critical, such as message handling status updates and healthcheck
+ * reports.
+ * 
+ * Because one server is shared by multiple helix controllers running on the
+ * same machine, this class is designed as a singleton. The application is
+ * responsible for calling init() and shutdown() on getInstance().
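+ * 
+ * A minimal usage sketch (the port and zookeeper address below are
+ * illustrative, not defaults):
+ * <pre>
+ * ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
+ * server.init(8900, "localhost:2181");
+ * String url = server.getWebserviceUrl(); // hand this to clients
+ * // ... later, on teardown
+ * server.shutdown();
+ * </pre>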
+ * */
+public class ZKPropertyTransferServer
+{
+  public static final String PORT = "port";
+  public static String RESTRESOURCENAME = "ZNRecordUpdates";
+  public static final String SERVER = "ZKPropertyTransferServer";
+  
+  // Period (in ms) at which buffered ZNRecords are batch-written to zookeeper
+  public static int PERIOD = 10 * 1000;
+  // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
+  public static int MAX_UPDATE_LIMIT = 10000;
+  private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
+  
+  int _localWebservicePort;
+  String _webserviceUrl;
+  ZkBaseDataAccessor<ZNRecord> _accessor;
+  String _zkAddress;
+  
+  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
+    = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
+  
+  boolean _initialized = false;
+  boolean _shutdownFlag = false;
+  Component _component = null;
+  Timer _timer = null;
+  
+  /**
+   * TimerTask for batched zookeeper writes
+   * */
+  class ZKPropertyTransferTask extends TimerTask
+  {
+    @Override
+    public void run()
+    {
+      try
+      {
+        sendData();
+      }
+      catch(Throwable t)
+      {
+        LOG.error("", t);
+      }
+    
+    }
+  }
+  
+  void sendData()
+  {
+    LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
+    ConcurrentHashMap<String, ZNRecordUpdate> updateCache  = null;
+    
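+    // Atomically swap in a fresh buffer so that updates arriving while we
+    // write to zookeeper accumulate in the new map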
+    synchronized(_dataBufferRef)
+    {
+      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+    }
+    
+    if(updateCache != null)
+    {
+      List<String> paths = new ArrayList<String>();
+      List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+      List<ZNRecord> vals = new ArrayList<ZNRecord>();
+      // BUGBUG : what if the instance is dropped? 
+      for(ZNRecordUpdate holder : updateCache.values())
+      {
+        paths.add(holder.getPath());
+        updaters.add(holder.getZNRecordUpdater());
+        vals.add(holder.getRecord());
+      }
+      // Batch write the accumulated updates into zookeeper
+      long timeStart = System.currentTimeMillis();
+      if(paths.size() > 0)
+      {
+        _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+      }
+      LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
+    }
+    else
+    {
+      LOG.warn("null _dataQueueRef. Should be in the beginning only");
+    }
+  }
+  
+  static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
+  
+  private ZKPropertyTransferServer()
+  {
+    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+  }
+  
+  public static ZKPropertyTransferServer getInstance()
+  {
+    return _instance;
+  }
+  
+  public boolean isInitialized()
+  {
+    return _initialized;
+  }
+  
+  public void init(int localWebservicePort, String zkAddress)
+  {
+    if(!_initialized && !_shutdownFlag)
+    {
+      LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
+      _localWebservicePort = localWebservicePort;
+      ZkClient zkClient = new ZkClient(zkAddress);
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+      _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+      _zkAddress = zkAddress;
+      startServer();
+    }
+    else
+    {
+      LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
+    }
+  }
+  
+  public String getWebserviceUrl()
+  {
+    if(!_initialized || _shutdownFlag)
+    {
+      LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
+      return null;
+    }
+    return _webserviceUrl;
+  }
+  
+  /** Add a ZNRecordUpdate to the change queue.
+   *  Called by the webservice front-end.
+   *
+   */
+  void enqueueData(ZNRecordUpdate e)
+  {
+    if(!_initialized || _shutdownFlag)
+    {
+      LOG.error("zkDataTransferServer inited:" + _initialized 
+          + " shutdownFlag:"+_shutdownFlag+" , return");
+      return;
+    }
+    // Do local merge if receive multiple update on the same path
+    synchronized(_dataBufferRef)
+    {
+      e.getRecord().setSimpleField(SERVER, _webserviceUrl);
+      if(_dataBufferRef.get().containsKey(e.getPath()))
+      {
+        ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
+        oldVal = e.getZNRecordUpdater().update(oldVal);
+        _dataBufferRef.get().get(e.getPath())._record = oldVal;
+      }
+      else
+      {
+        _dataBufferRef.get().put(e.getPath(), e);
+      }
+    }
+    if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
+    {
+      sendData();
+    }
+  }
+  
+  void startServer()
+  {
+    LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+    
+    _component = new Component();
+    
+    _component.getServers().add(Protocol.HTTP, _localWebservicePort);
+    Context applicationContext = _component.getContext().createChildContext();
+    applicationContext.getAttributes().put(SERVER, this);
+    applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
+    ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
+        applicationContext);
+    // Attach the application to the component and start it
+    _component.getDefaultHost().attach(application);
+    _timer = new Timer(true);
+    _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
+    
+    try
+    {
+      _webserviceUrl 
+        = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort 
+              + "/" + RESTRESOURCENAME;
+      _component.start();
+      _initialized = true;
+    }
+    catch (Exception e)
+    {
+      LOG.error("", e);
+    }
+    LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+  }
+  
+  public void shutdown()
+  {
+    if(_shutdownFlag)
+    {
+      LOG.error("ZKPropertyTransferServer already has been shutdown...");
+      return;
+    }
+    LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+    if(_timer != null)
+    {
+      _timer.cancel();
+    }
+    if(_component != null)
+    {
+      try
+      {
+        _component.stop();
+      }
+      catch (Exception e)
+      {
+        LOG.error("", e);
+      }
+    }
+    _shutdownFlag = true;
+  }
+
+  public void reset()
+  {
+    if(_shutdownFlag)
+    {
+      _shutdownFlag = false;
+      _initialized = false;
+      _component = null;
+      _timer = null;
+      _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
new file mode 100644
index 0000000..ee18587
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
@@ -0,0 +1,77 @@
+package org.apache.helix.controller.restlet;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Unit of transferred ZNRecord updates. Contains the ZNRecord value, the
+ * zkPath where the updated value is stored, and the OpCode that determines
+ * how the update is merged. ZNRecord subtraction is not yet supported.
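+ * 
+ * Updates travel as JSON; a sketch of the wire form (the values are
+ * illustrative, and the record follows the standard ZNRecord serialization):
+ * <pre>
+ * {"path":"/CLUSTER/INSTANCES/host_12918/HEALTHREPORT",
+ *  "opcode":"UPDATE",
+ *  "record":{"id":"...","simpleFields":{},"listFields":{},"mapFields":{}}}
+ * </pre>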
+ * */
+public class ZNRecordUpdate
+{
+  public enum OpCode
+  {
+    // TODO: CREATE is not supported; UPDATE will create the node if it does not exist
+    CREATE,
+    UPDATE,
+    SET
+  }
+  final String _path;
+  ZNRecord _record;
+  final OpCode _code;
+
+  @JsonCreator
+  public ZNRecordUpdate(@JsonProperty("path")String path, 
+                        @JsonProperty("opcode")OpCode code, 
+                        @JsonProperty("record")ZNRecord record)
+  {
+    _path = path;
+    _record = record;
+    _code = code;
+  }
+  
+  public String getPath()
+  {
+    return _path;
+  }
+  
+  public ZNRecord getRecord()
+  {
+    return _record;
+  }
+  
+  public OpCode getOpcode()
+  {
+    return _code;
+  }
+
+  @JsonIgnore(true)
+  public DataUpdater<ZNRecord> getZNRecordUpdater()
+  {
+    if(_code == OpCode.SET)
+    {
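+      // SET: discard the stored value and write the transferred record as-is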
+      return new ZNRecordUpdater(_record)
+      {
+        @Override
+        public ZNRecord update(ZNRecord current)
+        {
+          return _record;
+        }
+      };
+    }
+    else if (_code == OpCode.UPDATE)
+    {
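+      // UPDATE: merge the transferred record into the stored value,
+      // which is ZNRecordUpdater's default behavior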
+      return new ZNRecordUpdater(_record);
+    }
+    else
+    {
+      throw new UnsupportedOperationException("Not supported : " + _code);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
new file mode 100644
index 0000000..61aa0a9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
@@ -0,0 +1,80 @@
+package org.apache.helix.controller.restlet;
+
+import java.io.StringReader;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Status;
+import org.restlet.resource.Representation;
+import org.restlet.resource.Resource;
+
+/**
+ * REST resource for the ZkPropertyTransfer server to receive PUT requests
+ * that submit ZNRecordUpdates
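+ * 
+ * The PUT body is a form whose "ZNRecordUpdate" field carries a JSON map of
+ * zkPath to ZNRecordUpdate, as produced by ZkPropertyTransferClient.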
+ * */
+public class ZNRecordUpdateResource  extends Resource
+{
+  public static final String UPDATEKEY = "ZNRecordUpdate";
+  private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
+  @Override
+  public boolean allowGet()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean allowPost()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean allowPut()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean allowDelete()
+  {
+    return false;
+  }
+  
+  @Override
+  public void storeRepresentation(Representation entity)
+  {
+    try
+    {
+      ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
+      
+      Form form = new Form(entity);
+      String jsonPayload = form.getFirstValue(UPDATEKEY, true);
+      
+      // Parse the map from zkPath --> ZNRecordUpdate from the payload
+      StringReader sr = new StringReader(jsonPayload);
+      ObjectMapper mapper = new ObjectMapper();
+      TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
+          new TypeReference<TreeMap<String, ZNRecordUpdate>>()
+          {
+          };
+      Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
+      // Enqueue the ZNRecordUpdate for sending
+      for(ZNRecordUpdate holder : holderMap.values())
+      {
+        server.enqueueData(holder);
+        LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
+      }
+      getResponse().setStatus(Status.SUCCESS_OK);
+    }
+    catch(Exception e)
+    {
+      LOG.error("", e);
+      getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
new file mode 100644
index 0000000..293011b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
@@ -0,0 +1,30 @@
+package org.apache.helix.controller.restlet;
+
+import org.restlet.Application;
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.Router;
+
+/**
+ * Restlet application for ZkPropertyTransfer server
+ * */
+public class ZkPropertyTransferApplication extends Application
+{
+  public ZkPropertyTransferApplication()
+  {
+    super();
+  }
+
+  public ZkPropertyTransferApplication(Context context)
+  {
+    super(context);
+  }
+  
+  @Override
+  public Restlet createRoot()
+  {
+    Router router = new Router(getContext());
+    router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
+    return router;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
new file mode 100644
index 0000000..83d355f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.restlet;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.restlet.Client;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Protocol;
+import org.restlet.data.Reference;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.data.Status;
+
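+/**
+ * Client side of the ZK property transfer. Buffers ZNRecordUpdates locally,
+ * merges updates that target the same zkPath, and periodically PUTs the
+ * accumulated batch to the controller's ZKPropertyTransferServer over REST.
+ * */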
+public class ZkPropertyTransferClient
+{
+  private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
+  public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
+  public static int SEND_PERIOD = 10 * 1000;
+  
+  public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
+  
+  int _maxConcurrentTasks;
+  ExecutorService _executorService;
+  Client[] _clients;
+  AtomicInteger _requestCount = new AtomicInteger(0);
+  
+  // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
+  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
+    = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
+  Timer _timer;
+  volatile String _webServiceUrl = "";
+  
+  public ZkPropertyTransferClient(int maxConcurrentTasks)
+  {
+    _maxConcurrentTasks = maxConcurrentTasks;
+    _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
+    _clients = new Client[_maxConcurrentTasks];
+    for(int i = 0; i< _clients.length; i++)
+    {
+      _clients[i] = new Client(Protocol.HTTP);
+    }
+    _timer = new Timer(true);
+    _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
+    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+  }
+  
+  class SendZNRecordTimerTask extends TimerTask
+  {
+    @Override
+    public void run()
+    { 
+      sendUpdateBatch();
+    }
+  }
+  
+  public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl)
+  {
+    try
+    {
+      LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to " + webserviceUrl);
+      _webServiceUrl = webserviceUrl;
+      update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
+      synchronized(_dataBufferRef)
+      {
+        if(_dataBufferRef.get().containsKey(update._path))
+        {
+          ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
+          oldVal = update.getZNRecordUpdater().update(oldVal);
+          _dataBufferRef.get().get(update.getPath())._record = oldVal;
+        }
+        else
+        {
+          _dataBufferRef.get().put(update.getPath(), update);
+        }
+      }
+    }
+    catch(Exception e)
+    {
+      LOG.error("", e);
+    }
+  }
+  
+  void sendUpdateBatch()
+  {
+    LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to " + _webServiceUrl);
+    Map<String, ZNRecordUpdate> updateCache  = null;
+    
+    synchronized(_dataBufferRef)
+    {
+      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+    }
+    
+    if(updateCache != null && updateCache.size() > 0)
+    {
+      ZNRecordUpdateUploadTask task = new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl, _clients[_requestCount.intValue() % _maxConcurrentTasks]);
+      _requestCount.incrementAndGet();
+      _executorService.submit(task);
+      LOG.trace("Queue size :" + ((ThreadPoolExecutor)_executorService).getQueue().size());
+    }
+  }
+  
+  public void shutdown()
+  {
+    LOG.info("Shutting down ZkPropertyTransferClient");
+    _executorService.shutdown();
+    _timer.cancel();
+    for(Client client: _clients)
+    {
+      try
+      {
+        client.stop();
+      }
+      catch (Exception e)
+      {
+        LOG.error("", e);
+      }
+    }
+  }
+  
+  class ZNRecordUpdateUploadTask implements Callable<Void>
+  {
+    Map<String, ZNRecordUpdate> _updateMap;
+    String _webServiceUrl;
+    Client _client;
+    
+    ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client)
+    {
+      _updateMap = update;
+      _webServiceUrl = webserviceUrl;
+      _client = client;
+    }
+    
+    @Override
+    public Void call() throws Exception
+    {
+      LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
+      long time = System.currentTimeMillis();
+      Reference resourceRef = new Reference(_webServiceUrl);
+      Request request = new Request(Method.PUT, resourceRef);
+      
+      ObjectMapper mapper = new ObjectMapper();
+      StringWriter sw = new StringWriter();
+      try
+      {
+        mapper.writeValue(sw, _updateMap);
+      }
+      catch (Exception e)
+      {
+        LOG.error("",e);
+      }
+
+      request.setEntity(
+          ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
+      // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
+      Response response = _client.handle(request);
+      
+      if(response.getStatus().getCode() != Status.SUCCESS_OK.getCode())
+      {
+        LOG.error("Status : " + response.getStatus());
+      }
+      LOG.info("Using time : " + (System.currentTimeMillis() - time));
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
new file mode 100644
index 0000000..35e5f01
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Restlet server for Helix controller
+ * 
+ */
+package org.apache.helix.controller.restlet;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
new file mode 100644
index 0000000..be44ba9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+public enum AttributeName
+{
+  RESOURCES,
+  BEST_POSSIBLE_STATE,
+  CURRENT_STATE,
+  MESSAGES_ALL,
+  MESSAGES_SELECTED,
+  MESSAGES_THROTTLE,
+  LOCAL_STATE
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
new file mode 100644
index 0000000..8ba4a19
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -0,0 +1,532 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.log4j.Logger;
+
+
+/**
+ * For each partition, compute the best possible (instance, state) pair based
+ * on the IdealState, StateModel, and LiveInstances
+ * 
+ * @author kgopalak
+ * 
+ */
+// TODO: refactor this
+public class BestPossibleStateCalcStage extends AbstractBaseStage
+{
+  private static final Logger logger =
+      Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+    logger.info("START BestPossibleStateCalcStage.process()");
+
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+    if (currentStateOutput == null || resourceMap == null || cache == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+    }
+
+    BestPossibleStateOutput bestPossibleStateOutput =
+        compute(event, resourceMap, currentStateOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(),
+                       bestPossibleStateOutput);
+
+    long endTime = System.currentTimeMillis();
+    logger.info("END BestPossibleStateCalcStage.process(). took: "
+        + (endTime - startTime) + " ms");
+  }
+
+  private BestPossibleStateOutput compute(ClusterEvent event,
+                                          Map<String, Resource> resourceMap,
+                                          CurrentStateOutput currentStateOutput)
+  {
+    // for each ideal state
+    // read the state model def
+    // for each resource
+    // get the preference list
+    // for each instanceName check if its alive then assign a state
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    HelixManager manager = event.getAttribute("helixmanager");
+
+    BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+    for (String resourceName : resourceMap.keySet())
+    {
+      logger.debug("Processing resource:" + resourceName);
+
+      Resource resource = resourceMap.get(resourceName);
+      // Ideal state may be gone. In that case we need to get the state model name
+      // from the current state
+      IdealState idealState = cache.getIdealState(resourceName);
+
+      String stateModelDefName;
+
+      if (idealState == null)
+      {
+        // if ideal state is deleted, use an empty one
+        logger.info("resource:" + resourceName + " does not exist anymore");
+        stateModelDefName = currentStateOutput.getResourceStateModelDef(resourceName);
+        idealState = new IdealState(resourceName);
+      }
+      else
+      {
+        stateModelDefName = idealState.getStateModelDefRef();
+      }
+
+      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+      if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
+      {
+        calculateAutoBalancedIdealState(cache,
+                                        idealState,
+                                        stateModelDef,
+                                        currentStateOutput);
+      }
+
+      
+      for (Partition partition : resource.getPartitions())
+      {
+        Map<String, String> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+        Map<String, String> bestStateForPartition;
+        Set<String> disabledInstancesForPartition =
+            cache.getDisabledInstancesForPartition(partition.toString());
+
+        if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+        {
+          Map<String, String> idealStateMap =
+              idealState.getInstanceStateMap(partition.getPartitionName());
+          bestStateForPartition =
+              computeCustomizedBestStateForPartition(cache,
+                                                     stateModelDef,
+                                                     idealStateMap,
+                                                     currentStateMap,
+                                                     disabledInstancesForPartition);
+        }
+        else
+        // both AUTO and AUTO_REBALANCE mode
+        {
+          List<String> instancePreferenceList =
+              getPreferenceList(cache, partition, idealState, stateModelDef);
+          
+          bestStateForPartition =
+              computeAutoBestStateForPartition(cache,
+                                               stateModelDef,
+                                               instancePreferenceList,
+                                               currentStateMap,
+                                               disabledInstancesForPartition);
+        }
+        output.setState(resourceName, partition, bestStateForPartition);
+      }
+    }
+    return output;
+  }
+
+  /**
+   * Compute the assignment for a resource in AUTO_REBALANCE ideal state mode.
+   * The algorithm ensures that master partitions are evenly distributed, and
+   * that the number of master partitions that move when instances are added
+   * or removed is minimized
+   * 
+   * @param cache
+   * @param idealState
+   * @param stateModelDef
+   * @param currentStateOutput
+   */
+  private void calculateAutoBalancedIdealState(ClusterDataCache cache,
+                                               IdealState idealState,
+                                               StateModelDefinition stateModelDef,
+                                               CurrentStateOutput currentStateOutput)
+  {
+    String topStateValue = stateModelDef.getStatesPriorityList().get(0);
+    Set<String> liveInstances = cache._liveInstanceMap.keySet();
+    // Obtain replica number
+    int replicas = 1;
+    try
+    {
+      replicas = Integer.parseInt(idealState.getReplicas());
+    }
+    catch (Exception e)
+    {
+      logger.error("", e);
+    }
+    // Init for all partitions with empty list
+    Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
+    List<String> emptyList = new ArrayList<String>(0);
+    for (String partition : idealState.getPartitionSet())
+    {
+      defaultListFields.put(partition, emptyList);
+    }
+    idealState.getRecord().setListFields(defaultListFields);
+    // Return if no live instance
+    if (liveInstances.size() == 0)
+    {
+      logger.info("No live instances, return. Idealstate : "
+          + idealState.getResourceName());
+      return;
+    }
+    Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
+    for (String instanceName : liveInstances)
+    {
+      masterAssignmentMap.put(instanceName, new ArrayList<String>());
+    }
+    Set<String> orphanedPartitions = new HashSet<String>();
+    orphanedPartitions.addAll(idealState.getPartitionSet());
+    // Go through all current states and fill the assignments
+    for (String liveInstanceName : liveInstances)
+    {
+      CurrentState currentState =
+          cache.getCurrentState(liveInstanceName,
+                                cache.getLiveInstances()
+                                     .get(liveInstanceName)
+                                     .getSessionId()).get(idealState.getId());
+      if (currentState != null)
+      {
+        Map<String, String> partitionStates = currentState.getPartitionStateMap();
+        for (String partitionName : partitionStates.keySet())
+        {
+          String state = partitionStates.get(partitionName);
+          if (state.equals(topStateValue))
+          {
+            masterAssignmentMap.get(liveInstanceName).add(partitionName);
+            orphanedPartitions.remove(partitionName);
+          }
+        }
+      }
+    }
+    List<String> orphanedPartitionsList = new ArrayList<String>();
+    orphanedPartitionsList.addAll(orphanedPartitions);
+    normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList);
+    idealState.getRecord()
+              .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
+                                                                   replicas));
+
+  }
+
+  /**
+   * Given the current master assignment map and the partitions not hosted by
+   * any instance, redistribute partitions so that each instance hosts an
+   * (almost) equal share
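+   * 
+   * For example, 7 master partitions over 3 instances normalize to target
+   * sizes 3, 2 and 2: instances above target shed partitions into the orphan
+   * list, and instances below target then take from it.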
+   * 
+   * @param masterAssignmentMap
+   *          current master assignment map; modified in place
+   * @param orphanPartitions
+   *          partitions not hosted by any instance; drained by this method
+   */
+  private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
+                                      List<String> orphanPartitions)
+  {
+    int totalPartitions = 0;
+    String[] instanceNames = new String[masterAssignmentMap.size()];
+    masterAssignmentMap.keySet().toArray(instanceNames);
+    Arrays.sort(instanceNames);
+    // Find out total partition number
+    for (String key : masterAssignmentMap.keySet())
+    {
+      totalPartitions += masterAssignmentMap.get(key).size();
+      Collections.sort(masterAssignmentMap.get(key));
+    }
+    totalPartitions += orphanPartitions.size();
+
+    // Find out how many partitions an instance should host
+    int partitionNumber = totalPartitions / masterAssignmentMap.size();
+    int leave = totalPartitions % masterAssignmentMap.size();
+
+    for (int i = 0; i < instanceNames.length; i++)
+    {
+      int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
+      leave--;
+      // For hosts that have more partitions than the target, move the excess to "orphaned"
+      while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo)
+      {
+        int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
+        orphanPartitions.add(masterAssignmentMap.get(instanceNames[i])
+                                                .get(lastElementIndex));
+        masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
+      }
+    }
+    leave = totalPartitions % masterAssignmentMap.size();
+    Collections.sort(orphanPartitions);
+    // Assign "orphaned" partitions to hosts that do not have enough partitions
+    for (int i = 0; i < instanceNames.length; i++)
+    {
+      int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
+      leave--;
+      while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
+      {
+        int lastElementIndex = orphanPartitions.size() - 1;
+        masterAssignmentMap.get(instanceNames[i])
+                           .add(orphanPartitions.get(lastElementIndex));
+        orphanPartitions.remove(lastElementIndex);
+      }
+    }
+    if (orphanPartitions.size() > 0)
+    {
+      logger.error("orphanPartitions still contains elements");
+    }
+  }
+
+  /**
+   * Generate the full preference list from the master assignment map,
+   * spreading the slave replicas of each mastered partition evenly across the
+   * other hosts
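+   * 
+   * For example, with replicas = 3 a partition mastered on host A gets a
+   * preference list of A followed by two of the other hosts, rotated per
+   * partition so that the slave load is spread evenly.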
+   * 
+   * @param masterAssignmentMap
+   *          current master assignment map
+   * @param replicas
+   *          total replica count, i.e. one master plus (replicas - 1) slaves
+   * @return map of partition name to its full preference list
+   */
+  Map<String, List<String>> generateListFieldFromMasterAssignment(Map<String, List<String>> masterAssignmentMap,
+                                                                  int replicas)
+  {
+    Map<String, List<String>> listFields = new HashMap<String, List<String>>();
+    int slaves = replicas - 1;
+    String[] instanceNames = new String[masterAssignmentMap.size()];
+    masterAssignmentMap.keySet().toArray(instanceNames);
+    Arrays.sort(instanceNames);
+
+    for (int i = 0; i < instanceNames.length; i++)
+    {
+      String instanceName = instanceNames[i];
+      List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
+      for (int x = 0; x < instanceNames.length - 1; x++)
+      {
+        int index = (x + i + 1) % instanceNames.length;
+        otherInstances.add(instanceNames[index]);
+      }
+
+      List<String> partitionList = masterAssignmentMap.get(instanceName);
+      for (int j = 0; j < partitionList.size(); j++)
+      {
+        String partitionName = partitionList.get(j);
+        listFields.put(partitionName, new ArrayList<String>());
+        listFields.get(partitionName).add(instanceName);
+
+        int slavesCanAssign = Math.min(slaves, otherInstances.size());
+        for (int k = 0; k < slavesCanAssign; k++)
+        {
+          int index = (j + k + 1) % otherInstances.size();
+          listFields.get(partitionName).add(otherInstances.get(index));
+        }
+      }
+    }
+    return listFields;
+  }
+
+  /**
+   * Compute the best state for a partition in AUTO ideal state mode
+   * 
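+   * Per-state counts come from the state model definition: "N" assigns the
+   * state to every live, enabled instance; "R" to every instance in the
+   * preference list; a plain number caps the assignment at that many
+   * instances.
+   * 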
+   * @param cache
+   * @param stateModelDef
+   * @param instancePreferenceList
+   * @param currentStateMap
+   *          instance->state map for this partition
+   * @param disabledInstancesForPartition
+   * @return map of instance to its target state
+   */
+  private Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
+                                                               StateModelDefinition stateModelDef,
+                                                               List<String> instancePreferenceList,
+                                                               Map<String, String> currentStateMap,
+                                                               Set<String> disabledInstancesForPartition)
+  {
+    Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // we should drop all resources.
+    if (currentStateMap != null)
+    {
+      for (String instance : currentStateMap.keySet())
+      {
+        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
+            && !"ERROR".equals(currentStateMap.get(instance)))
+        {
+          // move to DROPPED state only if not in ERROR state
+          instanceStateMap.put(instance, "DROPPED");
+        }
+        else if (!"ERROR".equals(currentStateMap.get(instance))
+            && disabledInstancesForPartition.contains(instance))
+        {
+          // if a non-error node is disabled, put it into initial state (OFFLINE)
+          instanceStateMap.put(instance, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (instancePreferenceList == null)
+    {
+      return instanceStateMap;
+    }
+
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+    boolean assigned[] = new boolean[instancePreferenceList.size()];
+
+    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+
+    for (String state : statesPriorityList)
+    {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      int stateCount = -1;
+      if ("N".equals(num))
+      {
+        Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
+        liveAndEnabled.removeAll(disabledInstancesForPartition);
+        stateCount = liveAndEnabled.size();
+      }
+      else if ("R".equals(num))
+      {
+        stateCount = instancePreferenceList.size();
+      }
+      else
+      {
+        try
+        {
+          stateCount = Integer.parseInt(num);
+        }
+        catch (Exception e)
+        {
+          logger.error("Invalid count for state:" + state + " ,count=" + num);
+        }
+      }
+      if (stateCount > -1)
+      {
+        int count = 0;
+        for (int i = 0; i < instancePreferenceList.size(); i++)
+        {
+          String instanceName = instancePreferenceList.get(i);
+
+          boolean notInErrorState =
+              currentStateMap == null
+                  || !"ERROR".equals(currentStateMap.get(instanceName));
+
+          if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
+              && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
+          {
+            instanceStateMap.put(instanceName, state);
+            count = count + 1;
+            assigned[i] = true;
+            if (count == stateCount)
+            {
+              break;
+            }
+          }
+        }
+      }
+    }
+    return instanceStateMap;
+  }
+
+  /**
+   * Compute the best state for a partition in CUSTOMIZED ideal state mode
+   * 
+   * @param cache
+   * @param stateModelDef
+   * @param idealStateMap
+   * @param currentStateMap
+   * @param disabledInstancesForPartition
+   * @return map of instance to its target state
+   */
+  private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
+                                                                     StateModelDefinition stateModelDef,
+                                                                     Map<String, String> idealStateMap,
+                                                                     Map<String, String> currentStateMap,
+                                                                     Set<String> disabledInstancesForPartition)
+  {
+    Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // we should drop all resources.
+    if (currentStateMap != null)
+    {
+      for (String instance : currentStateMap.keySet())
+      {
+        if ( (idealStateMap == null || !idealStateMap.containsKey(instance))
+            && !"ERROR".equals(currentStateMap.get(instance)))
+        {
+          // move to DROPPED state only if not in ERROR state
+          instanceStateMap.put(instance, "DROPPED");
+        }
+        else if (!"ERROR".equals(currentStateMap.get(instance))
+            && disabledInstancesForPartition.contains(instance))
+        {
+          // if a non-error node is disabled, put it into initial state (OFFLINE)
+          instanceStateMap.put(instance, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (idealStateMap == null)
+    {
+      return instanceStateMap;
+    }
+
+    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+    for (String instance : idealStateMap.keySet())
+    {
+      boolean notInErrorState =
+          currentStateMap == null || !"ERROR".equals(currentStateMap.get(instance));
+
+      if (liveInstancesMap.containsKey(instance) && notInErrorState
+          && !disabledInstancesForPartition.contains(instance))
+      {
+        instanceStateMap.put(instance, idealStateMap.get(instance));
+      }
+    }
+
+    return instanceStateMap;
+  }
+
+  private List<String> getPreferenceList(ClusterDataCache cache,
+                                         Partition resource,
+                                         IdealState idealState,
+                                         StateModelDefinition stateModelDef)
+  {
+    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+
+    if (listField != null && listField.size() == 1
+        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0)))
+    {
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+      Collections.sort(prefList);
+      return prefList;
+    }
+    else
+    {
+      return listField;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
new file mode 100644
index 0000000..63f6eec
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.Partition;
+
+
+public class BestPossibleStateOutput
+{
+  // resource->partition->instance->state
+  Map<String, Map<Partition, Map<String, String>>> _dataMap;
+
+  public BestPossibleStateOutput()
+  {
+    _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+  }
+
+  public void setState(String resourceName, Partition resource,
+      Map<String, String> bestInstanceStateMappingForResource)
+  {
+    if (!_dataMap.containsKey(resourceName))
+    {
+      _dataMap.put(resourceName,
+          new HashMap<Partition, Map<String, String>>());
+    }
+    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+    map.put(resource, bestInstanceStateMappingForResource);
+  }
+
+  public Map<String, String> getInstanceStateMap(String resourceName,
+      Partition resource)
+  {
+    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+    if (map != null)
+    {
+      return map.get(resource);
+    }
+    return Collections.emptyMap();
+  }
+
+  public Map<Partition, Map<String, String>> getResourceMap(String resourceName)
+  {
+    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+    if (map != null)
+    {
+      return map;
+    }
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public String toString()
+  {
+    return _dataMap.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
new file mode 100644
index 0000000..914cc0d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Reads cluster data using the data accessor and caches it. The resulting
+ * snapshot provides convenience methods to search and look up properties
+ * 
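+ * A minimal usage sketch (accessor construction elided; the resource name is
+ * illustrative):
+ * <pre>
+ * ClusterDataCache cache = new ClusterDataCache();
+ * cache.refresh(accessor);
+ * IdealState idealState = cache.getIdealState("MyDB");
+ * </pre>
+ * 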
+ * @author kgopalak
+ * 
+ */
+public class ClusterDataCache
+{
+
+  Map<String, LiveInstance>                           _liveInstanceMap;
+  Map<String, IdealState>                             _idealStateMap;
+  Map<String, StateModelDefinition>                   _stateModelDefMap;
+  Map<String, InstanceConfig>                         _instanceConfigMap;
+  Map<String, ClusterConstraints>                     _constraintMap;
+  Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
+  Map<String, Map<String, Message>>                   _messageMap;
+
+  // Map<String, Map<String, HealthStat>> _healthStatMap;
+  // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
+  // private PersistentStats _persistentStats;
+  // private Alerts _alerts;
+  // private AlertStatus _alertStatus;
+
+  private static final Logger                         LOG =
+                                                              Logger.getLogger(ClusterDataCache.class.getName());
+
+  public boolean refresh(HelixDataAccessor accessor)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+    _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+    for (LiveInstance instance : _liveInstanceMap.values())
+    {
+      LOG.trace("live instance: " + instance.getInstanceName() + " "
+          + instance.getSessionId());
+    }
+
+    _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+    _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+    _constraintMap =
+        accessor.getChildValuesMap(keyBuilder.constraints());
+
+    Map<String, Map<String, Message>> msgMap =
+        new HashMap<String, Map<String, Message>>();
+    for (String instanceName : _liveInstanceMap.keySet())
+    {
+      Map<String, Message> map =
+          accessor.getChildValuesMap(keyBuilder.messages(instanceName));
+      msgMap.put(instanceName, map);
+    }
+    _messageMap = Collections.unmodifiableMap(msgMap);
+
+    Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
+        new HashMap<String, Map<String, Map<String, CurrentState>>>();
+    for (String instanceName : _liveInstanceMap.keySet())
+    {
+      LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
+      String sessionId = liveInstance.getSessionId();
+      if (!allCurStateMap.containsKey(instanceName))
+      {
+        allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
+      }
+      Map<String, Map<String, CurrentState>> curStateMap =
+          allCurStateMap.get(instanceName);
+      Map<String, CurrentState> map =
+          accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
+      curStateMap.put(sessionId, map);
+    }
+
+    for (String instance : allCurStateMap.keySet())
+    {
+      allCurStateMap.put(instance,
+                         Collections.unmodifiableMap(allCurStateMap.get(instance)));
+    }
+    _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
+
+    return true;
+  }
+
+  public Map<String, IdealState> getIdealStates()
+  {
+    return _idealStateMap;
+  }
+
+  public Map<String, LiveInstance> getLiveInstances()
+  {
+    return _liveInstanceMap;
+  }
+
+  public Map<String, CurrentState> getCurrentState(String instanceName,
+                                                   String clientSessionId)
+  {
+    return _currentStateMap.get(instanceName).get(clientSessionId);
+  }
+
+  public Map<String, Message> getMessages(String instanceName)
+  {
+    Map<String, Message> map = _messageMap.get(instanceName);
+    if (map != null)
+    {
+      return map;
+    }
+    else
+    {
+      return Collections.emptyMap();
+    }
+  }
+
+  // public HealthStat getGlobalStats()
+  // {
+  // return _globalStats;
+  // }
+  //
+  // public PersistentStats getPersistentStats()
+  // {
+  // return _persistentStats;
+  // }
+  //
+  // public Alerts getAlerts()
+  // {
+  // return _alerts;
+  // }
+  //
+  // public AlertStatus getAlertStatus()
+  // {
+  // return _alertStatus;
+  // }
+  //
+  // public Map<String, HealthStat> getHealthStats(String instanceName)
+  // {
+  // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
+  // if (map != null)
+  // {
+  // return map;
+  // } else
+  // {
+  // return Collections.emptyMap();
+  // }
+  // }
+
+  public StateModelDefinition getStateModelDef(String stateModelDefRef)
+  {
+
+    return _stateModelDefMap.get(stateModelDefRef);
+  }
+
+  public IdealState getIdealState(String resourceName)
+  {
+    return _idealStateMap.get(resourceName);
+  }
+
+  public Map<String, InstanceConfig> getInstanceConfigMap()
+  {
+    return _instanceConfigMap;
+  }
+
+  public Set<String> getDisabledInstancesForPartition(String partition)
+  {
+    Set<String> disabledInstancesSet = new HashSet<String>();
+    for (String instance : _instanceConfigMap.keySet())
+    {
+      InstanceConfig config = _instanceConfigMap.get(instance);
+      if (!config.getInstanceEnabled()
+          || !config.getInstanceEnabledForPartition(partition))
+      {
+        disabledInstancesSet.add(instance);
+      }
+    }
+    return disabledInstancesSet;
+  }
+
+  public int getReplicas(String resourceName)
+  {
+    int replicas = -1;
+
+    if (_idealStateMap.containsKey(resourceName))
+    {
+      String replicasStr = _idealStateMap.get(resourceName).getReplicas();
+
+      if (replicasStr != null)
+      {
+        if (replicasStr.equals(StateModelToken.ANY_LIVEINSTANCE.toString()))
+        {
+          replicas = _liveInstanceMap.size();
+        }
+        else
+        {
+          try
+          {
+            replicas = Integer.parseInt(replicasStr);
+          }
+          catch (Exception e)
+          {
+            LOG.error("invalid replicas string: " + replicasStr);
+          }
+        }
+      }
+      else
+      {
+        LOG.error("idealState for resource: " + resourceName + " does NOT have replicas");
+      }
+    }
+    return replicas;
+  }
+
+  public ClusterConstraints getConstraint(ConstraintType type)
+  {
+    if (_constraintMap != null)
+    {
+      return _constraintMap.get(type.toString());
+    }
+    return null;
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
+    sb.append("idealStateMap:" + _idealStateMap).append("\n");
+    sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
+    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+    sb.append("messageMap:" + _messageMap).append("\n");
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
new file mode 100644
index 0000000..c7de9e6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class ClusterEvent
+{
+  private static final Logger logger = Logger.getLogger(ClusterEvent.class
+      .getName());
+  private final String _eventName;
+  private final Map<String, Object> _eventAttributeMap;
+
+  public ClusterEvent(String name)
+  {
+    _eventName = name;
+    _eventAttributeMap = new HashMap<String, Object>();
+  }
+
+  public void addAttribute(String attrName, Object attrValue)
+  {
+    if (logger.isTraceEnabled())
+    {
+      logger.trace("Adding attribute:" + attrName);
+      logger.trace(" attribute value:" + attrValue);
+    }
+   
+    _eventAttributeMap.put(attrName, attrValue);
+  }
+
+  public String getName()
+  {
+    return _eventName;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> T getAttribute(String attrName)
+  {
+    // unchecked cast: the caller must know the attribute's runtime type
+    return (T) _eventAttributeMap.get(attrName);
+  }
+  
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("name:").append(_eventName).append("\n");
+    for (Map.Entry<String, Object> entry : _eventAttributeMap.entrySet())
+    {
+      sb.append(entry.getKey()).append(":").append(entry.getValue()).append("\n");
+    }
+    return sb.toString();
+  }
+}
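
Usage sketch (illustrative; the attribute names match the ones the stages
below rely on): ClusterEvent is an untyped attribute bag, so getAttribute()
performs an unchecked cast and a type mismatch only surfaces as a
ClassCastException at the call site.

    ClusterEvent event = new ClusterEvent("sampleEvent");
    event.addAttribute("helixmanager", manager);      // a HelixManager
    event.addAttribute("ClusterDataCache", cache);    // a ClusterDataCache

    // the caller declares the expected type; nothing is checked at put() time
    HelixManager m = event.getAttribute("helixmanager");
    ClusterDataCache c = event.getAttribute("ClusterDataCache");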

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
new file mode 100644
index 0000000..7dd0f28
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+
+public class CompatibilityCheckStage extends AbstractBaseStage
+{
+  private static final Logger LOG = Logger
+      .getLogger(CompatibilityCheckStage.class.getName());
+
+  /**
+   * INCOMPATIBLE_PAIRS stores primary-version pairs, encoded as
+   * "controllerPrimaryVersion,participantPrimaryVersion", that are known
+   * to be incompatible.
+   */
+  private static final Set<String> INCOMPATIBLE_PAIRS;
+  static
+  {
+      Set<String> pairs = new HashSet<String>();
+      // "controllerPrimaryVersion,participantPrimaryVersion"
+      pairs.add("0.4,0.3");
+      INCOMPATIBLE_PAIRS = Collections.unmodifiableSet(pairs);
+  }
+
+  private String getPrimaryVersion(String version)
+  {
+    // a full version is "major.minor.patch"; the primary version is "major.minor"
+    String[] splits = version.split("\\.");
+    if (splits.length != 3)
+    {
+      return null;
+    }
+    return splits[0] + "." + splits[1];
+  }
+
+  private boolean isCompatible(String controllerVersion, String participantVersion)
+  {
+    if (participantVersion == null)
+    {
+      LOG.warn("Missing version of participant. Skip version check.");
+      return true;
+    }
+
+    // compare primary versions
+    String controllerPrimaryVersion = getPrimaryVersion(controllerVersion);
+    String participantPrimaryVersion = getPrimaryVersion(participantVersion);
+    if (controllerPrimaryVersion != null && participantPrimaryVersion != null)
+    {
+      if (compareVersions(controllerPrimaryVersion, participantPrimaryVersion) < 0)
+      {
+        LOG.info("Controller primary version is less than participant primary version.");
+        return false;
+      }
+      else if (INCOMPATIBLE_PAIRS.contains(controllerPrimaryVersion + ","
+          + participantPrimaryVersion))
+      {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Compare two "major.minor" version strings numerically; a plain
+   * String.compareTo would order "0.10" before "0.4".
+   */
+  private int compareVersions(String v1, String v2)
+  {
+    String[] s1 = v1.split("\\.");
+    String[] s2 = v2.split("\\.");
+    try
+    {
+      int majorDiff = Integer.parseInt(s1[0]) - Integer.parseInt(s2[0]);
+      return majorDiff != 0 ? majorDiff
+          : Integer.parseInt(s1[1]) - Integer.parseInt(s2[1]);
+    }
+    catch (NumberFormatException e)
+    {
+      // non-numeric version components; fall back to lexicographic order
+      return v1.compareTo(v2);
+    }
+  }
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    HelixManager manager = event.getAttribute("helixmanager");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    if (manager == null || cache == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager | DataCache");
+    }
+
+    String controllerVersion = manager.getVersion();
+    if (controllerVersion == null)
+    {
+      String errorMsg = "Missing version of controller: " + manager.getInstanceName()
+          + ". Pipeline will not continue.";
+      LOG.error(errorMsg);
+      throw new StageException(errorMsg);
+    }
+
+    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
+    for (LiveInstance liveInstance : liveInstanceMap.values())
+    {
+      String participantVersion = liveInstance.getHelixVersion();
+      if (!isCompatible(controllerVersion, participantVersion))
+      {
+        String errorMsg = "cluster manager versions are incompatible; pipeline will not continue. "
+                        + "controller:" + manager.getInstanceName() + ", controllerVersion:" + controllerVersion
+                        + "; participant:" + liveInstance.getInstanceName() + ", participantVersion:" + participantVersion;
+        LOG.error(errorMsg);
+        throw new StageException(errorMsg);
+      }
+    }
+  }
+}
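
Illustrative outcomes of the compatibility check above, assuming three-part
version strings ("major.minor.patch"):

    // controllerVersion   participantVersion   result
    // "0.4.0"             "0.4.1"              compatible (same primary version)
    // "0.4.0"             "0.3.2"              incompatible ("0.4,0.3" is listed)
    // "0.3.2"             "0.4.0"              incompatible (controller older than participant)
    // "0.4.0"             null                 compatible (missing version skips the check)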

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
new file mode 100644
index 0000000..e88e0a6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.Message.MessageType;
+
+
+/**
+ * For each live instance, selects the current states and pending state-transition
+ * messages whose session id matches the live instance's session id, then records
+ * the (partition, state) pairs for every resource computed by the previous stage
+ * (ResourceComputationStage).
+ * 
+ * @author kgopalak
+ * 
+ */
+public class CurrentStateComputationStage extends AbstractBaseStage
+{
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+
+    if (cache == null || resourceMap == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires DataCache|RESOURCE");
+    }
+
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+
+    for (LiveInstance instance : liveInstances.values())
+    {
+      String instanceName = instance.getInstanceName();
+      Map<String, Message> instanceMessages = cache.getMessages(instanceName);
+      for (Message message : instanceMessages.values())
+      {
+        if (!MessageType.STATE_TRANSITION.toString()
+                                         .equalsIgnoreCase(message.getMsgType()))
+        {
+          continue;
+        }
+        if (!instance.getSessionId().equals(message.getTgtSessionId()))
+        {
+          continue;
+        }
+        String resourceName = message.getResourceName();
+        Resource resource = resourceMap.get(resourceName);
+        if (resource == null)
+        {
+          continue;
+        }
+
+        if (!message.getGroupMessageMode())
+        {
+          String partitionName = message.getPartitionName();
+          Partition partition = resource.getPartition(partitionName);
+          if (partition != null)
+          {
+            currentStateOutput.setPendingState(resourceName,
+                                               partition,
+                                               instanceName,
+                                               message.getToState());
+          }
+          else
+          {
+            // the message refers to a partition that no longer exists in the resource
+          }
+        }
+        else
+        {
+          List<String> partitionNames = message.getPartitionNames();
+          if (!partitionNames.isEmpty())
+          {
+            for (String partitionName : partitionNames)
+            {
+              Partition partition = resource.getPartition(partitionName);
+              if (partition != null)
+              {
+                currentStateOutput.setPendingState(resourceName,
+                                                   partition,
+                                                   instanceName,
+                                                   message.getToState());
+              }
+              else
+              {
+                // the message refers to a partition that no longer exists in the resource
+              }
+            }
+          }
+        }
+      }
+    }
+    for (LiveInstance instance : liveInstances.values())
+    {
+      String instanceName = instance.getInstanceName();
+
+      String clientSessionId = instance.getSessionId();
+      Map<String, CurrentState> currentStateMap =
+          cache.getCurrentState(instanceName, clientSessionId);
+      for (CurrentState currentState : currentStateMap.values())
+      {
+
+        if (!instance.getSessionId().equals(currentState.getSessionId()))
+        {
+          continue;
+        }
+        String resourceName = currentState.getResourceName();
+        String stateModelDefName = currentState.getStateModelDefRef();
+        Resource resource = resourceMap.get(resourceName);
+        if (resource == null)
+        {
+          continue;
+        }
+        if (stateModelDefName != null)
+        {
+          currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
+        }
+
+        currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
+
+        Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+        for (String partitionName : partitionStateMap.keySet())
+        {
+          Partition partition = resource.getPartition(partitionName);
+          if (partition != null)
+          {
+            currentStateOutput.setCurrentState(resourceName,
+                                               partition,
+                                               instanceName,
+                                               currentState.getState(partitionName));
+
+          }
+          else
+          {
+            // the reported current state refers to a partition that no longer exists in the resource
+          }
+        }
+      }
+    }
+    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+  }
+}
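
Wiring sketch (illustrative; assumes a populated cache and resource map):
the stage reads "ClusterDataCache" and AttributeName.RESOURCES from the
event and publishes its result under AttributeName.CURRENT_STATE.

    // inside a method that declares "throws Exception"
    ClusterEvent event = new ClusterEvent("sampleEvent");
    event.addAttribute("ClusterDataCache", cache);
    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);

    new CurrentStateComputationStage().process(event);

    CurrentStateOutput output =
        event.getAttribute(AttributeName.CURRENT_STATE.toString());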

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
new file mode 100644
index 0000000..dde5949
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -0,0 +1,205 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Partition;
+
+
+public class CurrentStateOutput
+{
+  private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
+  private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
+  private final Map<String, String>                              _resourceStateModelMap;
+  private final Map<String, CurrentState>                        _curStateMetaMap;
+
+  public CurrentStateOutput()
+  {
+    _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _resourceStateModelMap = new HashMap<String, String>();
+    _curStateMetaMap = new HashMap<String, CurrentState>();
+
+  }
+
+  public void setResourceStateModelDef(String resourceName, String stateModelDefName)
+  {
+    _resourceStateModelMap.put(resourceName, stateModelDefName);
+  }
+
+  public String getResourceStateModelDef(String resourceName)
+  {
+    return _resourceStateModelMap.get(resourceName);
+  }
+
+  public void setBucketSize(String resource, int bucketSize)
+  {
+    CurrentState curStateMeta = _curStateMetaMap.get(resource);
+    if (curStateMeta == null)
+    {
+      curStateMeta = new CurrentState(resource);
+      _curStateMetaMap.put(resource, curStateMeta);
+    }
+    curStateMeta.setBucketSize(bucketSize);
+  }
+  
+  public int getBucketSize(String resource)
+  {
+    int bucketSize = 0;
+    CurrentState curStateMeta = _curStateMetaMap.get(resource);
+    if (curStateMeta != null)
+    {
+      bucketSize = curStateMeta.getBucketSize();  
+    }
+    
+    return bucketSize;
+  }
+  
+  public void setCurrentState(String resourceName,
+                              Partition partition,
+                              String instanceName,
+                              String state)
+  {
+    Map<Partition, Map<String, String>> partitionMap = _currentStateMap.get(resourceName);
+    if (partitionMap == null)
+    {
+      partitionMap = new HashMap<Partition, Map<String, String>>();
+      _currentStateMap.put(resourceName, partitionMap);
+    }
+    Map<String, String> instanceStateMap = partitionMap.get(partition);
+    if (instanceStateMap == null)
+    {
+      instanceStateMap = new HashMap<String, String>();
+      partitionMap.put(partition, instanceStateMap);
+    }
+    instanceStateMap.put(instanceName, state);
+  }
+
+  public void setPendingState(String resourceName,
+                              Partition partition,
+                              String instanceName,
+                              String state)
+  {
+    Map<Partition, Map<String, String>> partitionMap = _pendingStateMap.get(resourceName);
+    if (partitionMap == null)
+    {
+      partitionMap = new HashMap<Partition, Map<String, String>>();
+      _pendingStateMap.put(resourceName, partitionMap);
+    }
+    Map<String, String> instanceStateMap = partitionMap.get(partition);
+    if (instanceStateMap == null)
+    {
+      instanceStateMap = new HashMap<String, String>();
+      partitionMap.put(partition, instanceStateMap);
+    }
+    instanceStateMap.put(instanceName, state);
+  }
+
+  /**
+   * Given (resource, partition, instance), returns the current state,
+   * or null if none is recorded.
+   */
+  public String getCurrentState(String resourceName,
+                                Partition partition,
+                                String instanceName)
+  {
+    Map<Partition, Map<String, String>> map = _currentStateMap.get(resourceName);
+    if (map != null)
+    {
+      Map<String, String> instanceStateMap = map.get(partition);
+      if (instanceStateMap != null)
+      {
+        return instanceStateMap.get(instanceName);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Given (resource, partition, instance), returns the pending state
+   * (the toState of an in-flight transition message), or null if none is recorded.
+   */
+  public String getPendingState(String resourceName,
+                                Partition partition,
+                                String instanceName)
+  {
+    Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName);
+    if (map != null)
+    {
+      Map<String, String> instanceStateMap = map.get(partition);
+      if (instanceStateMap != null)
+      {
+        return instanceStateMap.get(instanceName);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Given (resource, partition), returns the (instance -> currentState) map;
+   * empty if none is recorded.
+   */
+  public Map<String, String> getCurrentStateMap(String resourceName, Partition partition)
+  {
+    if (_currentStateMap.containsKey(resourceName))
+    {
+      Map<Partition, Map<String, String>> map = _currentStateMap.get(resourceName);
+      if (map.containsKey(partition))
+      {
+        return map.get(partition);
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Given (resource, partition), returns the (instance -> toState) map of
+   * pending transitions; empty if none is recorded.
+   */
+  public Map<String, String> getPendingStateMap(String resourceName, Partition partition)
+  {
+    if (_pendingStateMap.containsKey(resourceName))
+    {
+      Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName);
+      if (map.containsKey(partition))
+      {
+        return map.get(partition);
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("current state= ").append(_currentStateMap);
+    sb.append(", pending state= ").append(_pendingStateMap);
+    return sb.toString();
+
+  }
+
+}
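
Usage sketch (illustrative names; assumes Partition's string constructor):
pending state is the toState of an in-flight transition message, while
current state is what a participant has actually reported.

    CurrentStateOutput output = new CurrentStateOutput();
    Partition p = new Partition("MyDB_0");

    output.setCurrentState("MyDB", p, "host1_12918", "SLAVE");
    output.setPendingState("MyDB", p, "host1_12918", "MASTER");

    output.getCurrentState("MyDB", p, "host1_12918");  // "SLAVE"
    output.getPendingState("MyDB", p, "host1_12918");  // "MASTER"
    output.getPendingStateMap("NoSuchDB", p);          // empty map, never null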

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
new file mode 100644
index 0000000..5ae21a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+
+public class ExternalViewComputeStage extends AbstractBaseStage
+{
+  private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+    log.info("START ExternalViewComputeStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+    if (manager == null || resourceMap == null || cache == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires ClusterManager|RESOURCES|DataCache");
+    }
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+
+    List<ExternalView> newExtViews = new ArrayList<ExternalView>();
+    List<PropertyKey> keys = new ArrayList<PropertyKey>();
+
+    // read the existing external views once, instead of once per resource
+    Map<String, ExternalView> curExtViews =
+        dataAccessor.getChildValuesMap(dataAccessor.keyBuilder().externalViews());
+
+    for (String resourceName : resourceMap.keySet())
+    {
+      ExternalView view = new ExternalView(resourceName);
+      view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
+      
+      Resource resource = resourceMap.get(resourceName);
+      for (Partition partition : resource.getPartitions())
+      {
+        Map<String, String> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+        if (!currentStateMap.isEmpty()) // never null; an empty map means no state
+        {
+          // Set<String> disabledInstances
+          // = cache.getDisabledInstancesForResource(resource.toString());
+          for (String instance : currentStateMap.keySet())
+          {
+            // if (!disabledInstances.contains(instance))
+            // {
+            view.setState(partition.getPartitionName(),
+                          instance,
+                          currentStateMap.get(instance));
+            // }
+          }
+        }
+      }
+      // Update cluster status monitor mbean
+      ClusterStatusMonitor clusterStatusMonitor =
+          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+      if (clusterStatusMonitor != null)
+      {
+        clusterStatusMonitor.onExternalViewChange(view,
+                                                  cache.getIdealState(view.getResourceName()));
+      }
+      
+      // compare the new external view with the current one; write only if different
+      ExternalView curExtView = curExtViews.get(resourceName);
+      if (curExtView == null || !curExtView.getRecord().equals(view.getRecord()))
+      {
+        keys.add(manager.getHelixDataAccessor().keyBuilder().externalView(resourceName));
+        newExtViews.add(view);
+        // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view,
+        // resourceName);
+      }
+    }
+
+    if (newExtViews.size() > 0)
+    {
+      dataAccessor.setChildren(keys, newExtViews);
+    }
+    
+    long endTime = System.currentTimeMillis();
+    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+  }
+
+}
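
Ordering sketch (illustrative; assumes the Pipeline helper from the
controller.pipeline package): this stage consumes the CURRENT_STATE
attribute produced by CurrentStateComputationStage, so it must be
registered after that stage; it then writes only the external views that
actually changed.

    Pipeline pipeline = new Pipeline();
    pipeline.addStage(new ResourceComputationStage());
    pipeline.addStage(new CurrentStateComputationStage());
    pipeline.addStage(new ExternalViewComputeStage());
    // pipeline.handle(event) runs the stages in registration order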

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
new file mode 100644
index 0000000..bae7d4d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.AlertStatus;
+import org.apache.helix.model.Alerts;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.PersistentStats;
+
+
+public class HealthDataCache
+{
+  Map<String, LiveInstance> _liveInstanceMap;
+
+  Map<String, Map<String, HealthStat>> _healthStatMap;
+  HealthStat _globalStats; // currently never populated; getGlobalStats() returns null
+  PersistentStats _persistentStats;
+  Alerts _alerts;
+  AlertStatus _alertStatus;
+
+  public boolean refresh(DataAccessor accessor)
+  {
+    _liveInstanceMap = accessor.getChildValuesMap(LiveInstance.class,
+        PropertyType.LIVEINSTANCES);
+
+    Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
+
+    for (String instanceName : _liveInstanceMap.keySet())
+    {
+      // TODO: the znodes for the instance are read here, so capture their
+      // timestamps as well
+      hsMap.put(instanceName, accessor.getChildValuesMap(HealthStat.class,
+          PropertyType.HEALTHREPORT, instanceName));
+    }
+    _healthStatMap = Collections.unmodifiableMap(hsMap);
+    _persistentStats = accessor.getProperty(PersistentStats.class,
+        PropertyType.PERSISTENTSTATS);
+    _alerts = accessor.getProperty(Alerts.class, PropertyType.ALERTS);
+    _alertStatus = accessor.getProperty(AlertStatus.class,
+        PropertyType.ALERT_STATUS);
+
+    return true;
+  }
+
+  public HealthStat getGlobalStats()
+  {
+    return _globalStats;
+  }
+
+  public PersistentStats getPersistentStats()
+  {
+    return _persistentStats;
+  }
+
+  public Alerts getAlerts()
+  {
+    return _alerts;
+  }
+
+  public AlertStatus getAlertStatus()
+  {
+    return _alertStatus;
+  }
+
+  public Map<String, HealthStat> getHealthStats(String instanceName)
+  {
+    Map<String, HealthStat> map = _healthStatMap.get(instanceName);
+    if (map != null)
+    {
+      return map;
+    }
+    else
+    {
+      return Collections.emptyMap();
+    }
+  }
+
+  public Map<String, LiveInstance> getLiveInstances()
+  {
+    return _liveInstanceMap;
+  }
+
+  public boolean refresh(HelixDataAccessor accessor)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+    Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
+
+    for (String instanceName : _liveInstanceMap.keySet())
+    {
+      // TODO: the znodes for the instance are read here, so capture their
+      // timestamps as well
+      Map<String, HealthStat> childValuesMap = accessor
+          .getChildValuesMap(keyBuilder.healthReports(instanceName));
+      hsMap.put(instanceName, childValuesMap);
+    }
+    _healthStatMap = Collections.unmodifiableMap(hsMap);
+    _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
+    _alerts = accessor.getProperty(keyBuilder.alerts());
+    _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
+
+    return true;
+
+  }
+
+}
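
Usage sketch (illustrative; assumes a HelixDataAccessor for a running
cluster): refresh() takes a point-in-time snapshot of live instances and
their health-report znodes.

    HealthDataCache healthCache = new HealthDataCache();
    healthCache.refresh(accessor);

    for (String instance : healthCache.getLiveInstances().keySet())
    {
      // per-report stats for this instance; empty map when none reported
      Map<String, HealthStat> stats = healthCache.getHealthStats(instance);
    }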

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
new file mode 100644
index 0000000..7002d29
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+
+
+public class MessageGenerationOutput
+{
+
+  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+
+  public MessageGenerationOutput()
+  {
+    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
+
+  }
+
+  public void addMessage(String resourceName, Partition resource,
+      Message message)
+  {
+    if (!_messagesMap.containsKey(resourceName))
+    {
+      _messagesMap.put(resourceName,
+          new HashMap<Partition, List<Message>>());
+    }
+    if (!_messagesMap.get(resourceName).containsKey(resource))
+    {
+      _messagesMap.get(resourceName).put(resource,
+          new ArrayList<Message>());
+    }
+    _messagesMap.get(resourceName).get(resource).add(message);
+  }
+
+  public List<Message> getMessages(String resourceName,
+      Partition resource)
+  {
+    // return an empty list (never null) when no messages were recorded for
+    // the resource or the partition
+    Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
+    if (map != null && map.get(resource) != null)
+    {
+      return map.get(resource);
+    }
+    return Collections.emptyList();
+  }
+  
+  @Override
+  public String toString()
+  {
+    return _messagesMap.toString();
+  }
+}
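
Usage sketch (illustrative names): messages are grouped per
(resource, partition); getMessages() returns an empty list when nothing
was recorded.

    MessageGenerationOutput output = new MessageGenerationOutput();
    Partition p = new Partition("MyDB_0");
    output.addMessage("MyDB", p, message);  // message: a state-transition Message

    List<Message> msgs = output.getMessages("MyDB", p);     // [message]
    List<Message> none = output.getMessages("OtherDB", p);  // empty list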