You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC
[10/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java
deleted file mode 100644
index a7555ec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZKPropertyTransferServer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-import org.restlet.Component;
-import org.restlet.Context;
-import org.restlet.data.Protocol;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-/**
- * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is
- * to optimize the concurrency level of zookeeper access for ZNRecord updates
- * that does not require real-time, like message handling status updates and
- * healthcheck reports.
- *
- * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init()
- * and shutdown() on the getInstance().
- * */
-public class ZKPropertyTransferServer
-{
- public static final String PORT = "port";
- public static String RESTRESOURCENAME = "ZNRecordUpdates";
- public static final String SERVER = "ZKPropertyTransferServer";
-
- // Frequency period for the ZNRecords are batch written to zookeeper
- public static int PERIOD = 10 * 1000;
- // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
- public static int MAX_UPDATE_LIMIT = 10000;
- private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-
- int _localWebservicePort;
- String _webserviceUrl;
- ZkBaseDataAccessor<ZNRecord> _accessor;
- String _zkAddress;
-
- AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
- = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-
- boolean _initialized = false;
- boolean _shutdownFlag = false;
- Component _component = null;
- Timer _timer = null;
-
- /**
- * Timertask for zookeeper batched writes
- * */
- class ZKPropertyTransferTask extends TimerTask
- {
- @Override
- public void run()
- {
- try
- {
- sendData();
- }
- catch(Throwable t)
- {
- LOG.error("", t);
- }
-
- }
- }
-
- void sendData()
- {
- LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
- ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
-
- synchronized(_dataBufferRef)
- {
- updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- if(updateCache != null)
- {
- List<String> paths = new ArrayList<String>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
- List<ZNRecord> vals = new ArrayList<ZNRecord>();
- // BUGBUG : what if the instance is dropped?
- for(ZNRecordUpdate holder : updateCache.values())
- {
- paths.add(holder.getPath());
- updaters.add(holder.getZNRecordUpdater());
- vals.add(holder.getRecord());
- }
- // Batch write the accumulated updates into zookeeper
- long timeStart = System.currentTimeMillis();
- if(paths.size() > 0)
- {
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- }
- LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
- }
- else
- {
- LOG.warn("null _dataQueueRef. Should be in the beginning only");
- }
- }
-
- static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-
- private ZKPropertyTransferServer()
- {
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- public static ZKPropertyTransferServer getInstance()
- {
- return _instance;
- }
-
- public boolean isInitialized()
- {
- return _initialized;
- }
-
- public void init(int localWebservicePort, String zkAddress)
- {
- if(!_initialized && !_shutdownFlag)
- {
- LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
- _localWebservicePort = localWebservicePort;
- ZkClient zkClient = new ZkClient(zkAddress);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
- _zkAddress = zkAddress;
- startServer();
- }
- else
- {
- LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
- }
- }
-
- public String getWebserviceUrl()
- {
- if(!_initialized || _shutdownFlag)
- {
- LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
- return null;
- }
- return _webserviceUrl;
- }
-
- /** Add an ZNRecordUpdate to the change queue.
- * Called by the webservice front-end.
- *
- */
- void enqueueData(ZNRecordUpdate e)
- {
- if(!_initialized || _shutdownFlag)
- {
- LOG.error("zkDataTransferServer inited:" + _initialized
- + " shutdownFlag:"+_shutdownFlag+" , return");
- return;
- }
- // Do local merge if receive multiple update on the same path
- synchronized(_dataBufferRef)
- {
- e.getRecord().setSimpleField(SERVER, _webserviceUrl);
- if(_dataBufferRef.get().containsKey(e.getPath()))
- {
- ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
- oldVal = e.getZNRecordUpdater().update(oldVal);
- _dataBufferRef.get().get(e.getPath())._record = oldVal;
- }
- else
- {
- _dataBufferRef.get().put(e.getPath(), e);
- }
- }
- if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
- {
- sendData();
- }
- }
-
- void startServer()
- {
- LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
-
- _component = new Component();
-
- _component.getServers().add(Protocol.HTTP, _localWebservicePort);
- Context applicationContext = _component.getContext().createChildContext();
- applicationContext.getAttributes().put(SERVER, this);
- applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
- ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
- applicationContext);
- // Attach the application to the component and start it
- _component.getDefaultHost().attach(application);
- _timer = new Timer(true);
- _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
-
- try
- {
- _webserviceUrl
- = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort
- + "/" + RESTRESOURCENAME;
- _component.start();
- _initialized = true;
- }
- catch (Exception e)
- {
- LOG.error("", e);
- }
- LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
- }
-
- public void shutdown()
- {
- if(_shutdownFlag)
- {
- LOG.error("ZKPropertyTransferServer already has been shutdown...");
- return;
- }
- LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
- if(_timer != null)
- {
- _timer.cancel();
- }
- if(_component != null)
- {
- try
- {
- _component.stop();
- }
- catch (Exception e)
- {
- LOG.error("", e);
- }
- }
- _shutdownFlag = true;
- }
-
- public void reset()
- {
- if(_shutdownFlag == true)
- {
- _shutdownFlag = false;
- _initialized = false;
- _component = null;
- _timer = null;
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java
deleted file mode 100644
index 727ab8a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdate.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.ZNRecordUpdater;
-/**
- * Unit of transfered ZNRecord updates. Contains the ZNRecord Value, zkPath
- * to store the update value, and the property type (used to merge the ZNRecord)
- * For ZNRecord subtraction, it is currently not supported yet.
- * */
-public class ZNRecordUpdate
-{
- public enum OpCode
- {
- // TODO: create is not supported; but update will create if not exist
- CREATE,
- UPDATE,
- SET
- }
- final String _path;
- ZNRecord _record;
- final OpCode _code;
-
- @JsonCreator
- public ZNRecordUpdate(@JsonProperty("path")String path,
- @JsonProperty("opcode")OpCode code,
- @JsonProperty("record")ZNRecord record)
- {
- _path = path;
- _record = record;
- _code = code;
- }
-
- public String getPath()
- {
- return _path;
- }
-
- public ZNRecord getRecord()
- {
- return _record;
- }
-
- public OpCode getOpcode()
- {
- return _code;
- }
-
- @JsonIgnore(true)
- public DataUpdater<ZNRecord> getZNRecordUpdater()
- {
- if(_code == OpCode.SET)
-
- {
- return new ZNRecordUpdater(_record)
- {
- @Override
- public ZNRecord update(ZNRecord current)
- {
- return _record;
- }
- };
- }
- else if ((_code == OpCode.UPDATE))
- {
- return new ZNRecordUpdater(_record);
- }
- else
- {
- throw new UnsupportedOperationException("Not supported : " + _code);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java
deleted file mode 100644
index ed955a7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZNRecordUpdateResource.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import java.io.StringReader;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-import org.restlet.resource.Representation;
-import org.restlet.resource.Resource;
-
-/**
- * REST resource for ZkPropertyTransfer server to receive PUT requests
- * that submits ZNRecordUpdates
- * */
-public class ZNRecordUpdateResource extends Resource
-{
- public static final String UPDATEKEY = "ZNRecordUpdate";
- private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
- @Override
- public boolean allowGet()
- {
- return false;
- }
-
- @Override
- public boolean allowPost()
- {
- return false;
- }
-
- @Override
- public boolean allowPut()
- {
- return true;
- }
-
- @Override
- public boolean allowDelete()
- {
- return false;
- }
-
- @Override
- public void storeRepresentation(Representation entity)
- {
- try
- {
- ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
-
- Form form = new Form(entity);
- String jsonPayload = form.getFirstValue(UPDATEKEY, true);
-
- // Parse the map from zkPath --> ZNRecordUpdate from the payload
- StringReader sr = new StringReader(jsonPayload);
- ObjectMapper mapper = new ObjectMapper();
- TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
- new TypeReference<TreeMap<String, ZNRecordUpdate>>()
- {
- };
- Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
- // Enqueue the ZNRecordUpdate for sending
- for(ZNRecordUpdate holder : holderMap.values())
- {
- server.enqueueData(holder);
- LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
- }
- getResponse().setStatus(Status.SUCCESS_OK);
- }
- catch(Exception e)
- {
- LOG.error("", e);
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java
deleted file mode 100644
index 3ac88ed..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferApplication.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import org.restlet.Application;
-import org.restlet.Context;
-import org.restlet.Restlet;
-import org.restlet.Router;
-
-/**
- * Restlet application for ZkPropertyTransfer server
- * */
-public class ZkPropertyTransferApplication extends Application
-{
- public ZkPropertyTransferApplication()
- {
- super();
- }
-
- public ZkPropertyTransferApplication(Context context)
- {
- super(context);
- }
-
- @Override
- public Restlet createRoot()
- {
- Router router = new Router(getContext());
- router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
- return router;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java
deleted file mode 100644
index 9cb3a25..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/ZkPropertyTransferClient.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package com.linkedin.helix.controller.restlet;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.restlet.Client;
-import org.restlet.data.MediaType;
-import org.restlet.data.Method;
-import org.restlet.data.Protocol;
-import org.restlet.data.Reference;
-import org.restlet.data.Request;
-import org.restlet.data.Response;
-import org.restlet.data.Status;
-import com.linkedin.helix.ZNRecord;
-
-public class ZkPropertyTransferClient
-{
- private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
- public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
- public static int SEND_PERIOD = 10 * 1000;
-
- public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
-
- int _maxConcurrentTasks;
- ExecutorService _executorService;
- Client[] _clients;
- AtomicInteger _requestCount = new AtomicInteger(0);
-
- // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
- AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
- = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
- Timer _timer;
- volatile String _webServiceUrl = "";
-
- public ZkPropertyTransferClient(int maxConcurrentTasks)
- {
- _maxConcurrentTasks = maxConcurrentTasks;
- _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
- _clients = new Client[_maxConcurrentTasks];
- for(int i = 0; i< _clients.length; i++)
- {
- _clients[i] = new Client(Protocol.HTTP);
- }
- _timer = new Timer(true);
- _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- class SendZNRecordTimerTask extends TimerTask
- {
- @Override
- public void run()
- {
- sendUpdateBatch();
- }
- }
-
- public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl)
- {
- try
- {
- LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to " + webserviceUrl);
- _webServiceUrl = webserviceUrl;
- update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
- synchronized(_dataBufferRef)
- {
- if(_dataBufferRef.get().containsKey(update._path))
- {
- ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
- oldVal = update.getZNRecordUpdater().update(oldVal);
- _dataBufferRef.get().get(update.getPath())._record = oldVal;
- }
- else
- {
- _dataBufferRef.get().put(update.getPath(), update);
- }
- }
- }
- catch(Exception e)
- {
- LOG.error("", e);
- }
- }
-
- void sendUpdateBatch()
- {
- LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to " + _webServiceUrl);
- Map<String, ZNRecordUpdate> updateCache = null;
-
- synchronized(_dataBufferRef)
- {
- updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- if(updateCache != null && updateCache.size() > 0)
- {
- ZNRecordUpdateUploadTask task = new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl, _clients[_requestCount.intValue() % _maxConcurrentTasks]);
- _requestCount.incrementAndGet();
- _executorService.submit(task);
- LOG.trace("Queue size :" + ((ThreadPoolExecutor)_executorService).getQueue().size());
- }
- }
-
- public void shutdown()
- {
- LOG.info("Shutting down ZkPropertyTransferClient");
- _executorService.shutdown();
- _timer.cancel();
- for(Client client: _clients)
- {
- try
- {
- client.stop();
- }
- catch (Exception e)
- {
- LOG.error("", e);
- }
- }
- }
-
- class ZNRecordUpdateUploadTask implements Callable<Void>
- {
- Map<String, ZNRecordUpdate> _updateMap;
- String _webServiceUrl;
- Client _client;
-
- ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client)
- {
- _updateMap = update;
- _webServiceUrl = webserviceUrl;
- _client = client;
- }
-
- @Override
- public Void call() throws Exception
- {
- LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
- long time = System.currentTimeMillis();
- Reference resourceRef = new Reference(_webServiceUrl);
- Request request = new Request(Method.PUT, resourceRef);
-
- ObjectMapper mapper = new ObjectMapper();
- StringWriter sw = new StringWriter();
- try
- {
- mapper.writeValue(sw, _updateMap);
- }
- catch (Exception e)
- {
- LOG.error("",e);
- }
-
- request.setEntity(
- ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
- // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
- Response response = _client.handle(request);
-
- if(response.getStatus().getCode() != Status.SUCCESS_OK.getCode())
- {
- LOG.error("Status : " + response.getStatus());
- }
- LOG.info("Using time : " + (System.currentTimeMillis() - time));
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java
deleted file mode 100644
index 777a09f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/restlet/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Restlet server for Helix controller
- *
- */
-package com.linkedin.helix.controller.restlet;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java
deleted file mode 100644
index 7cd9383..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/AttributeName.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-public enum AttributeName
-{
- RESOURCES,
- BEST_POSSIBLE_STATE,
- CURRENT_STATE,
- MESSAGES_ALL,
- MESSAGES_SELECTED,
- MESSAGES_THROTTLE,
- LOCAL_STATE
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java
deleted file mode 100644
index 64f7f91..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateCalcStage.java
+++ /dev/null
@@ -1,609 +0,0 @@
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixConstants.StateModelToken;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.josql.JsqlQueryListProcessor;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.StateModelDefinition;
-
-/**
- * For partition compute best possible (instance,state) pair based on
- * IdealState,StateModel,LiveInstance
- *
- * @author kgopalak
- *
- */
-// TODO: refactor this
-public class BestPossibleStateCalcStage extends AbstractBaseStage
-{
- private static final Logger logger =
- Logger.getLogger(BestPossibleStateCalcStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- long startTime = System.currentTimeMillis();
- logger.info("START BestPossibleStateCalcStage.process()");
-
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- Map<String, Resource> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-
- if (currentStateOutput == null || resourceMap == null || cache == null)
- {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires CURRENT_STATE|RESOURCES|DataCache");
- }
-
- BestPossibleStateOutput bestPossibleStateOutput =
- compute(event, resourceMap, currentStateOutput);
- event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(),
- bestPossibleStateOutput);
-
- long endTime = System.currentTimeMillis();
- logger.info("END BestPossibleStateCalcStage.process(). took: "
- + (endTime - startTime) + " ms");
- }
-
- private BestPossibleStateOutput compute(ClusterEvent event,
- Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput)
- {
- // for each ideal state
- // read the state model def
- // for each resource
- // get the preference list
- // for each instanceName check if its alive then assign a state
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- HelixManager manager = event.getAttribute("helixmanager");
-
- BestPossibleStateOutput output = new BestPossibleStateOutput();
-
- for (String resourceName : resourceMap.keySet())
- {
- logger.debug("Processing resource:" + resourceName);
-
- Resource resource = resourceMap.get(resourceName);
- // Ideal state may be gone. In that case we need to get the state model name
- // from the current state
- IdealState idealState = cache.getIdealState(resourceName);
-
- String stateModelDefName;
-
- if (idealState == null)
- {
- // if ideal state is deleted, use an empty one
- logger.info("resource:" + resourceName + " does not exist anymore");
- stateModelDefName = currentStateOutput.getResourceStateModelDef(resourceName);
- idealState = new IdealState(resourceName);
- }
- else
- {
- stateModelDefName = idealState.getStateModelDefRef();
- }
-
- StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
- if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
- {
- calculateAutoBalancedIdealState(cache,
- idealState,
- stateModelDef,
- currentStateOutput);
- }
-
- // For idealstate that has rebalancing timer and is in AUTO mode, we will run jsql
- // queries to calculate the
- // preference list
-
- Map<String, List<String>> queryPartitionPriorityLists = null;
- if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO
- && idealState.getRebalanceTimerPeriod() > 0)
- {
- if (manager != null)
- {
- queryPartitionPriorityLists =
- calculatePartitionPriorityListWithQuery(manager, idealState);
- }
- }
-
- for (Partition partition : resource.getPartitions())
- {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
-
- Map<String, String> bestStateForPartition;
- Set<String> disabledInstancesForPartition =
- cache.getDisabledInstancesForPartition(partition.toString());
-
- if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
- {
- Map<String, String> idealStateMap =
- idealState.getInstanceStateMap(partition.getPartitionName());
- bestStateForPartition =
- computeCustomizedBestStateForPartition(cache,
- stateModelDef,
- idealStateMap,
- currentStateMap,
- disabledInstancesForPartition);
- }
- else
- // both AUTO and AUTO_REBALANCE mode
- {
- List<String> instancePreferenceList =
- getPreferenceList(cache, partition, idealState, stateModelDef);
- if (queryPartitionPriorityLists != null)
- {
- String partitionName = partition.getPartitionName();
- if (queryPartitionPriorityLists.containsKey(partitionName))
- {
- List<String> queryInstancePreferenceList =
- queryPartitionPriorityLists.get(partitionName);
- // For instances that is not included in the queryInstancePreferenceList,
- // add them to the end of the list
- for (String instanceName : instancePreferenceList)
- {
- if (!queryInstancePreferenceList.contains(instanceName))
- {
- queryInstancePreferenceList.add(instanceName);
- }
- }
- instancePreferenceList = queryInstancePreferenceList;
- }
- }
- bestStateForPartition =
- computeAutoBestStateForPartition(cache,
- stateModelDef,
- instancePreferenceList,
- currentStateMap,
- disabledInstancesForPartition);
- }
- output.setState(resourceName, partition, bestStateForPartition);
- }
- }
- return output;
- }
-
- Map<String, List<String>> calculatePartitionPriorityListWithQuery(HelixManager manager,
- IdealState idealState)
- {
- // Read queries from the resource config
- String querys = idealState.getRecord().getSimpleField(IdealState.QUERY_LIST);
- if (querys == null)
- {
- logger.warn("IdealState " + idealState.getResourceName()
- + " does not have query list");
- return null;
- }
- try
- {
- List<String> queryList = Arrays.asList(querys.split(";"));
- List<ZNRecord> resultList =
- JsqlQueryListProcessor.executeQueryList(manager.getHelixDataAccessor(),
- manager.getClusterName(),
- queryList);
- Map<String, List<String>> priorityLists = new TreeMap<String, List<String>>();
- for (ZNRecord result : resultList)
- {
- String partition = result.getSimpleField("partition");
- String instance = result.getSimpleField("instance");
- if (instance.equals("") || partition.equals(""))
- {
- continue;
- }
-
- if (!priorityLists.containsKey(partition))
- {
- priorityLists.put(partition, new ArrayList<String>());
- }
- priorityLists.get(partition).add(instance);
- }
- return priorityLists;
- }
- catch (Exception e)
- {
- logger.error("", e);
- return null;
- }
-
- }
-
- /**
- * Compute best state for resource in AUTO_REBALANCE ideal state mode. the algorithm
- * will make sure that the master partition are evenly distributed; Also when instances
- * are added / removed, the amount of diff in master partitions are minimized
- *
- * @param cache
- * @param idealState
- * @param instancePreferenceList
- * @param stateModelDef
- * @param currentStateOutput
- * @return
- */
- private void calculateAutoBalancedIdealState(ClusterDataCache cache,
- IdealState idealState,
- StateModelDefinition stateModelDef,
- CurrentStateOutput currentStateOutput)
- {
- String topStateValue = stateModelDef.getStatesPriorityList().get(0);
- Set<String> liveInstances = cache._liveInstanceMap.keySet();
- // Obtain replica number
- int replicas = 1;
- try
- {
- replicas = Integer.parseInt(idealState.getReplicas());
- }
- catch (Exception e)
- {
- logger.error("", e);
- }
- // Init for all partitions with empty list
- Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
- List<String> emptyList = new ArrayList<String>(0);
- for (String partition : idealState.getPartitionSet())
- {
- defaultListFields.put(partition, emptyList);
- }
- idealState.getRecord().setListFields(defaultListFields);
- // Return if no live instance
- if (liveInstances.size() == 0)
- {
- logger.info("No live instances, return. Idealstate : "
- + idealState.getResourceName());
- return;
- }
- Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
- for (String instanceName : liveInstances)
- {
- masterAssignmentMap.put(instanceName, new ArrayList<String>());
- }
- Set<String> orphanedPartitions = new HashSet<String>();
- orphanedPartitions.addAll(idealState.getPartitionSet());
- // Go through all current states and fill the assignments
- for (String liveInstanceName : liveInstances)
- {
- CurrentState currentState =
- cache.getCurrentState(liveInstanceName,
- cache.getLiveInstances()
- .get(liveInstanceName)
- .getSessionId()).get(idealState.getId());
- if (currentState != null)
- {
- Map<String, String> partitionStates = currentState.getPartitionStateMap();
- for (String partitionName : partitionStates.keySet())
- {
- String state = partitionStates.get(partitionName);
- if (state.equals(topStateValue))
- {
- masterAssignmentMap.get(liveInstanceName).add(partitionName);
- orphanedPartitions.remove(partitionName);
- }
- }
- }
- }
- List<String> orphanedPartitionsList = new ArrayList<String>();
- orphanedPartitionsList.addAll(orphanedPartitions);
- normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList);
- idealState.getRecord()
- .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
- replicas));
-
- }
-
- /**
- * Given the current master assignment map and the partitions not hosted, generate an
- * evenly distributed partition assignment map
- *
- * @param masterAssignmentMap
- * current master assignment map
- * @param orphanPartitions
- * partitions not hosted by any instance
- * @return
- */
- private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
- List<String> orphanPartitions)
- {
- int totalPartitions = 0;
- String[] instanceNames = new String[masterAssignmentMap.size()];
- masterAssignmentMap.keySet().toArray(instanceNames);
- Arrays.sort(instanceNames);
- // Find out total partition number
- for (String key : masterAssignmentMap.keySet())
- {
- totalPartitions += masterAssignmentMap.get(key).size();
- Collections.sort(masterAssignmentMap.get(key));
- }
- totalPartitions += orphanPartitions.size();
-
- // Find out how many partitions an instance should host
- int partitionNumber = totalPartitions / masterAssignmentMap.size();
- int leave = totalPartitions % masterAssignmentMap.size();
-
- for (int i = 0; i < instanceNames.length; i++)
- {
- int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
- leave--;
- // For hosts that has more partitions, move those partitions to "orphaned"
- while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo)
- {
- int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
- orphanPartitions.add(masterAssignmentMap.get(instanceNames[i])
- .get(lastElementIndex));
- masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
- }
- }
- leave = totalPartitions % masterAssignmentMap.size();
- Collections.sort(orphanPartitions);
- // Assign "orphaned" partitions to hosts that do not have enough partitions
- for (int i = 0; i < instanceNames.length; i++)
- {
- int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
- leave--;
- while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
- {
- int lastElementIndex = orphanPartitions.size() - 1;
- masterAssignmentMap.get(instanceNames[i])
- .add(orphanPartitions.get(lastElementIndex));
- orphanPartitions.remove(lastElementIndex);
- }
- }
- if (orphanPartitions.size() > 0)
- {
- logger.error("orphanPartitions still contains elements");
- }
- }
-
- /**
- * Generate full preference list from the master assignment map evenly distribute the
- * slave partitions mastered on a host to other hosts
- *
- * @param masterAssignmentMap
- * current master assignment map
- * @param orphanPartitions
- * partitions not hosted by any instance
- * @return
- */
- Map<String, List<String>> generateListFieldFromMasterAssignment(Map<String, List<String>> masterAssignmentMap,
- int replicas)
- {
- Map<String, List<String>> listFields = new HashMap<String, List<String>>();
- int slaves = replicas - 1;
- String[] instanceNames = new String[masterAssignmentMap.size()];
- masterAssignmentMap.keySet().toArray(instanceNames);
- Arrays.sort(instanceNames);
-
- for (int i = 0; i < instanceNames.length; i++)
- {
- String instanceName = instanceNames[i];
- List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
- for (int x = 0; x < instanceNames.length - 1; x++)
- {
- int index = (x + i + 1) % instanceNames.length;
- otherInstances.add(instanceNames[index]);
- }
-
- List<String> partitionList = masterAssignmentMap.get(instanceName);
- for (int j = 0; j < partitionList.size(); j++)
- {
- String partitionName = partitionList.get(j);
- listFields.put(partitionName, new ArrayList<String>());
- listFields.get(partitionName).add(instanceName);
-
- int slavesCanAssign = Math.min(slaves, otherInstances.size());
- for (int k = 0; k < slavesCanAssign; k++)
- {
- int index = (j + k + 1) % otherInstances.size();
- listFields.get(partitionName).add(otherInstances.get(index));
- }
- }
- }
- return listFields;
- }
-
- /**
- * compute best state for resource in AUTO ideal state mode
- *
- * @param cache
- * @param stateModelDef
- * @param instancePreferenceList
- * @param currentStateMap
- * : instance->state for each partition
- * @param disabledInstancesForPartition
- * @return
- */
- private Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef,
- List<String> instancePreferenceList,
- Map<String, String> currentStateMap,
- Set<String> disabledInstancesForPartition)
- {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
-
- // if the ideal state is deleted, instancePreferenceList will be empty and
- // we should drop all resources.
- if (currentStateMap != null)
- {
- for (String instance : currentStateMap.keySet())
- {
- if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !"ERROR".equals(currentStateMap.get(instance)))
- {
- // move to DROPPED state only if not in ERROR state
- instanceStateMap.put(instance, "DROPPED");
- }
- else if (!"ERROR".equals(currentStateMap.get(instance))
- && disabledInstancesForPartition.contains(instance))
- {
- // if a non-error node is disabled, put it into initial state (OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
- }
- }
- }
-
- // ideal state is deleted
- if (instancePreferenceList == null)
- {
- return instanceStateMap;
- }
-
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
- boolean assigned[] = new boolean[instancePreferenceList.size()];
-
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
-
- for (String state : statesPriorityList)
- {
- String num = stateModelDef.getNumInstancesPerState(state);
- int stateCount = -1;
- if ("N".equals(num))
- {
- Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
- liveAndEnabled.removeAll(disabledInstancesForPartition);
- stateCount = liveAndEnabled.size();
- }
- else if ("R".equals(num))
- {
- stateCount = instancePreferenceList.size();
- }
- else
- {
- try
- {
- stateCount = Integer.parseInt(num);
- }
- catch (Exception e)
- {
- logger.error("Invalid count for state:" + state + " ,count=" + num);
- }
- }
- if (stateCount > -1)
- {
- int count = 0;
- for (int i = 0; i < instancePreferenceList.size(); i++)
- {
- String instanceName = instancePreferenceList.get(i);
-
- boolean notInErrorState =
- currentStateMap == null
- || !"ERROR".equals(currentStateMap.get(instanceName));
-
- if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
- && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
- {
- instanceStateMap.put(instanceName, state);
- count = count + 1;
- assigned[i] = true;
- if (count == stateCount)
- {
- break;
- }
- }
- }
- }
- }
- return instanceStateMap;
- }
-
- /**
- * compute best state for resource in CUSTOMIZED ideal state mode
- *
- * @param cache
- * @param stateModelDef
- * @param idealStateMap
- * @param currentStateMap
- * @param disabledInstancesForPartition
- * @return
- */
- private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
- StateModelDefinition stateModelDef,
- Map<String, String> idealStateMap,
- Map<String, String> currentStateMap,
- Set<String> disabledInstancesForPartition)
- {
- Map<String, String> instanceStateMap = new HashMap<String, String>();
-
- // if the ideal state is deleted, idealStateMap will be null/empty and
- // we should drop all resources.
- if (currentStateMap != null)
- {
- for (String instance : currentStateMap.keySet())
- {
- if ( (idealStateMap == null || !idealStateMap.containsKey(instance))
- && !"ERROR".equals(currentStateMap.get(instance)))
- {
- // move to DROPPED state only if not in ERROR state
- instanceStateMap.put(instance, "DROPPED");
- }
- else if (!"ERROR".equals(currentStateMap.get(instance))
- && disabledInstancesForPartition.contains(instance))
- {
- // if a non-error node is disabled, put it into initial state (OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
- }
- }
- }
-
- // ideal state is deleted
- if (idealStateMap == null)
- {
- return instanceStateMap;
- }
-
- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
- for (String instance : idealStateMap.keySet())
- {
- boolean notInErrorState =
- currentStateMap == null || !"ERROR".equals(currentStateMap.get(instance));
-
- if (liveInstancesMap.containsKey(instance) && notInErrorState
- && !disabledInstancesForPartition.contains(instance))
- {
- instanceStateMap.put(instance, idealStateMap.get(instance));
- }
- }
-
- return instanceStateMap;
- }
-
- private List<String> getPreferenceList(ClusterDataCache cache,
- Partition resource,
- IdealState idealState,
- StateModelDefinition stateModelDef)
- {
- List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
-
- if (listField != null && listField.size() == 1
- && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0)))
- {
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- List<String> prefList = new ArrayList<String>(liveInstances.keySet());
- Collections.sort(prefList);
- return prefList;
- }
- else
- {
- return listField;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java
deleted file mode 100644
index 1334b47..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/BestPossibleStateOutput.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.model.Partition;
-
-public class BestPossibleStateOutput
-{
- // resource->partition->instance->state
- Map<String, Map<Partition, Map<String, String>>> _dataMap;
-
- public BestPossibleStateOutput()
- {
- _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
- }
-
- public void setState(String resourceName, Partition resource,
- Map<String, String> bestInstanceStateMappingForResource)
- {
- if (!_dataMap.containsKey(resourceName))
- {
- _dataMap.put(resourceName,
- new HashMap<Partition, Map<String, String>>());
- }
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- map.put(resource, bestInstanceStateMappingForResource);
- }
-
- public Map<String, String> getInstanceStateMap(String resourceName,
- Partition resource)
- {
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- if (map != null)
- {
- return map.get(resource);
- }
- return Collections.emptyMap();
- }
-
- public Map<Partition, Map<String, String>> getResourceMap(String resourceName)
- {
- Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
- if (map != null)
- {
- return map;
- }
- return Collections.emptyMap();
- }
-
- @Override
- public String toString()
- {
- return _dataMap.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java
deleted file mode 100644
index 34e06e6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterDataCache.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixConstants.StateModelToken;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.model.ClusterConstraints;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.StateModelDefinition;
-
-/**
- * Reads the data from the cluster using data accessor. This output ClusterData which
- * provides useful methods to search/lookup properties
- *
- * @author kgopalak
- *
- */
-public class ClusterDataCache
-{
-
- Map<String, LiveInstance> _liveInstanceMap;
- Map<String, IdealState> _idealStateMap;
- Map<String, StateModelDefinition> _stateModelDefMap;
- Map<String, InstanceConfig> _instanceConfigMap;
- Map<String, ClusterConstraints> _constraintMap;
- Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
- Map<String, Map<String, Message>> _messageMap;
-
- // Map<String, Map<String, HealthStat>> _healthStatMap;
- // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
- // private PersistentStats _persistentStats;
- // private Alerts _alerts;
- // private AlertStatus _alertStatus;
-
- private static final Logger LOG =
- Logger.getLogger(ClusterDataCache.class.getName());
-
- public boolean refresh(HelixDataAccessor accessor)
- {
- Builder keyBuilder = accessor.keyBuilder();
- _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
- for (LiveInstance instance : _liveInstanceMap.values())
- {
- LOG.trace("live instance: " + instance.getInstanceName() + " "
- + instance.getSessionId());
- }
-
- _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
- _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
- _constraintMap =
- accessor.getChildValuesMap(keyBuilder.constraints());
-
- Map<String, Map<String, Message>> msgMap =
- new HashMap<String, Map<String, Message>>();
- for (String instanceName : _liveInstanceMap.keySet())
- {
- Map<String, Message> map =
- accessor.getChildValuesMap(keyBuilder.messages(instanceName));
- msgMap.put(instanceName, map);
- }
- _messageMap = Collections.unmodifiableMap(msgMap);
-
- Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
- new HashMap<String, Map<String, Map<String, CurrentState>>>();
- for (String instanceName : _liveInstanceMap.keySet())
- {
- LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
- String sessionId = liveInstance.getSessionId();
- if (!allCurStateMap.containsKey(instanceName))
- {
- allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
- }
- Map<String, Map<String, CurrentState>> curStateMap =
- allCurStateMap.get(instanceName);
- Map<String, CurrentState> map =
- accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
- curStateMap.put(sessionId, map);
- }
-
- for (String instance : allCurStateMap.keySet())
- {
- allCurStateMap.put(instance,
- Collections.unmodifiableMap(allCurStateMap.get(instance)));
- }
- _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
-
- return true;
- }
-
- public Map<String, IdealState> getIdealStates()
- {
- return _idealStateMap;
- }
-
- public Map<String, LiveInstance> getLiveInstances()
- {
- return _liveInstanceMap;
- }
-
- public Map<String, CurrentState> getCurrentState(String instanceName,
- String clientSessionId)
- {
- return _currentStateMap.get(instanceName).get(clientSessionId);
- }
-
- public Map<String, Message> getMessages(String instanceName)
- {
- Map<String, Message> map = _messageMap.get(instanceName);
- if (map != null)
- {
- return map;
- }
- else
- {
- return Collections.emptyMap();
- }
- }
-
- // public HealthStat getGlobalStats()
- // {
- // return _globalStats;
- // }
- //
- // public PersistentStats getPersistentStats()
- // {
- // return _persistentStats;
- // }
- //
- // public Alerts getAlerts()
- // {
- // return _alerts;
- // }
- //
- // public AlertStatus getAlertStatus()
- // {
- // return _alertStatus;
- // }
- //
- // public Map<String, HealthStat> getHealthStats(String instanceName)
- // {
- // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- // if (map != null)
- // {
- // return map;
- // } else
- // {
- // return Collections.emptyMap();
- // }
- // }
-
- public StateModelDefinition getStateModelDef(String stateModelDefRef)
- {
-
- return _stateModelDefMap.get(stateModelDefRef);
- }
-
- public IdealState getIdealState(String resourceName)
- {
- return _idealStateMap.get(resourceName);
- }
-
- public Map<String, InstanceConfig> getInstanceConfigMap()
- {
- return _instanceConfigMap;
- }
-
- public Set<String> getDisabledInstancesForPartition(String partition)
- {
- Set<String> disabledInstancesSet = new HashSet<String>();
- for (String instance : _instanceConfigMap.keySet())
- {
- InstanceConfig config = _instanceConfigMap.get(instance);
- if (config.getInstanceEnabled() == false
- || config.getInstanceEnabledForPartition(partition) == false)
- {
- disabledInstancesSet.add(instance);
- }
- }
- return disabledInstancesSet;
- }
-
- public int getReplicas(String resourceName)
- {
- int replicas = -1;
-
- if (_idealStateMap.containsKey(resourceName))
- {
- String replicasStr = _idealStateMap.get(resourceName).getReplicas();
-
- if (replicasStr != null)
- {
- if (replicasStr.equals(StateModelToken.ANY_LIVEINSTANCE.toString()))
- {
- replicas = _liveInstanceMap.size();
- }
- else
- {
- try
- {
- replicas = Integer.parseInt(replicasStr);
- }
- catch (Exception e)
- {
- LOG.error("invalid replicas string: " + replicasStr);
- }
- }
- }
- else
- {
- LOG.error("idealState for resource: " + resourceName + " does NOT have replicas");
- }
- }
- return replicas;
- }
-
- public ClusterConstraints getConstraint(ConstraintType type)
- {
- if (_constraintMap != null)
- {
- return _constraintMap.get(type.toString());
- }
- return null;
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
- sb.append("idealStateMap:" + _idealStateMap).append("\n");
- sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
- sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
- sb.append("messageMap:" + _messageMap).append("\n");
-
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java
deleted file mode 100644
index 3a3049a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ClusterEvent.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ClusterEvent
-{
- private static final Logger logger = Logger.getLogger(ClusterEvent.class
- .getName());
- private final String _eventName;
- private final Map<String, Object> _eventAttributeMap;
-
- public ClusterEvent(String name)
- {
- _eventName = name;
- _eventAttributeMap = new HashMap<String, Object>();
- }
-
- public void addAttribute(String attrName, Object attrValue)
- {
- if (logger.isTraceEnabled())
- {
- logger.trace("Adding attribute:" + attrName);
- logger.trace(" attribute value:" + attrValue);
- }
-
- _eventAttributeMap.put(attrName, attrValue);
- }
-
- public String getName()
- {
- return _eventName;
- }
-
- @SuppressWarnings("unchecked")
- public <T extends Object> T getAttribute(String attrName)
- {
- Object ret = _eventAttributeMap.get(attrName);
- if (ret != null)
- {
- return (T) ret;
- }
- return null;
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("name:"+ _eventName).append("\n");
- for(String key:_eventAttributeMap.keySet()){
- sb.append(key).append(":").append(_eventAttributeMap.get(key)).append("\n");
- }
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java
deleted file mode 100644
index bf350b1..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CompatibilityCheckStage.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.LiveInstance;
-
-public class CompatibilityCheckStage extends AbstractBaseStage
-{
- private static final Logger LOG = Logger
- .getLogger(CompatibilityCheckStage.class.getName());
-
- /**
- * INCOMPATIBLE_MAP stores primary version pairs:
- * {controllerPrimaryVersion, participantPrimaryVersion}
- * that are incompatible
- */
- private static final Map<String, Boolean> INCOMPATIBLE_MAP;
- static
- {
- Map<String, Boolean> map = new HashMap<String, Boolean>();
- /**
- * {controllerPrimaryVersion,participantPrimaryVersion} -> false
- */
- map.put("0.4,0.3", false);
- INCOMPATIBLE_MAP = Collections.unmodifiableMap(map);
- }
-
- private String getPrimaryVersion(String version)
- {
- String[] splits = version.split("\\.");
- if (splits == null || splits.length != 3)
- {
- return null;
- }
- return version.substring(0, version.lastIndexOf('.'));
- }
-
- private boolean isCompatible(String controllerVersion, String participantVersion)
- {
- if (participantVersion == null)
- {
- LOG.warn("Missing version of participant. Skip version check.");
- return true;
- }
-
- // compare primary version
- String controllerPrimaryVersion = getPrimaryVersion(controllerVersion);
- String participantPrimaryVersion = getPrimaryVersion(participantVersion);
- if (controllerPrimaryVersion != null && participantPrimaryVersion != null)
- {
- if (controllerPrimaryVersion.compareTo(participantPrimaryVersion) < 0)
- {
- LOG.info("Controller primary version is less than participant primary version.");
- return false;
- }
- else
- {
- if (INCOMPATIBLE_MAP.containsKey(controllerPrimaryVersion + "," + participantPrimaryVersion))
- {
- return false;
- }
- }
- }
- return true;
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- HelixManager manager = event.getAttribute("helixmanager");
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (manager == null || cache == null)
- {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager | DataCache");
- }
-
- String controllerVersion = manager.getVersion();
- if (controllerVersion == null)
- {
- String errorMsg = "Missing version of controller: " + manager.getInstanceName()
- + ". Pipeline will not continue.";
- LOG.error(errorMsg);
- throw new StageException(errorMsg);
- }
-
- Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
- for (LiveInstance liveInstance : liveInstanceMap.values())
- {
- String participantVersion = liveInstance.getHelixVersion();
- if (!isCompatible(controllerVersion, participantVersion))
- {
- String errorMsg = "cluster manager versions are incompatible; pipeline will not continue. "
- + "controller:" + manager.getInstanceName() + ", controllerVersion:" + controllerVersion
- + "; participant:" + liveInstance.getInstanceName() + ", participantVersion:" + participantVersion;
- LOG.error(errorMsg);
- throw new StageException(errorMsg);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java
deleted file mode 100644
index 833caba..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateComputationStage.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- *
- * @author kgopalak
- *
- */
-public class CurrentStateComputationStage extends AbstractBaseStage
-{
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
-
- if (cache == null || resourceMap == null)
- {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCE");
- }
-
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-
- for (LiveInstance instance : liveInstances.values())
- {
- String instanceName = instance.getInstanceName();
- Map<String, Message> instanceMessages = cache.getMessages(instanceName);
- for (Message message : instanceMessages.values())
- {
- if (!MessageType.STATE_TRANSITION.toString()
- .equalsIgnoreCase(message.getMsgType()))
- {
- continue;
- }
- if (!instance.getSessionId().equals(message.getTgtSessionId()))
- {
- continue;
- }
- String resourceName = message.getResourceName();
- Resource resource = resourceMap.get(resourceName);
- if (resource == null)
- {
- continue;
- }
-
- if (!message.getGroupMessageMode())
- {
- String partitionName = message.getPartitionName();
- Partition partition = resource.getPartition(partitionName);
- if (partition != null)
- {
- currentStateOutput.setPendingState(resourceName,
- partition,
- instanceName,
- message.getToState());
- }
- else
- {
- // log
- }
- }
- else
- {
- List<String> partitionNames = message.getPartitionNames();
- if (!partitionNames.isEmpty())
- {
- for (String partitionName : partitionNames)
- {
- Partition partition = resource.getPartition(partitionName);
- if (partition != null)
- {
- currentStateOutput.setPendingState(resourceName,
- partition,
- instanceName,
- message.getToState());
- }
- else
- {
- // log
- }
- }
- }
- }
- }
- }
- for (LiveInstance instance : liveInstances.values())
- {
- String instanceName = instance.getInstanceName();
-
- String clientSessionId = instance.getSessionId();
- Map<String, CurrentState> currentStateMap =
- cache.getCurrentState(instanceName, clientSessionId);
- for (CurrentState currentState : currentStateMap.values())
- {
-
- if (!instance.getSessionId().equals(currentState.getSessionId()))
- {
- continue;
- }
- String resourceName = currentState.getResourceName();
- String stateModelDefName = currentState.getStateModelDefRef();
- Resource resource = resourceMap.get(resourceName);
- if (resource == null)
- {
- continue;
- }
- if (stateModelDefName != null)
- {
- currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
- }
-
- currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
-
- Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
- for (String partitionName : partitionStateMap.keySet())
- {
- Partition partition = resource.getPartition(partitionName);
- if (partition != null)
- {
- currentStateOutput.setCurrentState(resourceName,
- partition,
- instanceName,
- currentState.getState(partitionName));
-
- }
- else
- {
- // log
- }
- }
- }
- }
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java
deleted file mode 100644
index 9f531ba..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/CurrentStateOutput.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Partition;
-
-public class CurrentStateOutput
-{
- private final Map<String, Map<Partition, Map<String, String>>> _currentStateMap;
- private final Map<String, Map<Partition, Map<String, String>>> _pendingStateMap;
- private final Map<String, String> _resourceStateModelMap;
- private final Map<String, CurrentState> _curStateMetaMap;
-
- public CurrentStateOutput()
- {
- _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
- _pendingStateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
- _resourceStateModelMap = new HashMap<String, String>();
- _curStateMetaMap = new HashMap<String, CurrentState>();
-
- }
-
- public void setResourceStateModelDef(String resourceName, String stateModelDefName)
- {
- _resourceStateModelMap.put(resourceName, stateModelDefName);
- }
-
- public String getResourceStateModelDef(String resourceName)
- {
- return _resourceStateModelMap.get(resourceName);
- }
-
- public void setBucketSize(String resource, int bucketSize)
- {
- CurrentState curStateMeta = _curStateMetaMap.get(resource);
- if (curStateMeta == null)
- {
- curStateMeta = new CurrentState(resource);
- _curStateMetaMap.put(resource, curStateMeta);
- }
- curStateMeta.setBucketSize(bucketSize);
- }
-
- public int getBucketSize(String resource)
- {
- int bucketSize = 0;
- CurrentState curStateMeta = _curStateMetaMap.get(resource);
- if (curStateMeta != null)
- {
- bucketSize = curStateMeta.getBucketSize();
- }
-
- return bucketSize;
- }
-
- public void setCurrentState(String resourceName,
- Partition partition,
- String instanceName,
- String state)
- {
- if (!_currentStateMap.containsKey(resourceName))
- {
- _currentStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
- }
- if (!_currentStateMap.get(resourceName).containsKey(partition))
- {
- _currentStateMap.get(resourceName).put(partition, new HashMap<String, String>());
- }
- _currentStateMap.get(resourceName).get(partition).put(instanceName, state);
- }
-
- public void setPendingState(String resourceName,
- Partition partition,
- String instanceName,
- String state)
- {
- if (!_pendingStateMap.containsKey(resourceName))
- {
- _pendingStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
- }
- if (!_pendingStateMap.get(resourceName).containsKey(partition))
- {
- _pendingStateMap.get(resourceName).put(partition, new HashMap<String, String>());
- }
- _pendingStateMap.get(resourceName).get(partition).put(instanceName, state);
- }
-
- /**
- * given (resource, partition, instance), returns currentState
- *
- * @param resourceName
- * @param partition
- * @param instanceName
- * @return
- */
- public String getCurrentState(String resourceName,
- Partition partition,
- String instanceName)
- {
- Map<Partition, Map<String, String>> map = _currentStateMap.get(resourceName);
- if (map != null)
- {
- Map<String, String> instanceStateMap = map.get(partition);
- if (instanceStateMap != null)
- {
- return instanceStateMap.get(instanceName);
- }
- }
- return null;
- }
-
- /**
- * given (resource, partition, instance), returns toState
- *
- * @param resourceName
- * @param partition
- * @param instanceName
- * @return
- */
- public String getPendingState(String resourceName,
- Partition partition,
- String instanceName)
- {
- Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName);
- if (map != null)
- {
- Map<String, String> instanceStateMap = map.get(partition);
- if (instanceStateMap != null)
- {
- return instanceStateMap.get(instanceName);
- }
- }
- return null;
- }
-
- /**
- * given (resource, partition), returns (instance->currentState) map
- *
- * @param resourceName
- * @param partition
- * @return
- */
- public Map<String, String> getCurrentStateMap(String resourceName, Partition partition)
- {
- if (_currentStateMap.containsKey(resourceName))
- {
- Map<Partition, Map<String, String>> map = _currentStateMap.get(resourceName);
- if (map.containsKey(partition))
- {
- return map.get(partition);
- }
- }
- return Collections.emptyMap();
- }
-
- /**
- * given (resource, partition), returns (instance->toState) map
- *
- * @param resourceName
- * @param partition
- * @return
- */
- public Map<String, String> getPendingStateMap(String resourceName, Partition partition)
- {
- if (_pendingStateMap.containsKey(resourceName))
- {
- Map<Partition, Map<String, String>> map = _pendingStateMap.get(resourceName);
- if (map.containsKey(partition))
- {
- return map.get(partition);
- }
- }
- return Collections.emptyMap();
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("current state= ").append(_currentStateMap);
- sb.append(", pending state= ").append(_pendingStateMap);
- return sb.toString();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java
deleted file mode 100644
index 6bad69f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ExternalViewComputeStage.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.monitoring.mbeans.ClusterStatusMonitor;
-
-public class ExternalViewComputeStage extends AbstractBaseStage
-{
- private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- long startTime = System.currentTimeMillis();
- log.info("START ExternalViewComputeStage.process()");
-
- HelixManager manager = event.getAttribute("helixmanager");
- Map<String, Resource> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-
- if (manager == null || resourceMap == null || cache == null)
- {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires ClusterManager|RESOURCES|DataCache");
- }
-
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
- List<ExternalView> newExtViews = new ArrayList<ExternalView>();
- List<PropertyKey> keys = new ArrayList<PropertyKey>();
-
- for (String resourceName : resourceMap.keySet())
- {
- ExternalView view = new ExternalView(resourceName);
- view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
-
- Resource resource = resourceMap.get(resourceName);
- for (Partition partition : resource.getPartitions())
- {
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- if (currentStateMap != null && currentStateMap.size() > 0)
- {
- // Set<String> disabledInstances
- // = cache.getDisabledInstancesForResource(resource.toString());
- for (String instance : currentStateMap.keySet())
- {
- // if (!disabledInstances.contains(instance))
- // {
- view.setState(partition.getPartitionName(),
- instance,
- currentStateMap.get(instance));
- // }
- }
- }
- }
- // Update cluster status monitor mbean
- ClusterStatusMonitor clusterStatusMonitor =
- (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- if (clusterStatusMonitor != null)
- {
- clusterStatusMonitor.onExternalViewChange(view,
- cache._idealStateMap.get(view.getResourceName()));
- }
-
- // compare the new external view with current one, set only on different
- Map<String, ExternalView> curExtViews =
- dataAccessor.getChildValuesMap(manager.getHelixDataAccessor()
- .keyBuilder()
- .externalViews());
-
- ExternalView curExtView = curExtViews.get(resourceName);
- if (curExtView == null || !curExtView.getRecord().equals(view.getRecord()))
- {
- keys.add(manager.getHelixDataAccessor().keyBuilder().externalView(resourceName));
- newExtViews.add(view);
- // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view,
- // resourceName);
- }
- }
-
- if (newExtViews.size() > 0)
- {
- dataAccessor.setChildren(keys, newExtViews);
- }
-
- long endTime = System.currentTimeMillis();
- log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
- }
-
-}