You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[8/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
new file mode 100644
index 0000000..af0d95e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
@@ -0,0 +1,262 @@
+package org.apache.helix.controller.restlet;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.restlet.Component;
+import org.restlet.Context;
+import org.restlet.data.Protocol;
+
+/**
+ * Controller side restlet server that receives ZNRecordUpdate requests from
+ * clients, batches the ZNRecordUpdates and applies them to zookeeper. This is
+ * to optimize the concurrency level of zookeeper access for ZNRecord updates
+ * that do not need to be real-time, like message handling status updates and
+ * healthcheck reports.
+ *
+ * As one server will be used by multiple helix controllers that run on the same machine,
+ * this class is designed as a singleton. The application is responsible for calling init()
+ * and shutdown() on the instance returned by getInstance().
+ * */
+public class ZKPropertyTransferServer
+{
+ public static final String PORT = "port";
+ public static String RESTRESOURCENAME = "ZNRecordUpdates";
+ public static final String SERVER = "ZKPropertyTransferServer";
+
+ // Period, in milliseconds, at which the buffered ZNRecord updates are batch-written to zookeeper
+ public static int PERIOD = 10 * 1000;
+ // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
+ public static int MAX_UPDATE_LIMIT = 10000;
+ private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
+
+ int _localWebservicePort;
+ String _webserviceUrl;
+ ZkBaseDataAccessor<ZNRecord> _accessor;
+ String _zkAddress;
+
+ AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
+ = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
+
+ boolean _initialized = false;
+ boolean _shutdownFlag = false;
+ Component _component = null;
+ Timer _timer = null;
+
+ /**
+  * Timer task that flushes the buffered updates to zookeeper by calling
+  * sendData(). Whatever sendData() throws is logged and swallowed here.
+  */
+ class ZKPropertyTransferTask extends TimerTask
+ {
+   @Override
+   public void run()
+   {
+     try
+     {
+       sendData();
+     }
+     catch (Throwable failure)
+     {
+       LOG.error("", failure);
+     }
+   }
+ }
+
+ /**
+  * Swaps the current update buffer for an empty one and writes every update
+  * that was buffered so far to zookeeper with one batched updateChildren call.
+  */
+ void sendData()
+ {
+   LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
+   ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
+
+   // Swap under the same monitor enqueueData() merges under, so a merge never
+   // lands in a buffer that has already been taken for sending.
+   synchronized(_dataBufferRef)
+   {
+     updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+   }
+
+   if(updateCache == null)
+   {
+     LOG.warn("null _dataQueueRef. Should be in the beginning only");
+     return;
+   }
+
+   // paths and updaters are parallel lists: entry i of one belongs to entry i of the other
+   List<String> paths = new ArrayList<String>(updateCache.size());
+   List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>(updateCache.size());
+   // BUGBUG : what if the instance is dropped?
+   for(ZNRecordUpdate holder : updateCache.values())
+   {
+     paths.add(holder.getPath());
+     updaters.add(holder.getZNRecordUpdater());
+   }
+
+   // Batch write the accumulated updates into zookeeper
+   long timeStart = System.currentTimeMillis();
+   if(!paths.isEmpty())
+   {
+     _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+   }
+   LOG.info("ZKPropertyTransferServer updated " + paths.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
+ }
+
+ // The one instance of this class, created when the class is initialized
+ static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
+
+ /**
+  * Private so that the only instance is the one held in _instance.
+  * Installs an empty update buffer.
+  */
+ private ZKPropertyTransferServer()
+ {
+   _dataBufferRef.set(new ConcurrentHashMap<String, ZNRecordUpdate>());
+ }
+
+ /**
+  * @return the singleton server instance
+  */
+ public static ZKPropertyTransferServer getInstance()
+ {
+   return _instance;
+ }
+
+ /**
+  * @return the value of the initialized flag, which startServer() sets once the
+  *         restlet component has started and reset() clears
+  */
+ public boolean isInitialized()
+ {
+   return _initialized;
+ }
+
+ /**
+  * Creates the zookeeper accessor and starts the restlet server. If the server
+  * is already initialized, or has been shut down and not reset, nothing is
+  * started and an error is logged instead.
+  *
+  * @param localWebservicePort port the HTTP server is added on
+  * @param zkAddress address handed to the ZkClient constructor
+  */
+ public void init(int localWebservicePort, String zkAddress)
+ {
+   if(_initialized || _shutdownFlag)
+   {
+     LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
+     return;
+   }
+
+   // Regular start-up progress, not a failure: logged at INFO rather than ERROR
+   LOG.info("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
+   _localWebservicePort = localWebservicePort;
+   ZkClient zkClient = new ZkClient(zkAddress);
+   zkClient.setZkSerializer(new ZNRecordSerializer());
+   _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+   _zkAddress = zkAddress;
+   startServer();
+ }
+
+ /**
+  * @return the URL of the update resource, or null when the server is not
+  *         initialized or has been shut down
+  */
+ public String getWebserviceUrl()
+ {
+   if(_initialized && !_shutdownFlag)
+   {
+     return _webserviceUrl;
+   }
+   LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
+   return null;
+ }
+
+ /**
+  * Adds a ZNRecordUpdate to the update buffer. Called by the webservice
+  * front-end. The update is dropped, with an error logged, when the server is
+  * not initialized or has been shut down.
+  *
+  * @param e the update to buffer; its record is tagged with this server's url
+  */
+ void enqueueData(ZNRecordUpdate e)
+ {
+   if(!_initialized || _shutdownFlag)
+   {
+     LOG.error("zkDataTransferServer inited:" + _initialized
+         + " shutdownFlag:"+_shutdownFlag+" , return");
+     return;
+   }
+
+   synchronized(_dataBufferRef)
+   {
+     e.getRecord().setSimpleField(SERVER, _webserviceUrl);
+     ConcurrentHashMap<String, ZNRecordUpdate> buffer = _dataBufferRef.get();
+     ZNRecordUpdate pending = buffer.get(e.getPath());
+     if(pending == null)
+     {
+       // First update seen for this path since the last flush
+       buffer.put(e.getPath(), e);
+     }
+     else
+     {
+       // Several updates on the same path: merge locally into the buffered one
+       ZNRecord current = pending.getRecord();
+       ZNRecord merged = e.getZNRecordUpdater().update(current);
+       pending._record = merged;
+     }
+   }
+
+   // Flush right away once the buffer has grown past the limit
+   if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
+   {
+     sendData();
+   }
+ }
+
+ /**
+  * Builds the restlet component with one HTTP server on the configured port,
+  * attaches the transfer application, schedules the periodic flush task and
+  * starts the component. The initialized flag is set only when the component
+  * started; on failure the flush timer is cancelled again and the error logged.
+  */
+ void startServer()
+ {
+   LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+
+   _component = new Component();
+
+   _component.getServers().add(Protocol.HTTP, _localWebservicePort);
+   Context applicationContext = _component.getContext().createChildContext();
+   applicationContext.getAttributes().put(SERVER, this);
+   applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
+   ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
+       applicationContext);
+   // Attach the application to the component and start it
+   _component.getDefaultHost().attach(application);
+   _timer = new Timer(true);
+   _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
+
+   try
+   {
+     _webserviceUrl
+         = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort
+         + "/" + RESTRESOURCENAME;
+     _component.start();
+     _initialized = true;
+   }
+   catch (Exception e)
+   {
+     LOG.error("", e);
+     // The server is not initialized, so enqueueData() rejects every update:
+     // there is nothing for the flush task to send, stop it.
+     _timer.cancel();
+     return;
+   }
+   LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+ }
+
+ /**
+  * Cancels the flush timer, stops the restlet component and sets the shutdown
+  * flag. A second call only logs an error.
+  */
+ public void shutdown()
+ {
+   if(_shutdownFlag)
+   {
+     LOG.error("ZKPropertyTransferServer already has been shutdown...");
+     return;
+   }
+
+   LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
+
+   // Both members are null when the server was never started
+   if(_timer != null)
+   {
+     _timer.cancel();
+   }
+   if(_component != null)
+   {
+     try
+     {
+       _component.stop();
+     }
+     catch (Exception stopFailure)
+     {
+       LOG.error("", stopFailure);
+     }
+   }
+
+   _shutdownFlag = true;
+ }
+
+ /**
+  * Returns a server that has been shut down to its pre-init state: flags
+  * cleared, component and timer dropped, update buffer replaced by an empty
+  * one. Does nothing when the shutdown flag is not set.
+  */
+ public void reset()
+ {
+   if(!_shutdownFlag)
+   {
+     return;
+   }
+   _shutdownFlag = false;
+   _initialized = false;
+   _component = null;
+   _timer = null;
+   _dataBufferRef.set(new ConcurrentHashMap<String, ZNRecordUpdate>());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
new file mode 100644
index 0000000..ee18587
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
@@ -0,0 +1,77 @@
+package org.apache.helix.controller.restlet;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordUpdater;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Unit of transferred ZNRecord updates: the zookeeper path the update is stored
+ * under, the ZNRecord value, and the opcode that selects how the value is
+ * applied. ZNRecord subtraction is not supported.
+ */
+public class ZNRecordUpdate
+{
+  public enum OpCode
+  {
+    // TODO: create is not supported; but update will create if not exist
+    CREATE,
+    UPDATE,
+    SET
+  }
+
+  final String _path;
+  // Not final: the transfer client and server overwrite it when they merge updates
+  ZNRecord _record;
+  final OpCode _code;
+
+  /**
+   * @param path zookeeper path of the record, json property "path"
+   * @param code how the record is applied, json property "opcode"
+   * @param record the record value, json property "record"
+   */
+  @JsonCreator
+  public ZNRecordUpdate(@JsonProperty("path")String path,
+                        @JsonProperty("opcode")OpCode code,
+                        @JsonProperty("record")ZNRecord record)
+  {
+    _path = path;
+    _record = record;
+    _code = code;
+  }
+
+  public String getPath()
+  {
+    return _path;
+  }
+
+  public ZNRecord getRecord()
+  {
+    return _record;
+  }
+
+  public OpCode getOpcode()
+  {
+    return _code;
+  }
+
+  /**
+   * Builds the updater matching the opcode of this update.
+   *
+   * @return for UPDATE a plain ZNRecordUpdater built from the record; for SET
+   *         an updater whose update() ignores the current value
+   * @throws UnsupportedOperationException for any other opcode
+   */
+  @JsonIgnore(true)
+  public DataUpdater<ZNRecord> getZNRecordUpdater()
+  {
+    if (_code == OpCode.UPDATE)
+    {
+      return new ZNRecordUpdater(_record);
+    }
+    if (_code == OpCode.SET)
+    {
+      return new ZNRecordUpdater(_record)
+      {
+        @Override
+        public ZNRecord update(ZNRecord current)
+        {
+          return _record;
+        }
+      };
+    }
+    throw new UnsupportedOperationException("Not supported : " + _code);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
new file mode 100644
index 0000000..61aa0a9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
@@ -0,0 +1,80 @@
+package org.apache.helix.controller.restlet;
+
+import java.io.StringReader;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Status;
+import org.restlet.resource.Representation;
+import org.restlet.resource.Resource;
+
+/**
+ * REST resource of the ZkPropertyTransfer server. It accepts PUT requests only;
+ * the request entity is a form whose UPDATEKEY field holds a json map from
+ * zkPath to ZNRecordUpdate.
+ */
+public class ZNRecordUpdateResource extends Resource
+{
+  public static final String UPDATEKEY = "ZNRecordUpdate";
+  private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
+
+  @Override
+  public boolean allowGet()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean allowPost()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean allowPut()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean allowDelete()
+  {
+    return false;
+  }
+
+  /**
+   * Handles a PUT: parses the submitted updates and enqueues each of them on
+   * the ZKPropertyTransferServer singleton. Responds 200 on success, 400 when
+   * the UPDATEKEY form field is missing, and 500 on any other failure.
+   *
+   * @param entity the request entity, read as a form
+   */
+  @Override
+  public void storeRepresentation(Representation entity)
+  {
+    try
+    {
+      ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
+
+      Form form = new Form(entity);
+      String jsonPayload = form.getFirstValue(UPDATEKEY, true);
+      if(jsonPayload == null)
+      {
+        // A request without the field is the sender's mistake; without this
+        // check the StringReader below fails on null and the client gets a 500.
+        LOG.error("Missing form field " + UPDATEKEY + " in request");
+        getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
+        return;
+      }
+
+      // Parse the map from zkPath --> ZNRecordUpdate from the payload
+      StringReader sr = new StringReader(jsonPayload);
+      ObjectMapper mapper = new ObjectMapper();
+      TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
+          new TypeReference<TreeMap<String, ZNRecordUpdate>>()
+          {
+          };
+      Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
+      // Enqueue the ZNRecordUpdate for sending
+      for(ZNRecordUpdate holder : holderMap.values())
+      {
+        server.enqueueData(holder);
+        LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
+      }
+      getResponse().setStatus(Status.SUCCESS_OK);
+    }
+    catch(Exception e)
+    {
+      LOG.error("", e);
+      getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
new file mode 100644
index 0000000..293011b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
@@ -0,0 +1,30 @@
+package org.apache.helix.controller.restlet;
+
+import org.restlet.Application;
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.Router;
+
+/**
+ * Restlet application of the ZkPropertyTransfer server. Its root router maps
+ * "/" + ZKPropertyTransferServer.RESTRESOURCENAME to ZNRecordUpdateResource.
+ */
+public class ZkPropertyTransferApplication extends Application
+{
+  public ZkPropertyTransferApplication()
+  {
+    super();
+  }
+
+  /**
+   * @param context the restlet context handed to the Application base class
+   */
+  public ZkPropertyTransferApplication(Context context)
+  {
+    super(context);
+  }
+
+  /**
+   * @return a router with the update resource attached
+   */
+  @Override
+  public Restlet createRoot()
+  {
+    String updatePath = "/" + ZKPropertyTransferServer.RESTRESOURCENAME;
+    Router root = new Router(getContext());
+    root.attach(updatePath, ZNRecordUpdateResource.class);
+    return root;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
new file mode 100644
index 0000000..83d355f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.restlet;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.restlet.Client;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Protocol;
+import org.restlet.data.Reference;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.data.Status;
+
+public class ZkPropertyTransferClient
+{
+ private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
+ public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
+ public static int SEND_PERIOD = 10 * 1000;
+
+ public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
+
+ int _maxConcurrentTasks;
+ ExecutorService _executorService;
+ Client[] _clients;
+ AtomicInteger _requestCount = new AtomicInteger(0);
+
+ // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
+ AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
+ = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
+ Timer _timer;
+ volatile String _webServiceUrl = "";
+
+ /**
+  * Creates the sender thread pool, one HTTP client per concurrent task, an
+  * empty update buffer, and the timer that sends the buffer every SEND_PERIOD
+  * milliseconds.
+  *
+  * @param maxConcurrentTasks size of the thread pool and of the client array
+  */
+ public ZkPropertyTransferClient(int maxConcurrentTasks)
+ {
+   _maxConcurrentTasks = maxConcurrentTasks;
+   _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
+   _clients = new Client[_maxConcurrentTasks];
+   for(int i = 0; i< _clients.length; i++)
+   {
+     _clients[i] = new Client(Protocol.HTTP);
+   }
+   // Install the buffer before the timer exists: the timer task dereferences
+   // the buffer, so it must never be able to run against a null reference.
+   _dataBufferRef.set(new ConcurrentHashMap<String, ZNRecordUpdate>());
+   _timer = new Timer(true);
+   _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
+ }
+
+ /**
+  * Timer task that hands the buffered updates to sendUpdateBatch().
+  */
+ class SendZNRecordTimerTask extends TimerTask
+ {
+   @Override
+   public void run()
+   {
+     try
+     {
+       sendUpdateBatch();
+     }
+     catch(Throwable t)
+     {
+       // An exception escaping a TimerTask terminates the Timer thread and with
+       // it every later send; log and keep the timer alive, as the server-side
+       // ZKPropertyTransferTask does.
+       LOG.error("", t);
+     }
+   }
+ }
+
+ /**
+  * Buffers an update for the next batch send and remembers the url to send to.
+  * An update for a path that is already buffered is merged into the buffered
+  * one. Exceptions are logged, not propagated.
+  *
+  * @param update the update to buffer; its record is tagged with USE_PROPERTYTRANSFER
+  * @param webserviceUrl url later batches are sent to
+  */
+ public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl)
+ {
+   try
+   {
+     LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to " + webserviceUrl);
+     _webServiceUrl = webserviceUrl;
+     update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
+     synchronized(_dataBufferRef)
+     {
+       ConcurrentHashMap<String, ZNRecordUpdate> buffer = _dataBufferRef.get();
+       ZNRecordUpdate pending = buffer.get(update.getPath());
+       if(pending == null)
+       {
+         buffer.put(update.getPath(), update);
+       }
+       else
+       {
+         // Same path already buffered: fold the new update into it
+         ZNRecord current = pending.getRecord();
+         ZNRecord merged = update.getZNRecordUpdater().update(current);
+         pending._record = merged;
+       }
+     }
+   }
+   catch(Exception failure)
+   {
+     LOG.error("", failure);
+   }
+ }
+
+ /**
+  * Takes the buffered updates, leaving an empty buffer behind, and submits
+  * them to the executor as one upload task. The HTTP clients are used in
+  * round-robin order. Nothing is submitted when the buffer was empty.
+  */
+ void sendUpdateBatch()
+ {
+   LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to " + _webServiceUrl);
+   Map<String, ZNRecordUpdate> updateCache = null;
+
+   synchronized(_dataBufferRef)
+   {
+     updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
+   }
+
+   if(updateCache == null || updateCache.size() == 0)
+   {
+     return;
+   }
+
+   Client client = _clients[_requestCount.intValue() % _maxConcurrentTasks];
+   ZNRecordUpdateUploadTask task = new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl, client);
+   _requestCount.incrementAndGet();
+   _executorService.submit(task);
+   LOG.trace("Queue size :" + ((ThreadPoolExecutor)_executorService).getQueue().size());
+ }
+
+ /**
+  * Shuts down the executor, cancels the send timer and stops every HTTP
+  * client. A client that fails to stop is logged and the remaining ones are
+  * still stopped.
+  */
+ public void shutdown()
+ {
+   LOG.info("Shutting down ZkPropertyTransferClient");
+   _executorService.shutdown();
+   _timer.cancel();
+   for(int i = 0; i < _clients.length; i++)
+   {
+     try
+     {
+       _clients[i].stop();
+     }
+     catch (Exception stopFailure)
+     {
+       LOG.error("", stopFailure);
+     }
+   }
+ }
+
+ /**
+  * Uploads one batch of updates to the transfer server with a single HTTP PUT.
+  */
+ class ZNRecordUpdateUploadTask implements Callable<Void>
+ {
+   Map<String, ZNRecordUpdate> _updateMap;
+   String _webServiceUrl;
+   Client _client;
+
+   /**
+    * @param update the batch to upload, keyed by zkPath
+    * @param webserviceUrl url the PUT request is sent to
+    * @param client HTTP client that performs the request
+    */
+   ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client)
+   {
+     _updateMap = update;
+     _webServiceUrl = webserviceUrl;
+     _client = client;
+   }
+
+   /**
+    * Serializes the batch to json and PUTs it as the UPDATEKEY form field. A
+    * batch that cannot be serialized is logged and not sent; a response other
+    * than 200 is logged.
+    *
+    * @return always null
+    */
+   @Override
+   public Void call() throws Exception
+   {
+     LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
+     long time = System.currentTimeMillis();
+     Reference resourceRef = new Reference(_webServiceUrl);
+     Request request = new Request(Method.PUT, resourceRef);
+
+     ObjectMapper mapper = new ObjectMapper();
+     StringWriter sw = new StringWriter();
+     try
+     {
+       mapper.writeValue(sw, _updateMap);
+     }
+     catch (Exception e)
+     {
+       LOG.error("",e);
+       // The writer holds at most a truncated document at this point; sending
+       // it would only make the server fail to parse it.
+       return null;
+     }
+
+     // NOTE(review): the json is appended to the form key without URL encoding,
+     // while the server reads it back through a Form - confirm that payloads
+     // containing '&', '+' or '%' survive the round trip.
+     request.setEntity(
+         ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
+     // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
+     Response response = _client.handle(request);
+
+     if(response.getStatus().getCode() != Status.SUCCESS_OK.getCode())
+     {
+       LOG.error("Status : " + response.getStatus());
+     }
+     LOG.info("Using time : " + (System.currentTimeMillis() - time));
+     return null;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
new file mode 100644
index 0000000..35e5f01
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Restlet server for Helix controller
+ *
+ */
+package org.apache.helix.controller.restlet;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
new file mode 100644
index 0000000..be44ba9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+/**
+ * Names of attributes stored on a ClusterEvent. Stages such as
+ * BestPossibleStateCalcStage pass the toString() value of a constant as the key
+ * to event.getAttribute() and event.addAttribute().
+ */
+public enum AttributeName
+{
+ RESOURCES,
+ BEST_POSSIBLE_STATE,
+ CURRENT_STATE,
+ MESSAGES_ALL,
+ MESSAGES_SELECTED,
+ MESSAGES_THROTTLE,
+ LOCAL_STATE
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
new file mode 100644
index 0000000..8ba4a19
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -0,0 +1,532 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.log4j.Logger;
+
+
+/**
+ * For partition compute best possible (instance,state) pair based on
+ * IdealState,StateModel,LiveInstance
+ *
+ * @author kgopalak
+ *
+ */
+// TODO: refactor this
+public class BestPossibleStateCalcStage extends AbstractBaseStage
+{
+ private static final Logger logger =
+ Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+
+ /**
+  * Reads the CURRENT_STATE, RESOURCES and ClusterDataCache attributes from the
+  * event, computes the best possible state and stores it on the event under
+  * BEST_POSSIBLE_STATE.
+  *
+  * @param event the cluster event carrying the input attributes
+  * @throws StageException when one of the three input attributes is missing
+  */
+ @Override
+ public void process(ClusterEvent event) throws Exception
+ {
+   final long startTime = System.currentTimeMillis();
+   logger.info("START BestPossibleStateCalcStage.process()");
+
+   CurrentStateOutput currentStateOutput =
+       event.getAttribute(AttributeName.CURRENT_STATE.toString());
+   Map<String, Resource> resourceMap =
+       event.getAttribute(AttributeName.RESOURCES.toString());
+   ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+   boolean inputsPresent =
+       currentStateOutput != null && resourceMap != null && cache != null;
+   if (!inputsPresent)
+   {
+     throw new StageException("Missing attributes in event:" + event
+         + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+   }
+
+   BestPossibleStateOutput bestPossibleStateOutput =
+       compute(event, resourceMap, currentStateOutput);
+   event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(),
+                      bestPossibleStateOutput);
+
+   long elapsed = System.currentTimeMillis() - startTime;
+   logger.info("END BestPossibleStateCalcStage.process(). took: "
+       + elapsed + " ms");
+ }
+
+ /**
+  * Computes the best possible instance-to-state map for every partition of
+  * every resource in resourceMap.
+  *
+  * For each resource the ideal state is read from the cache; when it is gone an
+  * empty ideal state is used and the state model name is taken from the current
+  * state. In AUTO_REBALANCE mode the list fields of the ideal state are
+  * recalculated first. CUSTOMIZED resources are then resolved from the ideal
+  * instance-state map, all other modes from the preference list.
+  *
+  * @param event the cluster event; its "ClusterDataCache" attribute is read
+  * @param resourceMap resources to process, keyed by resource name
+  * @param currentStateOutput current states of the cluster
+  * @return the best possible state of every processed partition
+  */
+ private BestPossibleStateOutput compute(ClusterEvent event,
+                                         Map<String, Resource> resourceMap,
+                                         CurrentStateOutput currentStateOutput)
+ {
+   ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+   BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+   for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet())
+   {
+     String resourceName = resourceEntry.getKey();
+     Resource resource = resourceEntry.getValue();
+     logger.debug("Processing resource:" + resourceName);
+
+     // Ideal state may be gone. In that case we need to get the state model name
+     // from the current state
+     IdealState idealState = cache.getIdealState(resourceName);
+
+     String stateModelDefName;
+
+     if (idealState == null)
+     {
+       // if ideal state is deleted, use an empty one
+       logger.info("resource:" + resourceName + " does not exist anymore");
+       stateModelDefName = currentStateOutput.getResourceStateModelDef(resourceName);
+       idealState = new IdealState(resourceName);
+     }
+     else
+     {
+       stateModelDefName = idealState.getStateModelDefRef();
+     }
+
+     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+     if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
+     {
+       calculateAutoBalancedIdealState(cache,
+                                       idealState,
+                                       stateModelDef,
+                                       currentStateOutput);
+     }
+
+     for (Partition partition : resource.getPartitions())
+     {
+       Map<String, String> currentStateMap =
+           currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+       Map<String, String> bestStateForPartition;
+       Set<String> disabledInstancesForPartition =
+           cache.getDisabledInstancesForPartition(partition.toString());
+
+       if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+       {
+         Map<String, String> idealStateMap =
+             idealState.getInstanceStateMap(partition.getPartitionName());
+         bestStateForPartition =
+             computeCustomizedBestStateForPartition(cache,
+                                                    stateModelDef,
+                                                    idealStateMap,
+                                                    currentStateMap,
+                                                    disabledInstancesForPartition);
+       }
+       else
+       // both AUTO and AUTO_REBALANCE mode
+       {
+         List<String> instancePreferenceList =
+             getPreferenceList(cache, partition, idealState, stateModelDef);
+
+         bestStateForPartition =
+             computeAutoBestStateForPartition(cache,
+                                              stateModelDef,
+                                              instancePreferenceList,
+                                              currentStateMap,
+                                              disabledInstancesForPartition);
+       }
+       output.setState(resourceName, partition, bestStateForPartition);
+     }
+   }
+   return output;
+ }
+
+ /**
+  * Recalculates the list fields (preference lists) of an ideal state that is in
+  * AUTO_REBALANCE mode. The algorithm makes sure that the master partitions are
+  * evenly distributed; also, when instances are added / removed, the amount of
+  * change in master partitions is minimized.
+  *
+  * The list fields are first reset to an empty list per partition, and stay
+  * that way when there is no live instance.
+  *
+  * @param cache source of the live instances and their current states
+  * @param idealState ideal state whose list fields are rewritten in place
+  * @param stateModelDef state model; the first entry of its states priority
+  *          list is the state treated as "master"
+  * @param currentStateOutput not used by this method
+  */
+ private void calculateAutoBalancedIdealState(ClusterDataCache cache,
+                                              IdealState idealState,
+                                              StateModelDefinition stateModelDef,
+                                              CurrentStateOutput currentStateOutput)
+ {
+   String topStateValue = stateModelDef.getStatesPriorityList().get(0);
+   Set<String> liveInstances = cache._liveInstanceMap.keySet();
+   // Obtain replica number; fall back to 1 when it cannot be parsed
+   int replicas = 1;
+   try
+   {
+     replicas = Integer.parseInt(idealState.getReplicas());
+   }
+   catch (Exception e)
+   {
+     logger.error("", e);
+   }
+   // Init for all partitions with empty list. Each partition gets a list of its
+   // own: one shared instance would make a later change to any partition's
+   // list show up under every partition.
+   Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
+   for (String partition : idealState.getPartitionSet())
+   {
+     defaultListFields.put(partition, new ArrayList<String>(0));
+   }
+   idealState.getRecord().setListFields(defaultListFields);
+   // Return if no live instance
+   if (liveInstances.isEmpty())
+   {
+     logger.info("No live instances, return. Idealstate : "
+         + idealState.getResourceName());
+     return;
+   }
+   Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
+   for (String instanceName : liveInstances)
+   {
+     masterAssignmentMap.put(instanceName, new ArrayList<String>());
+   }
+   // Every partition starts out orphaned and is taken off the set once a live
+   // instance is found that currently holds it in the top state
+   Set<String> orphanedPartitions = new HashSet<String>(idealState.getPartitionSet());
+   // Go through all current states and fill the assignments
+   for (String liveInstanceName : liveInstances)
+   {
+     CurrentState currentState =
+         cache.getCurrentState(liveInstanceName,
+                               cache.getLiveInstances()
+                                    .get(liveInstanceName)
+                                    .getSessionId()).get(idealState.getId());
+     if (currentState == null)
+     {
+       continue;
+     }
+     for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
+                                                                 .entrySet())
+     {
+       // constant-first comparison: a null state simply does not match
+       if (topStateValue.equals(partitionState.getValue()))
+       {
+         masterAssignmentMap.get(liveInstanceName).add(partitionState.getKey());
+         orphanedPartitions.remove(partitionState.getKey());
+       }
+     }
+   }
+   List<String> orphanedPartitionsList = new ArrayList<String>(orphanedPartitions);
+   normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList);
+   idealState.getRecord()
+             .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
+                                                                  replicas));
+ }
+
+ /**
+  * Given the current master assignment map and the partitions not hosted,
+  * rearranges the map in place into an evenly distributed assignment: with the
+  * instances in sorted name order, the first (total % instances) instances end
+  * up with one partition more than the others.
+  *
+  * Partitions taken away from over-loaded instances are appended to
+  * orphanPartitions, and orphanPartitions is drained while under-loaded
+  * instances are filled. When the map is empty nothing is changed.
+  *
+  * @param masterAssignmentMap
+  *          current master assignment map, instance name to partitions; modified
+  * @param orphanPartitions
+  *          partitions not hosted by any instance; modified
+  */
+ private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
+                                     List<String> orphanPartitions)
+ {
+   if (masterAssignmentMap.isEmpty())
+   {
+     // No instance to assign to; also keeps the divisions below away from zero
+     if (orphanPartitions.size() > 0)
+     {
+       logger.error("orphanPartitions still contains elements");
+     }
+     return;
+   }
+
+   String[] instanceNames =
+       masterAssignmentMap.keySet().toArray(new String[masterAssignmentMap.size()]);
+   Arrays.sort(instanceNames);
+
+   // Find out total partition number
+   int totalPartitions = orphanPartitions.size();
+   for (List<String> assigned : masterAssignmentMap.values())
+   {
+     totalPartitions += assigned.size();
+     Collections.sort(assigned);
+   }
+
+   // Find out how many partitions an instance should host
+   int partitionNumber = totalPartitions / instanceNames.length;
+   int remainder = totalPartitions % instanceNames.length;
+
+   // For hosts that have more partitions, move those partitions to "orphaned"
+   for (int i = 0; i < instanceNames.length; i++)
+   {
+     int targetPartitionNo = i < remainder ? (partitionNumber + 1) : partitionNumber;
+     List<String> assigned = masterAssignmentMap.get(instanceNames[i]);
+     while (assigned.size() > targetPartitionNo)
+     {
+       orphanPartitions.add(assigned.remove(assigned.size() - 1));
+     }
+   }
+
+   Collections.sort(orphanPartitions);
+   // Assign "orphaned" partitions to hosts that do not have enough partitions
+   for (int i = 0; i < instanceNames.length; i++)
+   {
+     int targetPartitionNo = i < remainder ? (partitionNumber + 1) : partitionNumber;
+     List<String> assigned = masterAssignmentMap.get(instanceNames[i]);
+     while (assigned.size() < targetPartitionNo)
+     {
+       assigned.add(orphanPartitions.remove(orphanPartitions.size() - 1));
+     }
+   }
+
+   if (orphanPartitions.size() > 0)
+   {
+     logger.error("orphanPartitions still contains elements");
+   }
+ }
+
+ /**
+  * Builds the full preference list of every partition from the master assignment map.
+  * The first entry of each list is the instance mastering the partition; the following
+  * entries are slaves picked round-robin from the other instances, so that the slaves
+  * of the partitions mastered on one host are spread over the remaining hosts.
+  *
+  * @param masterAssignmentMap
+  *          current master assignment map (instance name -> partitions mastered there)
+  * @param replicas
+  *          number of replicas per partition, master included; at most
+  *          {@code replicas - 1} slaves are added, capped by the number of other instances
+  * @return partition name -> ordered instance list (master first)
+  */
+ Map<String, List<String>> generateListFieldFromMasterAssignment(Map<String, List<String>> masterAssignmentMap,
+                                                                 int replicas)
+ {
+   Map<String, List<String>> listFields = new HashMap<String, List<String>>();
+   int slaves = replicas - 1;
+
+   String[] instanceNames = new String[masterAssignmentMap.size()];
+   masterAssignmentMap.keySet().toArray(instanceNames);
+   Arrays.sort(instanceNames);
+   int instanceCount = instanceNames.length;
+
+   for (int i = 0; i < instanceCount; i++)
+   {
+     String master = instanceNames[i];
+
+     // All other instances, in ring order starting right after the master
+     List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
+     for (int offset = 1; offset < instanceCount; offset++)
+     {
+       otherInstances.add(instanceNames[(i + offset) % instanceCount]);
+     }
+     int slavesCanAssign = Math.min(slaves, otherInstances.size());
+
+     List<String> partitionList = masterAssignmentMap.get(master);
+     for (int j = 0; j < partitionList.size(); j++)
+     {
+       List<String> preferenceList = new ArrayList<String>();
+       preferenceList.add(master);
+
+       // Shift the starting slave by the partition index to spread the slave load
+       for (int k = 0; k < slavesCanAssign; k++)
+       {
+         preferenceList.add(otherInstances.get((j + k + 1) % otherInstances.size()));
+       }
+       listFields.put(partitionList.get(j), preferenceList);
+     }
+   }
+   return listFields;
+ }
+
+ /**
+  * Computes the best possible instance->state mapping of one partition for a resource
+  * in AUTO ideal state mode.
+  *
+  * Instances that currently hold the partition but are no longer in the preference list
+  * are sent to DROPPED, and disabled instances are sent to the state model's initial
+  * state; instances currently in ERROR are left untouched in both cases. The states are
+  * then handed out in priority order to the live, enabled, non-ERROR instances of the
+  * preference list, each instance receiving at most one state.
+  *
+  * @param cache
+  *          cluster data cache, used for the set of live instances
+  * @param stateModelDef
+  *          state model definition providing state priorities and per-state counts
+  * @param instancePreferenceList
+  *          ordered instance preference list; null when the ideal state is deleted
+  * @param currentStateMap
+  *          : instance->state for each partition, may be null
+  * @param disabledInstancesForPartition
+  *          instances that must not serve this partition
+  * @return instance->state mapping for the partition
+  */
+ private Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
+                                                              StateModelDefinition stateModelDef,
+                                                              List<String> instancePreferenceList,
+                                                              Map<String, String> currentStateMap,
+                                                              Set<String> disabledInstancesForPartition)
+ {
+   Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+   // if the ideal state is deleted, instancePreferenceList will be null and
+   // we should drop all resources.
+   if (currentStateMap != null)
+   {
+     for (String instance : currentStateMap.keySet())
+     {
+       if ("ERROR".equals(currentStateMap.get(instance)))
+       {
+         // instances in ERROR state are neither dropped nor reset here
+         continue;
+       }
+
+       if (instancePreferenceList == null || !instancePreferenceList.contains(instance))
+       {
+         // move to DROPPED state only if not in ERROR state
+         instanceStateMap.put(instance, "DROPPED");
+       }
+       else if (disabledInstancesForPartition.contains(instance))
+       {
+         // if a non-error node is disabled, put it into initial state (OFFLINE)
+         instanceStateMap.put(instance, stateModelDef.getInitialState());
+       }
+     }
+   }
+
+   // ideal state is deleted
+   if (instancePreferenceList == null)
+   {
+     return instanceStateMap;
+   }
+
+   List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+   boolean[] assigned = new boolean[instancePreferenceList.size()];
+
+   Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+
+   for (String state : statesPriorityList)
+   {
+     String num = stateModelDef.getNumInstancesPerState(state);
+     int stateCount = -1;
+     if ("N".equals(num))
+     {
+       // "N": as many as there are live and enabled instances
+       Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
+       liveAndEnabled.removeAll(disabledInstancesForPartition);
+       stateCount = liveAndEnabled.size();
+     }
+     else if ("R".equals(num))
+     {
+       // "R": as many as there are entries in the preference list
+       stateCount = instancePreferenceList.size();
+     }
+     else
+     {
+       try
+       {
+         stateCount = Integer.parseInt(num);
+       }
+       catch (NumberFormatException e)
+       {
+         // also covers a missing (null) count; the state is skipped below
+         logger.error("Invalid count for state:" + state + " ,count=" + num);
+       }
+     }
+
+     // A count of zero means no instance may take this state. The previous exit check
+     // (count == stateCount, evaluated after the first assignment) could never match
+     // zero, so every eligible instance was wrongly assigned the state.
+     if (stateCount <= 0)
+     {
+       continue;
+     }
+
+     int count = 0;
+     for (int i = 0; i < instancePreferenceList.size() && count < stateCount; i++)
+     {
+       String instanceName = instancePreferenceList.get(i);
+
+       boolean notInErrorState =
+           currentStateMap == null
+               || !"ERROR".equals(currentStateMap.get(instanceName));
+
+       if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
+           && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
+       {
+         instanceStateMap.put(instanceName, state);
+         count = count + 1;
+         assigned[i] = true;
+       }
+     }
+   }
+   return instanceStateMap;
+ }
+
+ /**
+ * compute best state for resource in CUSTOMIZED ideal state mode
+ *
+ * @param cache
+ * @param stateModelDef
+ * @param idealStateMap
+ * @param currentStateMap
+ * @param disabledInstancesForPartition
+ * @return
+ */
+ private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
+ StateModelDefinition stateModelDef,
+ Map<String, String> idealStateMap,
+ Map<String, String> currentStateMap,
+ Set<String> disabledInstancesForPartition)
+ {
+ Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+ // if the ideal state is deleted, idealStateMap will be null/empty and
+ // we should drop all resources.
+ if (currentStateMap != null)
+ {
+ for (String instance : currentStateMap.keySet())
+ {
+ if ( (idealStateMap == null || !idealStateMap.containsKey(instance))
+ && !"ERROR".equals(currentStateMap.get(instance)))
+ {
+ // move to DROPPED state only if not in ERROR state
+ instanceStateMap.put(instance, "DROPPED");
+ }
+ else if (!"ERROR".equals(currentStateMap.get(instance))
+ && disabledInstancesForPartition.contains(instance))
+ {
+ // if a non-error node is disabled, put it into initial state (OFFLINE)
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
+ }
+ }
+ }
+
+ // ideal state is deleted
+ if (idealStateMap == null)
+ {
+ return instanceStateMap;
+ }
+
+ Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+ for (String instance : idealStateMap.keySet())
+ {
+ boolean notInErrorState =
+ currentStateMap == null || !"ERROR".equals(currentStateMap.get(instance));
+
+ if (liveInstancesMap.containsKey(instance) && notInErrorState
+ && !disabledInstancesForPartition.contains(instance))
+ {
+ instanceStateMap.put(instance, idealStateMap.get(instance));
+ }
+ }
+
+ return instanceStateMap;
+ }
+
+ /**
+  * Returns the instance preference list of a partition. When the ideal state lists the
+  * single token ANY_LIVEINSTANCE, the sorted names of all live instances are returned
+  * instead; otherwise the list stored in the ideal state is returned unchanged (it may
+  * be null).
+  *
+  * @param cache
+  *          cluster data cache, used for the set of live instances
+  * @param resource
+  *          the partition whose preference list is requested
+  * @param idealState
+  *          ideal state holding the configured preference lists
+  * @param stateModelDef
+  *          not used by this method
+  * @return the effective preference list for the partition
+  */
+ private List<String> getPreferenceList(ClusterDataCache cache,
+                                        Partition resource,
+                                        IdealState idealState,
+                                        StateModelDefinition stateModelDef)
+ {
+   List<String> configured = idealState.getPreferenceList(resource.getPartitionName());
+
+   boolean anyLiveInstance =
+       configured != null && configured.size() == 1
+           && StateModelToken.ANY_LIVEINSTANCE.toString().equals(configured.get(0));
+   if (!anyLiveInstance)
+   {
+     return configured;
+   }
+
+   List<String> liveInstanceNames = new ArrayList<String>(cache.getLiveInstances().keySet());
+   Collections.sort(liveInstanceNames);
+   return liveInstanceNames;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
new file mode 100644
index 0000000..63f6eec
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.Partition;
+
+
+/**
+ * Holds, per resource and partition, an instance->state mapping.
+ */
+public class BestPossibleStateOutput
+{
+  // resource->partition->instance->state
+  Map<String, Map<Partition, Map<String, String>>> _dataMap;
+
+  public BestPossibleStateOutput()
+  {
+    _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+  }
+
+  /**
+   * Stores the instance->state mapping of one partition of a resource, creating the
+   * per-resource map on first use.
+   */
+  public void setState(String resourceName, Partition resource,
+                       Map<String, String> bestInstanceStateMappingForResource)
+  {
+    if (!_dataMap.containsKey(resourceName))
+    {
+      Map<Partition, Map<String, String>> created =
+          new HashMap<Partition, Map<String, String>>();
+      _dataMap.put(resourceName, created);
+    }
+    _dataMap.get(resourceName).put(resource, bestInstanceStateMappingForResource);
+  }
+
+  /**
+   * @return the instance->state mapping stored for the partition; an empty map when the
+   *         resource is unknown, and null when the resource is known but has no entry
+   *         for the partition
+   */
+  public Map<String, String> getInstanceStateMap(String resourceName,
+                                                 Partition resource)
+  {
+    Map<Partition, Map<String, String>> partitionMap = _dataMap.get(resourceName);
+    if (partitionMap == null)
+    {
+      return Collections.emptyMap();
+    }
+    return partitionMap.get(resource);
+  }
+
+  /**
+   * @return the partition->(instance->state) map of the resource, or an empty map when
+   *         the resource is unknown
+   */
+  public Map<Partition, Map<String, String>> getResourceMap(String resourceName)
+  {
+    Map<Partition, Map<String, String>> partitionMap = _dataMap.get(resourceName);
+    if (partitionMap == null)
+    {
+      return Collections.emptyMap();
+    }
+    return partitionMap;
+  }
+
+  @Override
+  public String toString()
+  {
+    return _dataMap.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
new file mode 100644
index 0000000..914cc0d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Reads the data from the cluster using a data accessor and keeps it in memory. The
+ * cached data is exposed through methods to search/lookup properties. All maps are
+ * populated by {@link #refresh(HelixDataAccessor)}; the lookup methods dereference them
+ * directly and therefore expect refresh to have been called first.
+ *
+ * @author kgopalak
+ *
+ */
+public class ClusterDataCache
+{
+
+  Map<String, LiveInstance> _liveInstanceMap;
+  Map<String, IdealState> _idealStateMap;
+  Map<String, StateModelDefinition> _stateModelDefMap;
+  Map<String, InstanceConfig> _instanceConfigMap;
+  Map<String, ClusterConstraints> _constraintMap;
+  // instance -> session id -> resource -> current state
+  Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
+  // instance -> message id -> message
+  Map<String, Map<String, Message>> _messageMap;
+
+  // Map<String, Map<String, HealthStat>> _healthStatMap;
+  // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
+  // private PersistentStats _persistentStats;
+  // private Alerts _alerts;
+  // private AlertStatus _alertStatus;
+
+  private static final Logger LOG =
+      Logger.getLogger(ClusterDataCache.class.getName());
+
+  /**
+   * Reloads every cached map through the accessor. Messages and current states are
+   * read only for the instances that are live at the time of the call, and current
+   * states only for the live session of each instance.
+   *
+   * @param accessor
+   *          data accessor used to read the cluster data
+   * @return always true
+   */
+  public boolean refresh(HelixDataAccessor accessor)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+    _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+    for (LiveInstance instance : _liveInstanceMap.values())
+    {
+      LOG.trace("live instance: " + instance.getInstanceName() + " "
+          + instance.getSessionId());
+    }
+
+    _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+    _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+    _constraintMap =
+        accessor.getChildValuesMap(keyBuilder.constraints());
+
+    Map<String, Map<String, Message>> msgMap =
+        new HashMap<String, Map<String, Message>>();
+    for (String instanceName : _liveInstanceMap.keySet())
+    {
+      Map<String, Message> map =
+          accessor.getChildValuesMap(keyBuilder.messages(instanceName));
+      msgMap.put(instanceName, map);
+    }
+    _messageMap = Collections.unmodifiableMap(msgMap);
+
+    Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
+        new HashMap<String, Map<String, Map<String, CurrentState>>>();
+    for (Map.Entry<String, LiveInstance> entry : _liveInstanceMap.entrySet())
+    {
+      String instanceName = entry.getKey();
+      String sessionId = entry.getValue().getSessionId();
+
+      // each live instance contributes exactly one entry: its live session
+      Map<String, Map<String, CurrentState>> curStateMap =
+          new HashMap<String, Map<String, CurrentState>>();
+      Map<String, CurrentState> map =
+          accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
+      curStateMap.put(sessionId, map);
+      allCurStateMap.put(instanceName, Collections.unmodifiableMap(curStateMap));
+    }
+    _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
+
+    return true;
+  }
+
+  public Map<String, IdealState> getIdealStates()
+  {
+    return _idealStateMap;
+  }
+
+  public Map<String, LiveInstance> getLiveInstances()
+  {
+    return _liveInstanceMap;
+  }
+
+  /**
+   * Looks up the current states of an instance for one of its sessions.
+   *
+   * @param instanceName
+   *          name of the instance
+   * @param clientSessionId
+   *          session id of the instance
+   * @return resource->current state; an empty map (never null) when nothing is cached
+   *         for the instance or the session, mirroring {@link #getMessages(String)}
+   */
+  public Map<String, CurrentState> getCurrentState(String instanceName,
+                                                   String clientSessionId)
+  {
+    Map<String, Map<String, CurrentState>> sessionMap = _currentStateMap.get(instanceName);
+    if (sessionMap == null)
+    {
+      // previously a NullPointerException for instances missing from the cache
+      return Collections.emptyMap();
+    }
+
+    Map<String, CurrentState> map = sessionMap.get(clientSessionId);
+    if (map == null)
+    {
+      return Collections.emptyMap();
+    }
+    return map;
+  }
+
+  /**
+   * @return message id->message cached for the instance, or an empty map when nothing
+   *         is cached for it
+   */
+  public Map<String, Message> getMessages(String instanceName)
+  {
+    Map<String, Message> map = _messageMap.get(instanceName);
+    if (map != null)
+    {
+      return map;
+    }
+    else
+    {
+      return Collections.emptyMap();
+    }
+  }
+
+  // public HealthStat getGlobalStats()
+  // {
+  // return _globalStats;
+  // }
+  //
+  // public PersistentStats getPersistentStats()
+  // {
+  // return _persistentStats;
+  // }
+  //
+  // public Alerts getAlerts()
+  // {
+  // return _alerts;
+  // }
+  //
+  // public AlertStatus getAlertStatus()
+  // {
+  // return _alertStatus;
+  // }
+  //
+  // public Map<String, HealthStat> getHealthStats(String instanceName)
+  // {
+  // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
+  // if (map != null)
+  // {
+  // return map;
+  // } else
+  // {
+  // return Collections.emptyMap();
+  // }
+  // }
+
+  public StateModelDefinition getStateModelDef(String stateModelDefRef)
+  {
+
+    return _stateModelDefMap.get(stateModelDefRef);
+  }
+
+  public IdealState getIdealState(String resourceName)
+  {
+    return _idealStateMap.get(resourceName);
+  }
+
+  public Map<String, InstanceConfig> getInstanceConfigMap()
+  {
+    return _instanceConfigMap;
+  }
+
+  /**
+   * Collects the instances that are disabled as a whole or disabled for the given
+   * partition, according to the cached instance configs.
+   *
+   * @param partition
+   *          partition name
+   * @return a new, modifiable set of disabled instance names
+   */
+  public Set<String> getDisabledInstancesForPartition(String partition)
+  {
+    Set<String> disabledInstancesSet = new HashSet<String>();
+    for (Map.Entry<String, InstanceConfig> entry : _instanceConfigMap.entrySet())
+    {
+      InstanceConfig config = entry.getValue();
+      if (!config.getInstanceEnabled()
+          || !config.getInstanceEnabledForPartition(partition))
+      {
+        disabledInstancesSet.add(entry.getKey());
+      }
+    }
+    return disabledInstancesSet;
+  }
+
+  /**
+   * Resolves the replica count configured in the ideal state of a resource. The
+   * ANY_LIVEINSTANCE token resolves to the number of live instances.
+   *
+   * @param resourceName
+   *          name of the resource
+   * @return the replica count, or -1 when the resource has no ideal state, no replicas
+   *         value, or a replicas value that is not a number
+   */
+  public int getReplicas(String resourceName)
+  {
+    int replicas = -1;
+
+    if (_idealStateMap.containsKey(resourceName))
+    {
+      String replicasStr = _idealStateMap.get(resourceName).getReplicas();
+
+      if (replicasStr != null)
+      {
+        if (replicasStr.equals(StateModelToken.ANY_LIVEINSTANCE.toString()))
+        {
+          replicas = _liveInstanceMap.size();
+        }
+        else
+        {
+          try
+          {
+            replicas = Integer.parseInt(replicasStr);
+          }
+          catch (NumberFormatException e)
+          {
+            LOG.error("invalid replicas string: " + replicasStr);
+          }
+        }
+      }
+      else
+      {
+        LOG.error("idealState for resource: " + resourceName + " does NOT have replicas");
+      }
+    }
+    return replicas;
+  }
+
+  /**
+   * @return the constraints cached under the given type, or null when there are none
+   */
+  public ClusterConstraints getConstraint(ConstraintType type)
+  {
+    if (_constraintMap != null)
+    {
+      return _constraintMap.get(type.toString());
+    }
+    return null;
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
+    sb.append("idealStateMap:" + _idealStateMap).append("\n");
+    sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
+    sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+    sb.append("messageMap:" + _messageMap).append("\n");
+
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
new file mode 100644
index 0000000..c7de9e6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A named event carrying a map of attributes (attribute name -> arbitrary value).
+ */
+public class ClusterEvent
+{
+  private static final Logger logger = Logger.getLogger(ClusterEvent.class.getName());
+
+  private final String _eventName;
+  private final Map<String, Object> _eventAttributeMap;
+
+  public ClusterEvent(String name)
+  {
+    _eventName = name;
+    _eventAttributeMap = new HashMap<String, Object>();
+  }
+
+  /**
+   * Stores an attribute under the given name, replacing any previous value.
+   */
+  public void addAttribute(String attrName, Object attrValue)
+  {
+    // guard so the value is only converted to a string when tracing is on
+    if (logger.isTraceEnabled())
+    {
+      logger.trace("Adding attribute:" + attrName);
+      logger.trace(" attribute value:" + attrValue);
+    }
+
+    _eventAttributeMap.put(attrName, attrValue);
+  }
+
+  public String getName()
+  {
+    return _eventName;
+  }
+
+  /**
+   * Returns the attribute stored under the given name, cast to the type expected by the
+   * caller. The cast is unchecked.
+   *
+   * @return the attribute value, or null when no value is stored under that name
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T getAttribute(String attrName)
+  {
+    // casting null is harmless, so a missing attribute simply yields null
+    return (T) _eventAttributeMap.get(attrName);
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("name:").append(_eventName).append("\n");
+    for (Map.Entry<String, Object> attribute : _eventAttributeMap.entrySet())
+    {
+      sb.append(attribute.getKey()).append(":").append(attribute.getValue()).append("\n");
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
new file mode 100644
index 0000000..7dd0f28
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.LiveInstance;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Pipeline stage that compares the version of the controller with the version reported
+ * by every live instance and aborts the pipeline with a {@link StageException} when an
+ * incompatible pair is found.
+ */
+public class CompatibilityCheckStage extends AbstractBaseStage
+{
+  private static final Logger LOG = Logger
+      .getLogger(CompatibilityCheckStage.class.getName());
+
+  /**
+   * INCOMPATIBLE_MAP stores primary version pairs:
+   *  {controllerPrimaryVersion, participantPrimaryVersion}
+   *  that are incompatible
+   */
+  private static final Map<String, Boolean> INCOMPATIBLE_MAP;
+  static
+  {
+    Map<String, Boolean> map = new HashMap<String, Boolean>();
+    /**
+     * {controllerPrimaryVersion,participantPrimaryVersion} -> false
+     */
+    map.put("0.4,0.3", false);
+    INCOMPATIBLE_MAP = Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Extracts the primary version, i.e. everything before the last dot, from a version
+   * made of exactly three dot-separated parts ("0.4.1" -> "0.4").
+   *
+   * @return the primary version, or null when the version does not have three parts
+   */
+  private String getPrimaryVersion(String version)
+  {
+    String[] splits = version.split("\\.");
+    if (splits.length != 3)
+    {
+      return null;
+    }
+    return version.substring(0, version.lastIndexOf('.'));
+  }
+
+  /**
+   * Compares two primary versions component by component as numbers, so that "0.10"
+   * ranks above "0.9". A plain string comparison would rank it below. Falls back to
+   * the lexicographic string order when a component is not a number.
+   *
+   * @return a negative value, zero or a positive value when {@code left} is lower than,
+   *         equal to or higher than {@code right}
+   */
+  private static int comparePrimaryVersions(String left, String right)
+  {
+    String[] leftParts = left.split("\\.");
+    String[] rightParts = right.split("\\.");
+    try
+    {
+      int common = Math.min(leftParts.length, rightParts.length);
+      for (int i = 0; i < common; i++)
+      {
+        int leftNumber = Integer.parseInt(leftParts[i]);
+        int rightNumber = Integer.parseInt(rightParts[i]);
+        if (leftNumber != rightNumber)
+        {
+          return leftNumber < rightNumber ? -1 : 1;
+        }
+      }
+      return leftParts.length - rightParts.length;
+    }
+    catch (NumberFormatException e)
+    {
+      // non-numeric component: keep the original ordering
+      return left.compareTo(right);
+    }
+  }
+
+  /**
+   * Decides whether a participant may work with this controller. A participant is
+   * incompatible when its primary version is higher than the controller's or when the
+   * version pair is listed in INCOMPATIBLE_MAP. A missing participant version, or a
+   * version that does not have three parts, skips the check.
+   */
+  private boolean isCompatible(String controllerVersion, String participantVersion)
+  {
+    if (participantVersion == null)
+    {
+      LOG.warn("Missing version of participant. Skip version check.");
+      return true;
+    }
+
+    // compare primary version
+    String controllerPrimaryVersion = getPrimaryVersion(controllerVersion);
+    String participantPrimaryVersion = getPrimaryVersion(participantVersion);
+    if (controllerPrimaryVersion == null || participantPrimaryVersion == null)
+    {
+      return true;
+    }
+
+    if (comparePrimaryVersions(controllerPrimaryVersion, participantPrimaryVersion) < 0)
+    {
+      LOG.info("Controller primary version is less than participant primary version.");
+      return false;
+    }
+
+    return !INCOMPATIBLE_MAP.containsKey(controllerPrimaryVersion + ","
+        + participantPrimaryVersion);
+  }
+
+  /**
+   * Checks every live instance against the controller version.
+   *
+   * @throws StageException
+   *           when a required event attribute or the controller version is missing, or
+   *           when a live instance runs an incompatible version
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    HelixManager manager = event.getAttribute("helixmanager");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    if (manager == null || cache == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager | DataCache");
+    }
+
+    String controllerVersion = manager.getVersion();
+    if (controllerVersion == null)
+    {
+      String errorMsg = "Missing version of controller: " + manager.getInstanceName()
+          + ". Pipeline will not continue.";
+      LOG.error(errorMsg);
+      throw new StageException(errorMsg);
+    }
+
+    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
+    for (LiveInstance liveInstance : liveInstanceMap.values())
+    {
+      String participantVersion = liveInstance.getHelixVersion();
+      if (!isCompatible(controllerVersion, participantVersion))
+      {
+        String errorMsg = "cluster manager versions are incompatible; pipeline will not continue. "
+            + "controller:" + manager.getInstanceName() + ", controllerVersion:" + controllerVersion
+            + "; participant:" + liveInstance.getInstanceName() + ", participantVersion:" + participantVersion;
+        LOG.error(errorMsg);
+        throw new StageException(errorMsg);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
new file mode 100644
index 0000000..e88e0a6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.Message.MessageType;
+
+
+/**
+ * For each LiveInstances select currentState and message whose sessionId matches
+ * sessionId from LiveInstance Get Partition,State for all the resources computed in
+ * previous State [ResourceComputationStage]
+ *
+ * @author kgopalak
+ *
+ */
+public class CurrentStateComputationStage extends AbstractBaseStage
+{
+ @Override
+ public void process(ClusterEvent event) throws Exception
+ {
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ Map<String, Resource> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+
+ if (cache == null || resourceMap == null)
+ {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires DataCache|RESOURCE");
+ }
+
+ Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+
+ for (LiveInstance instance : liveInstances.values())
+ {
+ String instanceName = instance.getInstanceName();
+ Map<String, Message> instanceMessages = cache.getMessages(instanceName);
+ for (Message message : instanceMessages.values())
+ {
+ if (!MessageType.STATE_TRANSITION.toString()
+ .equalsIgnoreCase(message.getMsgType()))
+ {
+ continue;
+ }
+ if (!instance.getSessionId().equals(message.getTgtSessionId()))
+ {
+ continue;
+ }
+ String resourceName = message.getResourceName();
+ Resource resource = resourceMap.get(resourceName);
+ if (resource == null)
+ {
+ continue;
+ }
+
+ if (!message.getGroupMessageMode())
+ {
+ String partitionName = message.getPartitionName();
+ Partition partition = resource.getPartition(partitionName);
+ if (partition != null)
+ {
+ currentStateOutput.setPendingState(resourceName,
+ partition,
+ instanceName,
+ message.getToState());
+ }
+ else
+ {
+ // log
+ }
+ }
+ else
+ {
+ List<String> partitionNames = message.getPartitionNames();
+ if (!partitionNames.isEmpty())
+ {
+ for (String partitionName : partitionNames)
+ {
+ Partition partition = resource.getPartition(partitionName);
+ if (partition != null)
+ {
+ currentStateOutput.setPendingState(resourceName,
+ partition,
+ instanceName,
+ message.getToState());
+ }
+ else
+ {
+ // log
+ }
+ }
+ }
+ }
+ }
+ }
+ for (LiveInstance instance : liveInstances.values())
+ {
+ String instanceName = instance.getInstanceName();
+
+ String clientSessionId = instance.getSessionId();
+ Map<String, CurrentState> currentStateMap =
+ cache.getCurrentState(instanceName, clientSessionId);
+ for (CurrentState currentState : currentStateMap.values())
+ {
+
+ if (!instance.getSessionId().equals(currentState.getSessionId()))
+ {
+ continue;
+ }
+ String resourceName = currentState.getResourceName();
+ String stateModelDefName = currentState.getStateModelDefRef();
+ Resource resource = resourceMap.get(resourceName);
+ if (resource == null)
+ {
+ continue;
+ }
+ if (stateModelDefName != null)
+ {
+ currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
+ }
+
+ currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
+
+ Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+ for (String partitionName : partitionStateMap.keySet())
+ {
+ Partition partition = resource.getPartition(partitionName);
+ if (partition != null)
+ {
+ currentStateOutput.setCurrentState(resourceName,
+ partition,
+ instanceName,
+ currentState.getState(partitionName));
+
+ }
+ else
+ {
+ // log
+ }
+ }
+ }
+ }
+ event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
new file mode 100644
index 0000000..dde5949
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -0,0 +1,205 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Partition;
+
+
+/**
+ * In-memory result holder that records, per resource and partition, the state
+ * each instance currently reports and the state it is pending a transition to,
+ * together with the resource's state model definition name and bucket size.
+ */
+public class CurrentStateOutput
+{
+  // resource -> partition -> instance -> current state
+  private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
+  // resource -> partition -> instance -> pending (to-)state
+  private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
+  // resource -> state model definition name
+  private final Map<String, String> _resourceStateModelMap;
+  // resource -> CurrentState object; in this class it only carries the bucket size
+  private final Map<String, CurrentState> _curStateMetaMap;
+
+  public CurrentStateOutput()
+  {
+    _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _resourceStateModelMap = new HashMap<String, String>();
+    _curStateMetaMap = new HashMap<String, CurrentState>();
+  }
+
+  /**
+   * Records the state model definition name used by a resource.
+   *
+   * @param resourceName
+   * @param stateModelDefName
+   */
+  public void setResourceStateModelDef(String resourceName, String stateModelDefName)
+  {
+    _resourceStateModelMap.put(resourceName, stateModelDefName);
+  }
+
+  /**
+   * @param resourceName
+   * @return the recorded state model definition name, or null if none was set
+   */
+  public String getResourceStateModelDef(String resourceName)
+  {
+    return _resourceStateModelMap.get(resourceName);
+  }
+
+  /**
+   * Stores the bucket size for a resource, creating its meta entry on first use.
+   *
+   * @param resource
+   * @param bucketSize
+   */
+  public void setBucketSize(String resource, int bucketSize)
+  {
+    CurrentState meta = _curStateMetaMap.get(resource);
+    if (meta == null)
+    {
+      meta = new CurrentState(resource);
+      _curStateMetaMap.put(resource, meta);
+    }
+    meta.setBucketSize(bucketSize);
+  }
+
+  /**
+   * @param resource
+   * @return the bucket size stored for the resource, or 0 if none was stored
+   */
+  public int getBucketSize(String resource)
+  {
+    CurrentState meta = _curStateMetaMap.get(resource);
+    if (meta == null)
+    {
+      return 0;
+    }
+    return meta.getBucketSize();
+  }
+
+  /**
+   * Records the current state of (resource, partition, instance).
+   *
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @param state
+   */
+  public void setCurrentState(String resourceName,
+                              Partition partition,
+                              String instanceName,
+                              String state)
+  {
+    putState(_currentStateMap, resourceName, partition, instanceName, state);
+  }
+
+  /**
+   * Records the pending state of (resource, partition, instance).
+   *
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @param state
+   */
+  public void setPendingState(String resourceName,
+                              Partition partition,
+                              String instanceName,
+                              String state)
+  {
+    putState(_pendingStateMap, resourceName, partition, instanceName, state);
+  }
+
+  /**
+   * given (resource, partition, instance), returns currentState
+   *
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @return the recorded current state, or null if nothing was recorded
+   */
+  public String getCurrentState(String resourceName,
+                                Partition partition,
+                                String instanceName)
+  {
+    Map<String, String> byInstance =
+        findInstanceStates(_currentStateMap, resourceName, partition);
+    if (byInstance == null)
+    {
+      return null;
+    }
+    return byInstance.get(instanceName);
+  }
+
+  /**
+   * given (resource, partition, instance), returns toState
+   *
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @return the recorded pending state, or null if nothing was recorded
+   */
+  public String getPendingState(String resourceName,
+                                Partition partition,
+                                String instanceName)
+  {
+    Map<String, String> byInstance =
+        findInstanceStates(_pendingStateMap, resourceName, partition);
+    if (byInstance == null)
+    {
+      return null;
+    }
+    return byInstance.get(instanceName);
+  }
+
+  /**
+   * given (resource, partition), returns (instance->currentState) map
+   *
+   * @param resourceName
+   * @param partition
+   * @return the internal map for the partition, or an empty map if nothing was recorded
+   */
+  public Map<String, String> getCurrentStateMap(String resourceName, Partition partition)
+  {
+    Map<String, String> byInstance =
+        findInstanceStates(_currentStateMap, resourceName, partition);
+    if (byInstance != null)
+    {
+      return byInstance;
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * given (resource, partition), returns (instance->toState) map
+   *
+   * @param resourceName
+   * @param partition
+   * @return the internal map for the partition, or an empty map if nothing was recorded
+   */
+  public Map<String, String> getPendingStateMap(String resourceName, Partition partition)
+  {
+    Map<String, String> byInstance =
+        findInstanceStates(_pendingStateMap, resourceName, partition);
+    if (byInstance != null)
+    {
+      return byInstance;
+    }
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "current state= " + _currentStateMap + ", pending state= " + _pendingStateMap;
+  }
+
+  /**
+   * Stores state under stateMap[resource][partition][instance], creating the
+   * intermediate maps when they do not exist yet.
+   */
+  private static void putState(Map<String, Map<Partition, Map<String, String>>> stateMap,
+                               String resourceName,
+                               Partition partition,
+                               String instanceName,
+                               String state)
+  {
+    Map<Partition, Map<String, String>> byPartition = stateMap.get(resourceName);
+    if (byPartition == null)
+    {
+      byPartition = new HashMap<Partition, Map<String, String>>();
+      stateMap.put(resourceName, byPartition);
+    }
+
+    Map<String, String> byInstance = byPartition.get(partition);
+    if (byInstance == null)
+    {
+      byInstance = new HashMap<String, String>();
+      byPartition.put(partition, byInstance);
+    }
+
+    byInstance.put(instanceName, state);
+  }
+
+  /**
+   * Returns stateMap[resource][partition], or null when either level is missing.
+   */
+  private static Map<String, String> findInstanceStates(
+      Map<String, Map<Partition, Map<String, String>>> stateMap,
+      String resourceName,
+      Partition partition)
+  {
+    Map<Partition, Map<String, String>> byPartition = stateMap.get(resourceName);
+    if (byPartition == null)
+    {
+      return null;
+    }
+    return byPartition.get(partition);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
new file mode 100644
index 0000000..5ae21a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Pipeline stage that derives an ExternalView for every resource in the event
+ * from the CURRENT_STATE attribute and writes back the views that changed.
+ */
+public class ExternalViewComputeStage extends AbstractBaseStage
+{
+  private static final Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+
+  /**
+   * Builds one ExternalView per resource, notifies the cluster status monitor
+   * (when one is attached to the event) and persists, in a single setChildren
+   * call, only the views that are new or whose record differs from the stored one.
+   *
+   * @param event must carry the "helixmanager", RESOURCES and "ClusterDataCache"
+   *          attributes; the CURRENT_STATE attribute is read without a null check
+   *          and is presumably populated by an earlier stage
+   * @throws StageException if one of the three required attributes is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+    log.info("START ExternalViewComputeStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+    if (manager == null || resourceMap == null || cache == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires ClusterManager|RESOURCES|DataCache");
+    }
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+
+    // Both lookups are independent of the resource being processed, so do them
+    // once instead of once per resource. Nothing is written until after the
+    // loop, so a single read of the stored external views is sufficient.
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    Map<String, ExternalView> curExtViews =
+        dataAccessor.getChildValuesMap(keyBuilder.externalViews());
+
+    List<ExternalView> newExtViews = new ArrayList<ExternalView>();
+    List<PropertyKey> keys = new ArrayList<PropertyKey>();
+
+    for (Map.Entry<String, Resource> entry : resourceMap.entrySet())
+    {
+      String resourceName = entry.getKey();
+      ExternalView view =
+          computeExternalView(resourceName, entry.getValue(), currentStateOutput);
+
+      // Update cluster status monitor mbean
+      if (clusterStatusMonitor != null)
+      {
+        clusterStatusMonitor.onExternalViewChange(view,
+                                                  cache._idealStateMap.get(view.getResourceName()));
+      }
+
+      // compare the new external view with current one, set only on different
+      ExternalView curExtView = curExtViews.get(resourceName);
+      if (curExtView == null || !curExtView.getRecord().equals(view.getRecord()))
+      {
+        keys.add(keyBuilder.externalView(resourceName));
+        newExtViews.add(view);
+      }
+    }
+
+    if (newExtViews.size() > 0)
+    {
+      dataAccessor.setChildren(keys, newExtViews);
+    }
+
+    long endTime = System.currentTimeMillis();
+    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+  }
+
+  /**
+   * Creates the ExternalView of one resource by copying, for each of its
+   * partitions, the (instance -> state) entries recorded in currentStateOutput.
+   */
+  private ExternalView computeExternalView(String resourceName,
+                                           Resource resource,
+                                           CurrentStateOutput currentStateOutput)
+  {
+    ExternalView view = new ExternalView(resourceName);
+    view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
+
+    for (Partition partition : resource.getPartitions())
+    {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      if (currentStateMap == null || currentStateMap.isEmpty())
+      {
+        continue;
+      }
+      for (Map.Entry<String, String> instanceState : currentStateMap.entrySet())
+      {
+        view.setState(partition.getPartitionName(),
+                      instanceState.getKey(),
+                      instanceState.getValue());
+      }
+    }
+    return view;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
new file mode 100644
index 0000000..bae7d4d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.AlertStatus;
+import org.apache.helix.model.Alerts;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.PersistentStats;
+
+
+/**
+ * Snapshot of health-related cluster data (live instances, per-instance health
+ * reports, persistent stats, alerts and alert status) that is replaced on every
+ * call to one of the refresh methods.
+ */
+public class HealthDataCache
+{
+  // keys are used as instance names when the health reports are fetched
+  Map<String, LiveInstance> _liveInstanceMap;
+
+  // instance name -> that instance's health reports; the outer map is unmodifiable
+  Map<String, Map<String, HealthStat>> _healthStatMap;
+  // never assigned within this class
+  HealthStat _globalStats;
+  PersistentStats _persistentStats;
+  Alerts _alerts;
+  AlertStatus _alertStatus;
+
+  /**
+   * Reloads every cached value through a DataAccessor.
+   *
+   * @param accessor
+   * @return always true
+   */
+  public boolean refresh(DataAccessor accessor)
+  {
+    _liveInstanceMap =
+        accessor.getChildValuesMap(LiveInstance.class, PropertyType.LIVEINSTANCES);
+
+    Map<String, Map<String, HealthStat>> reportsByInstance =
+        new HashMap<String, Map<String, HealthStat>>();
+    for (String liveInstanceName : _liveInstanceMap.keySet())
+    {
+      // xxx clearly getting znodes for the instance here...so get the
+      // timestamp!
+      reportsByInstance.put(liveInstanceName,
+                            accessor.getChildValuesMap(HealthStat.class,
+                                                       PropertyType.HEALTHREPORT,
+                                                       liveInstanceName));
+    }
+    _healthStatMap = Collections.unmodifiableMap(reportsByInstance);
+
+    _persistentStats =
+        accessor.getProperty(PersistentStats.class, PropertyType.PERSISTENTSTATS);
+    _alerts = accessor.getProperty(Alerts.class, PropertyType.ALERTS);
+    _alertStatus = accessor.getProperty(AlertStatus.class, PropertyType.ALERT_STATUS);
+
+    return true;
+  }
+
+  public HealthStat getGlobalStats()
+  {
+    return _globalStats;
+  }
+
+  public PersistentStats getPersistentStats()
+  {
+    return _persistentStats;
+  }
+
+  public Alerts getAlerts()
+  {
+    return _alerts;
+  }
+
+  public AlertStatus getAlertStatus()
+  {
+    return _alertStatus;
+  }
+
+  /**
+   * @param instanceName
+   * @return the health reports cached for the instance, or an empty map if the
+   *         last refresh stored none for it
+   */
+  public Map<String, HealthStat> getHealthStats(String instanceName)
+  {
+    Map<String, HealthStat> reports = _healthStatMap.get(instanceName);
+    if (reports == null)
+    {
+      return Collections.emptyMap();
+    }
+    return reports;
+  }
+
+  public Map<String, LiveInstance> getLiveInstances()
+  {
+    return _liveInstanceMap;
+  }
+
+  /**
+   * Reloads every cached value through a HelixDataAccessor, addressing the data
+   * with keys obtained from the accessor's key builder.
+   *
+   * @param accessor
+   * @return always true
+   */
+  public boolean refresh(HelixDataAccessor accessor)
+  {
+    Builder keys = accessor.keyBuilder();
+    _liveInstanceMap = accessor.getChildValuesMap(keys.liveInstances());
+
+    Map<String, Map<String, HealthStat>> reportsByInstance =
+        new HashMap<String, Map<String, HealthStat>>();
+    for (String liveInstanceName : _liveInstanceMap.keySet())
+    {
+      // xxx clearly getting znodes for the instance here...so get the
+      // timestamp!
+      Map<String, HealthStat> reports =
+          accessor.getChildValuesMap(keys.healthReports(liveInstanceName));
+      reportsByInstance.put(liveInstanceName, reports);
+    }
+    _healthStatMap = Collections.unmodifiableMap(reportsByInstance);
+
+    _persistentStats = accessor.getProperty(keys.persistantStat());
+    _alerts = accessor.getProperty(keys.alerts());
+    _alertStatus = accessor.getProperty(keys.alertStatus());
+
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
new file mode 100644
index 0000000..7002d29
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+
+
+/**
+ * Collects generated messages, grouped first by resource name and then by
+ * partition.
+ */
+public class MessageGenerationOutput
+{
+
+  // resource name -> partition -> messages added for that partition
+  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+
+  public MessageGenerationOutput()
+  {
+    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
+  }
+
+  /**
+   * Appends a message to the list kept for (resourceName, partition), creating
+   * the intermediate map and list on first use.
+   *
+   * @param resourceName
+   * @param resource the partition the message belongs to
+   * @param message
+   */
+  public void addMessage(String resourceName, Partition resource,
+      Message message)
+  {
+    Map<Partition, List<Message>> byPartition = _messagesMap.get(resourceName);
+    if (byPartition == null)
+    {
+      byPartition = new HashMap<Partition, List<Message>>();
+      _messagesMap.put(resourceName, byPartition);
+    }
+
+    List<Message> messages = byPartition.get(resource);
+    if (messages == null)
+    {
+      messages = new ArrayList<Message>();
+      byPartition.put(resource, messages);
+    }
+
+    messages.add(message);
+  }
+
+  /**
+   * Returns the messages added for (resourceName, partition).
+   *
+   * @param resourceName
+   * @param resource the partition to look up
+   * @return the internal message list, or an empty list when nothing was added
+   *         for the resource or for the partition; never null
+   */
+  public List<Message> getMessages(String resourceName,
+      Partition resource)
+  {
+    Map<Partition, List<Message>> byPartition = _messagesMap.get(resourceName);
+    if (byPartition != null)
+    {
+      // A known resource can still lack an entry for this partition; fall
+      // through to the empty list instead of handing null to the caller.
+      List<Message> messages = byPartition.get(resource);
+      if (messages != null)
+      {
+        return messages;
+      }
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String toString()
+  {
+    return _messagesMap.toString();
+  }
+}