You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[14/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/DynamicFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/DynamicFileHelixManager.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/DynamicFileHelixManager.java
deleted file mode 100644
index 75a9967..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/DynamicFileHelixManager.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.file;
-
-import static com.linkedin.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.PreConnectCallback;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollector;
-import com.linkedin.helix.messaging.DefaultMessagingService;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertyJsonSerializer;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.file.FilePropertyStore;
-import com.linkedin.helix.store.zk.ZkHelixPropertyStore;
-import com.linkedin.helix.tools.PropertiesReader;
-import com.linkedin.helix.util.HelixUtil;
-
-@Deprecated
-public class DynamicFileHelixManager implements HelixManager
-{
- private static final Logger LOG = Logger.getLogger(StaticFileHelixManager.class.getName());
- private final FileDataAccessor _fileDataAccessor;
- private final FileHelixDataAccessor _accessor;
-
- private final String _clusterName;
- private final InstanceType _instanceType;
- private final String _instanceName;
- private boolean _isConnected;
- private final List<FileCallbackHandler> _handlers;
- private final FileHelixAdmin _mgmtTool;
-
- private final String _sessionId; // = "12345";
- public static final String configFile = "configFile";
- private final DefaultMessagingService _messagingService;
- private final FilePropertyStore<ZNRecord> _store;
- private final String _version;
- private final StateMachineEngine _stateMachEngine;
- private PropertyStore<ZNRecord> _propertyStore = null;
-
- public DynamicFileHelixManager(String clusterName, String instanceName,
- InstanceType instanceType, FilePropertyStore<ZNRecord> store)
- {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _instanceType = instanceType;
-
- _handlers = new ArrayList<FileCallbackHandler>();
-
- _store = store;
- _fileDataAccessor = new FileDataAccessor(_store, clusterName);
- _accessor = new FileHelixDataAccessor(_store, clusterName);
-
- _mgmtTool = new FileHelixAdmin(_store);
- _messagingService = new DefaultMessagingService(this);
- _sessionId = UUID.randomUUID().toString();
- if (instanceType == InstanceType.PARTICIPANT)
- {
- addLiveInstance();
- addMessageListener(_messagingService.getExecutor(), _instanceName);
- }
-
- _version = new PropertiesReader("cluster-manager-version.properties")
- .getProperty("clustermanager.version");
-
- _stateMachEngine = new HelixStateMachineEngine(this);
-
- _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
- _stateMachEngine);
- }
-
- @Override
- public void disconnect()
- {
- _store.stop();
- _messagingService.getExecutor().shutDown();
-
- _isConnected = false;
- }
-
- @Override
- public void addIdealStateChangeListener(IdealStateChangeListener listener)
- {
- final String path = HelixUtil.getIdealStatePath(_clusterName);
-
- FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
- EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated }, IDEAL_STATE);
- _handlers.add(callbackHandler);
-
- }
-
- @Override
- public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener)
- {
- final String path = HelixUtil.getLiveInstancesPath(_clusterName);
- FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
- EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated },
- LIVE_INSTANCE);
- _handlers.add(callbackHandler);
- }
-
- @Override
- public void addConfigChangeListener(ConfigChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addConfigChangeListener() is NOT supported by File Based cluster manager");
- }
-
- @Override
- public void addMessageListener(MessageListener listener, String instanceName)
- {
- final String path = HelixUtil.getMessagePath(_clusterName, instanceName);
-
- FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
- EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated },
- ChangeType.MESSAGE);
- _handlers.add(callbackHandler);
-
- }
-
- @Override
- public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName, String sessionId)
- {
- final String path = HelixUtil.getCurrentStateBasePath(_clusterName, instanceName) + "/"
- + sessionId;
-
- FileCallbackHandler callbackHandler = createCallBackHandler(path, listener, new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated },
- CURRENT_STATE);
-
- _handlers.add(callbackHandler);
- }
-
- @Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addExternalViewChangeListener() is NOT supported by File Based cluster manager");
- }
-
- @Override
- public DataAccessor getDataAccessor()
- {
- return _fileDataAccessor;
- }
-
- @Override
- public String getClusterName()
- {
- return _clusterName;
- }
-
- @Override
- public String getInstanceName()
- {
- return _instanceName;
- }
-
- @Override
- public void connect()
- {
- if (!isClusterSetup(_clusterName))
- {
- throw new HelixException("Initial cluster structure is not set up for cluster:"
- + _clusterName);
- }
- _messagingService.onConnected();
- _store.start();
- _isConnected = true;
- }
-
- @Override
- public String getSessionId()
- {
- return _sessionId;
- }
-
- @Override
- public boolean isConnected()
- {
- return _isConnected;
- }
-
- private boolean isClusterSetup(String clusterName)
- {
- if (clusterName == null || _store == null)
- {
- return false;
- }
-
- boolean isValid = _store.exists(PropertyPathConfig.getPath(PropertyType.IDEALSTATES,
- clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.CLUSTER.toString()))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.PARTICIPANT.toString()))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
- ConfigScopeProperty.RESOURCE.toString()))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER,
- clusterName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName));
-
- return isValid;
- }
-
- private boolean isInstanceSetup()
- {
- if (_instanceType == InstanceType.PARTICIPANT
- || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
- {
- boolean isValid = _store.exists(PropertyPathConfig.getPath(PropertyType.CONFIGS,
- _clusterName, ConfigScopeProperty.PARTICIPANT.toString(), _instanceName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.MESSAGES, _clusterName,
- _instanceName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName,
- _instanceName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES, _clusterName,
- _instanceName))
- && _store.exists(PropertyPathConfig.getPath(PropertyType.ERRORS, _clusterName,
- _instanceName));
-
- return isValid;
- }
- return true;
- }
-
- private void addLiveInstance()
- {
- if (!isClusterSetup(_clusterName))
- {
- throw new HelixException("Initial cluster structure is not set up for cluster:"
- + _clusterName);
- }
-
- if (!isInstanceSetup())
- {
- throw new HelixException("Instance is not configured for instance:" + _instanceName
- + " instanceType:" + _instanceType);
- }
-
- LiveInstance liveInstance = new LiveInstance(_instanceName);
- liveInstance.setSessionId(_sessionId);
-// _fileDataAccessor.setProperty(PropertyType.LIVEINSTANCES, liveInstance.getRecord(),
-// _instanceName);
-
- Builder keyBuilder = _accessor.keyBuilder();
- _accessor.setProperty(keyBuilder.liveInstance(_instanceName), liveInstance);
-
- }
-
- @Override
- public long getLastNotificationTime()
- {
- return 0;
- }
-
- @Override
- public void addControllerListener(ControllerChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addControllerListener() is NOT supported by File Based cluster manager");
- }
-
- @Override
- public boolean removeListener(Object listener)
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- private FileCallbackHandler createCallBackHandler(String path, Object listener,
- EventType[] eventTypes, ChangeType changeType)
- {
- if (listener == null)
- {
- throw new HelixException("Listener cannot be null");
- }
- return new FileCallbackHandler(this, path, listener, eventTypes, changeType);
- }
-
- @Override
- public HelixAdmin getClusterManagmentTool()
- {
- return _mgmtTool;
- }
-
- private void checkConnected()
- {
- if (!isConnected())
- {
- throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
- }
- }
-
- @Override
- public PropertyStore<ZNRecord> getPropertyStore()
- {
- checkConnected();
-
- if (_propertyStore == null)
- {
- String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
- String propertyStoreRoot = _store.getPropertyRootNamespace() + path;
- _propertyStore =
- new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
- propertyStoreRoot,
- new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
- }
- return _propertyStore;
- }
-
- @Override
- public ClusterMessagingService getMessagingService()
- {
- return _messagingService;
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public InstanceType getInstanceType()
- {
- return _instanceType;
- }
-
- @Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
- throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public String getVersion()
- {
- return _version;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine()
- {
- return _stateMachEngine;
- }
-
- @Override
- public boolean isLeader()
- {
- if (_instanceType != InstanceType.CONTROLLER)
- {
- return false;
- }
-
- return true;
- }
-
- @Override
- public ConfigAccessor getConfigAccessor()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void startTimerTasks()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void stopTimerTasks()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor()
- {
- return _accessor;
- }
-
- @Override
- public void addPreConnectCallback(PreConnectCallback callback)
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/FileCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileCallbackHandler.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/FileCallbackHandler.java
deleted file mode 100644
index bab2c00..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileCallbackHandler.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.file;
-
-import static com.linkedin.helix.HelixConstants.ChangeType.CONFIG;
-import static com.linkedin.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static com.linkedin.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGE;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertyStoreException;
-import com.linkedin.helix.store.file.FilePropertyStore;
-import com.linkedin.helix.util.HelixUtil;
-
-// TODO remove code duplication: CallbackHandler and CallbackHandlerForFile
-@Deprecated
-public class FileCallbackHandler implements PropertyChangeListener<ZNRecord>
-{
-
- private static Logger LOG = Logger.getLogger(FileCallbackHandler.class);
-
- private final String _path;
- private final Object _listener;
- private final EventType[] _eventTypes;
- private final ChangeType _changeType;
-// private final FileDataAccessor _accessor;
- private final FileHelixDataAccessor _accessor;
- private final AtomicLong lastNotificationTimeStamp;
- private final HelixManager _manager;
- private final FilePropertyStore<ZNRecord> _store;
-
- public FileCallbackHandler(HelixManager manager, String path, Object listener,
- EventType[] eventTypes, ChangeType changeType)
- {
- this._manager = manager;
- this._accessor = (FileHelixDataAccessor) manager.getHelixDataAccessor();
- this._path = path;
- this._listener = listener;
- this._eventTypes = eventTypes;
- this._changeType = changeType;
- this._store = (FilePropertyStore<ZNRecord>) _accessor.getStore();
- lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
-
- init();
- }
-
- public Object getListener()
- {
- return _listener;
- }
-
- public Object getPath()
- {
- return _path;
- }
-
- public void invoke(NotificationContext changeContext) throws Exception
- {
- // This allows the listener to work with one change at a time
- synchronized (_listener)
- {
- if (LOG.isDebugEnabled())
- {
- LOG.debug(Thread.currentThread().getId() + " START:INVOKE "
- + changeContext.getPathChanged() + " listener:"
- + _listener.getClass().getCanonicalName());
- }
-
- Builder keyBuilder = _accessor.keyBuilder();
-
- if (_changeType == IDEAL_STATE)
- {
- // System.err.println("ideal state change");
- IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
- subscribeForChanges(changeContext, true, true);
- List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
- idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
- } else if (_changeType == CONFIG)
- {
-
- ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
- subscribeForChanges(changeContext, true, true);
- List<InstanceConfig> configs = _accessor.getChildValues(keyBuilder.instanceConfigs());
- configChangeListener.onConfigChange(configs, changeContext);
-
- } else if (_changeType == LIVE_INSTANCE)
- {
- LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener) _listener;
- subscribeForChanges(changeContext, true, false);
- List<LiveInstance> liveInstances = _accessor.getChildValues(keyBuilder.liveInstances());
- liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
-
- } else if (_changeType == CURRENT_STATE)
- {
- CurrentStateChangeListener currentStateChangeListener;
- currentStateChangeListener = (CurrentStateChangeListener) _listener;
- subscribeForChanges(changeContext, true, true);
- String instanceName = HelixUtil.getInstanceNameFromPath(_path);
- String[] pathParts = _path.split("/");
- List<CurrentState> currentStates = _accessor.getChildValues(keyBuilder.currentStates(instanceName, pathParts[pathParts.length - 1]));
- currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
-
- } else if (_changeType == MESSAGE)
- {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, true, false);
- String instanceName = _manager.getInstanceName();
- List<Message> messages = _accessor.getChildValues(keyBuilder.messages(instanceName));
- messageListener.onMessage(instanceName, messages, changeContext);
- } else if (_changeType == EXTERNAL_VIEW)
- {
- ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
- subscribeForChanges(changeContext, true, true);
- List<ExternalView> externalViewList = _accessor.getChildValues(keyBuilder.externalViews());
- externalViewListener.onExternalViewChange(externalViewList, changeContext);
- } else if (_changeType == ChangeType.CONTROLLER)
- {
- ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
- subscribeForChanges(changeContext, true, false);
- controllerChangelistener.onControllerChange(changeContext);
- }
-
- if (LOG.isDebugEnabled())
- {
- LOG.debug(Thread.currentThread().getId() + " END:INVOKE " + changeContext.getPathChanged()
- + " listener:" + _listener.getClass().getCanonicalName());
- }
- }
- }
-
- private void subscribeForChanges(NotificationContext changeContext, boolean watchParent,
- boolean watchChild)
- {
- if (changeContext.getType() == NotificationContext.Type.INIT)
- {
- try
- {
- // _accessor.subscribeForPropertyChange(_path, this);
- _store.subscribeForPropertyChange(_path, this);
- } catch (PropertyStoreException e)
- {
- LOG.error("fail to subscribe for changes" + "\nexception:" + e);
- }
- }
- }
-
- public EventType[] getEventTypes()
- {
- return _eventTypes;
- }
-
- // this will invoke the listener so that it sets up the initial values from
- // the file property store if any exists
- public void init()
- {
- updateNotificationTime(System.nanoTime());
- try
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.INIT);
- invoke(changeContext);
- } catch (Exception e)
- {
- // TODO handle exception
- LOG.error("fail to init", e);
- }
- }
-
- public void reset()
- {
- try
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.FINALIZE);
- invoke(changeContext);
- } catch (Exception e)
- {
- // TODO handle exception
- LOG.error("fail to reset" + "\nexception:" + e);
- // ZKExceptionHandler.getInstance().handle(e);
- }
- }
-
- private void updateNotificationTime(long nanoTime)
- {
- long l = lastNotificationTimeStamp.get();
- while (nanoTime > l)
- {
- boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
- if (b)
- {
- break;
- } else
- {
- l = lastNotificationTimeStamp.get();
- }
- }
- }
-
- @Override
- public void onPropertyChange(String key)
- {
- // debug
- // LOG.error("on property change, key:" + key + ", path:" + _path);
-
- try
- {
- if (needToNotify(key))
- {
- // debug
- // System.err.println("notified on property change, key:" + key +
- // ", path:" +
- // path);
-
- updateNotificationTime(System.nanoTime());
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
- }
- } catch (Exception e)
- {
- // TODO handle exception
- // ZKExceptionHandler.getInstance().handle(e);
- LOG.error("fail onPropertyChange", e);
- }
- }
-
- private boolean needToNotify(String key)
- {
- boolean ret = false;
- switch (_changeType)
- {
- // both child/data changes matter
- case IDEAL_STATE:
- case CURRENT_STATE:
- case CONFIG:
- ret = key.startsWith(_path);
- break;
- // only child changes matter
- case LIVE_INSTANCE:
- case MESSAGE:
- case EXTERNAL_VIEW:
- case CONTROLLER:
- // ret = key.equals(_path);
- ret = key.startsWith(_path);
- break;
- default:
- break;
- }
-
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/FileDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/FileDataAccessor.java
deleted file mode 100644
index ce2f4e3..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileDataAccessor.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.file;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.PropertyStoreException;
-import com.linkedin.helix.store.file.FilePropertyStore;
-
-@Deprecated
-public class FileDataAccessor implements DataAccessor
-{
- private static Logger LOG = Logger.getLogger(FileDataAccessor.class);
- // store that is used by FileDataAccessor
- private final FilePropertyStore<ZNRecord> _store;
- private final String _clusterName;
- private final ReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
-
- public FileDataAccessor(FilePropertyStore<ZNRecord> store, String clusterName)
- {
- _store = store;
- _clusterName = clusterName;
- }
-
- @Override
- public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
- {
- return setProperty(type, value.getRecord(), keys);
- }
-
- @Override
- public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- try
- {
- _readWriteLock.writeLock().lock();
- _store.setProperty(path, value);
- return true;
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to set cluster property clusterName: " + _clusterName +
- " type:" + type +
- " keys:" + keys + "\nexception: " + e);
- }
- finally
- {
- _readWriteLock.writeLock().unlock();
- }
- return false;
- }
-
- @Override
- public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
- {
- return updateProperty(type, value.getRecord(), keys);
- }
-
- @Override
- public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
- {
- try
- {
- _readWriteLock.writeLock().lock();
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
- if (type.isUpdateOnlyOnExists())
- {
- updateIfExists(path, value, type.isMergeOnUpdate());
- }
- else
- {
- createOrUpdate(path, value, type.isMergeOnUpdate());
- }
- return true;
- }
- catch (PropertyStoreException e)
- {
- LOG.error("fail to update instance property, " +
- " type:" + type + " keys:" + keys, e);
- }
- finally
- {
- _readWriteLock.writeLock().unlock();
- }
- return false;
-
- }
-
- @Override
- public <T extends HelixProperty>
- T getProperty(Class<T> clazz, PropertyType type, String... keys)
- {
- ZNRecord record = getProperty(type, keys);
- if (record == null)
- {
- return null;
- }
- return HelixProperty.convertToTypedInstance(clazz, record);
- }
-
- @Override
- public ZNRecord getProperty(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- try
- {
- _readWriteLock.readLock().lock();
- return _store.getProperty(path);
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to get cluster property clusterName: " + _clusterName +
- " type:" + type +
- " keys:" + keys, e);
- }
- finally
- {
- _readWriteLock.readLock().unlock();
- }
- return null;
- }
-
- @Override
- public boolean removeProperty(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- try
- {
- _readWriteLock.writeLock().lock();
- _store.removeProperty(path);
- return true;
- }
- catch (PropertyStoreException e)
- {
- LOG.error("Fail to remove instance property, " +
- " type:" + type + " keys:" + keys, e);
- }
- finally
- {
- _readWriteLock.writeLock().unlock();
- }
-
- return false;
- }
-
- @Override
- public List<String> getChildNames(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- try
- {
- _readWriteLock.readLock().lock();
-
- List<String> childs = _store.getPropertyNames(path);
- return childs;
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to get child names:" + _clusterName +
- " parentPath:" + path + "\nexception: " + e);
- }
- finally
- {
- _readWriteLock.readLock().unlock();
- }
-
- return Collections.emptyList();
- }
-
- @Override
- public <T extends HelixProperty>
- List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
- {
- List<ZNRecord> list = getChildValues(type, keys);
- return HelixProperty.convertToTypedList(clazz, list);
- }
-
- @Override
- public List<ZNRecord> getChildValues(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
- List<ZNRecord> records = new ArrayList<ZNRecord>();
- try
- {
- _readWriteLock.readLock().lock();
-
- List<String> childs = _store.getPropertyNames(path);
- if (childs == null || childs.size() == 0)
- {
- return Collections.emptyList();
- }
-
- for (String child : childs)
- {
- ZNRecord record = _store.getProperty(child);
- if (record != null)
- {
- records.add(record);
- }
- }
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to get child properties cluster:" + _clusterName +
- " parentPath:" + path + "\nexception: " + e);
- }
- finally
- {
- _readWriteLock.readLock().unlock();
- }
- return records;
- }
-
- // HACK remove it later
- public PropertyStore<ZNRecord> getStore()
- {
- return _store;
- }
-
- private void updateIfExists(String path, final ZNRecord record, boolean mergeOnUpdate)
- throws PropertyStoreException
- {
- if (_store.exists(path))
- {
- _store.setProperty(path, record);
- }
- }
-
- private void createOrUpdate(String path, final ZNRecord record, final boolean mergeOnUpdate)
- throws PropertyStoreException
- {
- final int RETRYLIMIT = 3;
- int retryCount = 0;
- while (retryCount < RETRYLIMIT)
- {
- try
- {
- if (_store.exists(path))
- {
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if(mergeOnUpdate)
- {
- currentData.merge(record);
- return currentData;
- }
- return record;
- }
- };
- _store.updatePropertyUntilSucceed(path, updater);
-
- }
- else
- {
- if(record.getDeltaList().size() > 0)
- {
- ZNRecord newRecord = new ZNRecord(record.getId());
- newRecord.merge(record);
- _store.setProperty(path, newRecord);
- }
- else
- {
- _store.setProperty(path, record);
- }
- }
- break;
- }
- catch (Exception e)
- {
- retryCount = retryCount + 1;
- LOG.warn("Exception trying to update " + path + " Exception:"
- + e.getMessage() + ". Will retry.");
- }
- }
- }
-
- @Override
- public <T extends HelixProperty> Map<String, T> getChildValuesMap(Class<T> clazz,
- PropertyType type, String... keys)
- {
- List<T> list = getChildValues(clazz, type, keys);
- return HelixProperty.convertListToMap(list);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixAdmin.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixAdmin.java
deleted file mode 100644
index 8882283..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixAdmin.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.file;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScope.ConfigScopeProperty;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.IdealState.IdealStateModeProperty;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.StateModelDefinition;
-import com.linkedin.helix.store.PropertyStoreException;
-import com.linkedin.helix.store.file.FilePropertyStore;
-import com.linkedin.helix.tools.StateModelConfigGenerator;
-import com.linkedin.helix.util.HelixUtil;
-
-@Deprecated
-public class FileHelixAdmin implements HelixAdmin
-{
- private static Logger logger =
- Logger.getLogger(FileHelixAdmin.class);
- private final FilePropertyStore<ZNRecord> _store;
-
- public FileHelixAdmin(FilePropertyStore<ZNRecord> store)
- {
- _store = store;
- }
-
- @Override
- public List<String> getClusters()
- {
- throw new UnsupportedOperationException("getClusters() is NOT supported by FileClusterManagementTool");
-
- }
-
- @Override
- public List<String> getInstancesInCluster(String clusterName)
- {
- // String path = HelixUtil.getConfigPath(clusterName);
- String path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
-
- List<String> childs = null;
- List<String> instanceNames = new ArrayList<String>();
- try
- {
- childs = _store.getPropertyNames(path);
- for (String child : childs)
- {
- // strip config path from instanceName
- String instanceName = child.substring(child.lastIndexOf('/') + 1);
- instanceNames.add(instanceName);
- }
- return instanceNames;
- }
- catch (PropertyStoreException e)
- {
- logger.error("Fail to getInstancesInCluster, cluster " + clusterName, e);
- }
-
- return null;
- }
-
- @Override
- public List<String> getResourcesInCluster(String clusterName)
- {
- // TODO Auto-generated method stub
- // return null;
- throw new UnsupportedOperationException("getResourcesInCluster() is NOT supported by FileClusterManagementTool");
-
- }
-
- @Override
- public void addCluster(String clusterName, boolean overwritePrevRecord)
- {
- String path;
- try
- {
- _store.removeNamespace(clusterName);
- _store.createPropertyNamespace(clusterName);
- _store.createPropertyNamespace(HelixUtil.getIdealStatePath(clusterName));
-
- // CONFIG's
- // _store.createPropertyNamespace(HelixUtil.getConfigPath(clusterName));
- path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.CLUSTER.toString(),
- clusterName);
- _store.setProperty(path, new ZNRecord(clusterName));
- path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
- _store.createPropertyNamespace(path);
- path =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.RESOURCE.toString());
- _store.createPropertyNamespace(path);
-
- // PROPERTY STORE
- path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
- _store.createPropertyNamespace(path);
-
- _store.createPropertyNamespace(HelixUtil.getLiveInstancesPath(clusterName));
- _store.createPropertyNamespace(HelixUtil.getMemberInstancesPath(clusterName));
- _store.createPropertyNamespace(HelixUtil.getExternalViewPath(clusterName));
- _store.createPropertyNamespace(HelixUtil.getStateModelDefinitionPath(clusterName));
-
- StateModelConfigGenerator generator = new StateModelConfigGenerator();
- addStateModelDef(clusterName,
- "MasterSlave",
- new StateModelDefinition(generator.generateConfigForMasterSlave()));
-
- // controller
- _store.createPropertyNamespace(HelixUtil.getControllerPath(clusterName));
- path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
- final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
- final List<String> emptyList = new ArrayList<String>();
- emptyHistory.setListField(clusterName, emptyList);
- _store.setProperty(path, emptyHistory);
-
- path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
- _store.createPropertyNamespace(path);
-
- path =
- PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
- _store.createPropertyNamespace(path);
-
- path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
- _store.createPropertyNamespace(path);
-
- }
- catch (PropertyStoreException e)
- {
- logger.error("Fail to add cluster " + clusterName, e);
- }
-
- }
-
- @Override
- public void addResource(String clusterName,
- String resource,
- int numResources,
- String stateModelRef)
- {
- String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
- String resourceIdealStatePath = idealStatePath + "/" + resource;
-
- // if (_zkClient.exists(dbIdealStatePath))
- // {
- // logger.warn("Skip the operation. DB ideal state directory exists:"
- // + dbIdealStatePath);
- // return;
- // }
-
- IdealState idealState = new IdealState(resource);
- idealState.setNumPartitions(numResources);
- idealState.setStateModelDefRef(stateModelRef);
- idealState.setReplicas(Integer.toString(0));
- idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
- try
- {
- _store.setProperty(resourceIdealStatePath, idealState.getRecord());
- }
- catch (PropertyStoreException e)
- {
- logger.error("Fail to add resource, cluster:" + clusterName + " resourceName:"
- + resource, e);
- }
-
- }
-
- @Override
- public void addResource(String clusterName,
- String resource,
- int numResources,
- String stateModelRef,
- String idealStateMode)
- {
- throw new UnsupportedOperationException("ideal state mode not supported in file-based cluster manager");
- }
-
- @Override
- public void addResource(String clusterName,
- String resource,
- int numResources,
- String stateModelRef,
- String idealStateMode,
- int bucketSize)
- {
- throw new UnsupportedOperationException("bucketize not supported in file-based cluster manager");
- }
-
- @Override
- public void addInstance(String clusterName, InstanceConfig config)
- {
- String configsPath =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
- String nodeId = config.getId();
- String nodeConfigPath = configsPath + "/" + nodeId;
-
- try
- {
- _store.setProperty(nodeConfigPath, config.getRecord());
- _store.createPropertyNamespace(HelixUtil.getMessagePath(clusterName, nodeId));
- _store.createPropertyNamespace(HelixUtil.getCurrentStateBasePath(clusterName,
- nodeId));
- _store.createPropertyNamespace(HelixUtil.getErrorsPath(clusterName, nodeId));
- _store.createPropertyNamespace(HelixUtil.getStatusUpdatesPath(clusterName, nodeId));
- }
- catch (Exception e)
- {
- logger.error("Fail to add node, cluster:" + clusterName + "\nexception: " + e);
- }
-
- }
-
- @Override
- public void dropInstance(String clusterName, InstanceConfig config)
- {
- String configsPath =
- PropertyPathConfig.getPath(PropertyType.CONFIGS,
- clusterName,
- ConfigScopeProperty.PARTICIPANT.toString());
- String nodeId = config.getId();
- String nodeConfigPath = configsPath + "/" + nodeId;
-
- try
- {
- _store.setProperty(nodeConfigPath, config.getRecord());
- }
- catch (Exception e)
- {
- logger.error("Fail to drop node, cluster:" + clusterName, e);
- }
- }
-
- @Override
- public IdealState getResourceIdealState(String clusterName, String resourceName)
- {
- return new FileDataAccessor(_store, clusterName).getProperty(IdealState.class,
- PropertyType.IDEALSTATES,
- resourceName);
- }
-
- @Override
- public void setResourceIdealState(String clusterName,
- String resourceName,
- IdealState idealState)
- {
- new FileDataAccessor(_store, clusterName).setProperty(PropertyType.IDEALSTATES,
- idealState,
- resourceName);
- }
-
- @Override
- public void enableInstance(String clusterName, String instanceName, boolean enabled)
- {
- throw new UnsupportedOperationException("enableInstance() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void enableCluster(String clusterName, boolean enabled)
- {
- throw new UnsupportedOperationException("enableCluster() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void addStateModelDef(String clusterName,
- String stateModelDef,
- StateModelDefinition stateModel)
- {
-
- String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName);
- String stateModelPath = stateModelDefPath + "/" + stateModelDef;
-
- try
- {
- _store.setProperty(stateModelPath, stateModel.getRecord());
- }
- catch (PropertyStoreException e)
- {
- logger.error("Fail to addStateModelDef, cluster:" + clusterName + " stateModelDef:"
- + stateModelDef, e);
- }
-
- }
-
- @Override
- public void dropResource(String clusterName, String resourceName)
- {
- new FileDataAccessor(_store, clusterName).removeProperty(PropertyType.IDEALSTATES,
- resourceName);
-
- }
-
- @Override
- public List<String> getStateModelDefs(String clusterName)
- {
- throw new UnsupportedOperationException("getStateModelDefs() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
- {
- throw new UnsupportedOperationException("getInstanceConfig() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
- {
- throw new UnsupportedOperationException("getStateModelDef() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public ExternalView getResourceExternalView(String clusterName, String resource)
- {
- throw new UnsupportedOperationException("getResourceExternalView() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void enablePartition(boolean enabled,
- String clusterName,
- String instanceName,
- String resourceName,
- List<String> partitionNames)
- {
- throw new UnsupportedOperationException("enablePartition() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void resetPartition(String clusterName,
- String instanceName,
- String resourceName,
- List<String> partitionNames)
- {
- throw new UnsupportedOperationException("resetPartition() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void resetInstance(String clusterName, List<String> instanceNames)
- {
- throw new UnsupportedOperationException("resetInstance() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void resetResource(String clusterName, List<String> resourceNames)
- {
- throw new UnsupportedOperationException("resetResource() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void addStat(String clusterName, String statName)
- {
- throw new UnsupportedOperationException("addStat() is NOT supported by FileClusterManagementTool");
-
- }
-
- @Override
- public void addAlert(String clusterName, String alertName)
- {
- throw new UnsupportedOperationException("addAlert() is NOT supported by FileClusterManagementTool");
-
- }
-
- @Override
- public void dropStat(String clusterName, String statName)
- {
- throw new UnsupportedOperationException("dropStat() is NOT supported by FileClusterManagementTool");
-
- }
-
- @Override
- public void dropAlert(String clusterName, String alertName)
- {
- throw new UnsupportedOperationException("dropAlert() is NOT supported by FileClusterManagementTool");
-
- }
-
- @Override
- public void dropCluster(String clusterName)
- {
- throw new UnsupportedOperationException("dropCluster() is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void addClusterToGrandCluster(String clusterName, String grandCluster)
- {
- throw new UnsupportedOperationException("addCluster(clusterName, overwritePrevRecord, grandCluster) is NOT supported by FileClusterManagementTool");
- }
-
- @Override
- public void setConfig(ConfigScope scope, Map<String, String> properties)
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
-
- }
-
- @Override
- public Map<String, String> getConfig(ConfigScope scope, Set<String> keys)
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
- }
-
- @Override
- public List<String> getConfigKeys(ConfigScopeProperty scope,
- String clusterName,
- String... keys)
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
- }
-
- @Override
- public void removeConfig(ConfigScope scope, Set<String> keys)
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
-
- }
-
- @Override
- public void rebalance(String clusterName, String resourceName, int replica)
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
- }
-
- @Override
- public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
- }
-
- @Override
- public void addStateModelDef(String clusterName,
- String stateModelDefName,
- String stateModelDefFile) throws IOException
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
- }
-
- @Override
- public void addMessageConstraint(String clusterName,
- String constraintId,
- Map<String, String> constraints)
- {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("unsupported operation");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixDataAccessor.java
deleted file mode 100644
index 9b985e2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/FileHelixDataAccessor.java
+++ /dev/null
@@ -1,351 +0,0 @@
-package com.linkedin.helix.manager.file;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.PropertyStoreException;
-import com.linkedin.helix.store.file.FilePropertyStore;
-
-@Deprecated
-public class FileHelixDataAccessor implements HelixDataAccessor
-{
- private static Logger LOG = Logger.getLogger(FileHelixDataAccessor.class);
-
- private final FilePropertyStore<ZNRecord> _store;
- private final String _clusterName;
- private final ReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
- private final Builder _propertyKeyBuilder;
-
-
- public FileHelixDataAccessor(FilePropertyStore<ZNRecord> store,
- String clusterName)
- {
- _store = store;
- _clusterName = clusterName;
- _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
- }
-
- @Override
- public boolean createProperty(PropertyKey key, HelixProperty value)
- {
- return updateProperty(key, value);
- }
-
- @Override
- public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
- {
- String path = key.getPath();
- try
- {
- _readWriteLock.writeLock().lock();
- _store.setProperty(path, value.getRecord());
- return true;
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to set cluster property clusterName: " + _clusterName +
- " type:" + key.getType() +
- " keys:" + Arrays.toString(key.getParams()), e);
- return false;
- }
- finally
- {
- _readWriteLock.writeLock().unlock();
- }
- }
-
- @Override
- public <T extends HelixProperty> boolean updateProperty(PropertyKey key,
- T value)
- {
- PropertyType type = key.getType();
- String path = key.getPath();
-
- try
- {
- _readWriteLock.writeLock().lock();
-
- if (type.isUpdateOnlyOnExists())
- {
- updateIfExists(path, value.getRecord(), type.isMergeOnUpdate());
- }
- else
- {
- createOrUpdate(path, value.getRecord(), type.isMergeOnUpdate());
- }
- return true;
- }
- catch (PropertyStoreException e)
- {
- LOG.error("fail to update property. type:" +
- type + ", keys:" + Arrays.toString(key.getParams()), e);
- return false;
- }
- finally
- {
- _readWriteLock.writeLock().unlock();
- }
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends HelixProperty> T getProperty(PropertyKey key)
- {
- String path = key.getPath();
- try
- {
- _readWriteLock.readLock().lock();
- ZNRecord record = _store.getProperty(path);
- if (record == null)
- {
- return null;
- }
- return (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to get property. clusterName: " + _clusterName +
- " type:" + key.getType() +
- " keys:" + Arrays.toString(key.getParams()), e);
- return null;
- }
- finally
- {
- _readWriteLock.readLock().unlock();
- }
- }
-
- @Override
- public boolean removeProperty(PropertyKey key)
- {
- String path = key.getPath();;
-
- try
- {
- _readWriteLock.writeLock().lock();
- _store.removeProperty(path);
- return true;
- }
- catch (PropertyStoreException e)
- {
- LOG.error("Fail to remove property. type:" +
- key.getType() + ", keys:" + Arrays.toString(key.getParams()), e);
- return false;
- }
- finally
- {
- _readWriteLock.writeLock().unlock();
- }
- }
-
- @Override
- public List<String> getChildNames(PropertyKey key)
- {
- String path = key.getPath();;
-
- try
- {
- _readWriteLock.readLock().lock();
-
- List<String> childs = _store.getPropertyNames(path);
- return childs;
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to get child names. clusterName: " + _clusterName +
- ", parentPath:" + path, e);
-
- return Collections.emptyList();
- }
- finally
- {
- _readWriteLock.readLock().unlock();
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
- {
- String path = key.getPath();
- List<T> records = new ArrayList<T>();
- try
- {
- _readWriteLock.readLock().lock();
-
- List<String> childs = _store.getPropertyNames(path);
- if (childs == null || childs.size() == 0)
- {
- return Collections.emptyList();
- }
-
- for (String child : childs)
- {
- ZNRecord record = _store.getProperty(child);
- if (record != null)
- {
- records.add((T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record));
- }
- }
- }
- catch(PropertyStoreException e)
- {
- LOG.error("Fail to get child properties. clusterName:" + _clusterName +
- ", parentPath:" + path, e);
- }
- finally
- {
- _readWriteLock.readLock().unlock();
- }
-
- return records;
- }
-
- @Override
- public <T extends HelixProperty> Map<String, T> getChildValuesMap(
- PropertyKey key)
- {
- List<T> list = getChildValues(key);
- return HelixProperty.convertListToMap(list);
- }
-
- @Override
- public Builder keyBuilder()
- {
- return _propertyKeyBuilder;
- }
-
- @Override
- public <T extends HelixProperty> boolean[] createChildren(
- List<PropertyKey> keys, List<T> children)
- {
- boolean[] success = new boolean[keys.size()];
- for (int i = 0; i < keys.size(); i++)
- {
- success[i] = createProperty(keys.get(i), children.get(i));
- }
- return success;
- }
-
- @Override
- public <T extends HelixProperty> boolean[] setChildren(
- List<PropertyKey> keys, List<T> children)
- {
- boolean[] success = new boolean[keys.size()];
- for (int i = 0; i < keys.size(); i++)
- {
- success[i] = setProperty(keys.get(i), children.get(i));
- }
- return success;
- }
-
- @Override
- public BaseDataAccessor getBaseDataAccessor()
- {
- throw new UnsupportedOperationException("No BaseDataAccessor for FileHelixDataAccessor");
- }
-
- // HACK remove it later
- public PropertyStore<ZNRecord> getStore()
- {
- return _store;
- }
-
- private void createOrUpdate(String path, final ZNRecord record, final boolean mergeOnUpdate)
- throws PropertyStoreException
- {
- final int RETRYLIMIT = 3;
- int retryCount = 0;
- while (retryCount < RETRYLIMIT)
- {
- try
- {
- if (_store.exists(path))
- {
- DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
- {
- @Override
- public ZNRecord update(ZNRecord currentData)
- {
- if(mergeOnUpdate)
- {
- currentData.merge(record);
- return currentData;
- }
- return record;
- }
- };
- _store.updatePropertyUntilSucceed(path, updater);
-
- }
- else
- {
- if(record.getDeltaList().size() > 0)
- {
- ZNRecord newRecord = new ZNRecord(record.getId());
- newRecord.merge(record);
- _store.setProperty(path, newRecord);
- }
- else
- {
- _store.setProperty(path, record);
- }
- }
- break;
- }
- catch (Exception e)
- {
- retryCount = retryCount + 1;
- LOG.warn("Exception trying to update " + path + " Exception:"
- + e.getMessage() + ". Will retry.");
- }
- }
- }
-
- private void updateIfExists(String path, final ZNRecord record, boolean mergeOnUpdate)
- throws PropertyStoreException
- {
- if (_store.exists(path))
- {
- _store.setProperty(path, record);
- }
- }
-
- @Override
- public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
- List<DataUpdater<ZNRecord>> updaters, int options)
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
- {
- List<T> list = new ArrayList<T>();
- for (PropertyKey key : keys)
- {
- list.add((T)getProperty(key));
- }
- return list;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/StaticFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/StaticFileHelixManager.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/StaticFileHelixManager.java
deleted file mode 100644
index 6a54212..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/StaticFileHelixManager.java
+++ /dev/null
@@ -1,569 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.file;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ClusterView;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixAdmin;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PreConnectCallback;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.healthcheck.ParticipantHealthReportCollector;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig.InstanceConfigProperty;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.StateMachineEngine;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.zk.ZkHelixPropertyStore;
-import com.linkedin.helix.tools.ClusterViewSerializer;
-import com.linkedin.helix.tools.IdealStateCalculatorByShuffling;
-
-@Deprecated
-public class StaticFileHelixManager implements HelixManager
-{
- private static final Logger LOG = Logger.getLogger(StaticFileHelixManager.class.getName());
- // for backward compatibility
- // TODO remove it later
- private final ClusterView _clusterView;
- private final String _clusterName;
- private final InstanceType _instanceType;
- private final String _instanceName;
- private boolean _isConnected;
- public static final String _sessionId = "12345";
- public static final String configFile = "configFile";
-
- public StaticFileHelixManager(String clusterName, String instanceName, InstanceType instanceType,
- String clusterViewFile)
- {
- _clusterName = clusterName;
- _instanceName = instanceName;
- _instanceType = instanceType;
- _clusterView = ClusterViewSerializer.deserialize(new File(clusterViewFile));
- }
-
- // FIXIT
- // reorder the messages to reduce the possibility that a S->M message for a
- // given
- // db partition gets executed before a O->S message
- private static void addMessageInOrder(List<ZNRecord> msgList, Message newMsg)
- {
- String toState = newMsg.getToState();
- if (toState.equals("MASTER"))
- {
- msgList.add(newMsg.getRecord());
- }
- if (toState.equals("SLAVE"))
- {
- msgList.add(0, newMsg.getRecord());
- }
- }
-
- private static List<Message> computeMessagesForSimpleTransition(ZNRecord idealStateRecord)
- {
- List<Message> msgList = new ArrayList<Message>();
-
- IdealState idealState = new IdealState(idealStateRecord);
- for (String stateUnitKey : idealState.getPartitionSet())
- {
- Map<String, String> instanceStateMap;
- instanceStateMap = idealState.getInstanceStateMap(stateUnitKey);
- }
-
- return msgList;
- }
-
- public static class DBParam
- {
- public String name;
- public int partitions;
-
- public DBParam(String n, int p)
- {
- name = n;
- partitions = p;
- }
- }
-
- public static ClusterView generateStaticConfigClusterView(String[] nodesInfo,
- List<DBParam> dbParams, int replica)
- {
- // create mock cluster view
- ClusterView view = new ClusterView();
-
- // add nodes
- List<ZNRecord> nodeConfigList = new ArrayList<ZNRecord>();
- List<String> instanceNames = new ArrayList<String>();
-
- Arrays.sort(nodesInfo, new Comparator<String>() {
-
- @Override
- public int compare(String str1, String str2)
- {
- return str1.compareTo(str2);
- }
-
- });
-
- // set CONFIGS
- for (String nodeInfo : nodesInfo)
- {
- int lastPos = nodeInfo.lastIndexOf(":");
- if (lastPos == -1)
- {
- throw new IllegalArgumentException("nodeInfo should be in format of host:port, " + nodeInfo);
- }
-
- String host = nodeInfo.substring(0, lastPos);
- String port = nodeInfo.substring(lastPos + 1);
- String nodeId = host + "_" + port;
- ZNRecord nodeConfig = new ZNRecord(nodeId);
-
- nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(),
- Boolean.toString(true));
- nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_HOST.toString(), host);
- nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
-
- instanceNames.add(nodeId);
-
- nodeConfigList.add(nodeConfig);
- }
- view.setClusterPropertyList(PropertyType.CONFIGS, nodeConfigList);
-
- // set IDEALSTATES
- // compute ideal states for each db
- List<ZNRecord> idealStates = new ArrayList<ZNRecord>();
- for (DBParam dbParam : dbParams)
- {
- ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames,
- dbParam.partitions, replica, dbParam.name);
-
- idealStates.add(result);
- }
- view.setClusterPropertyList(PropertyType.IDEALSTATES, idealStates);
-
- // calculate messages for transition using naive algorithm
- Map<String, List<ZNRecord>> msgListForInstance = new HashMap<String, List<ZNRecord>>();
- List<ZNRecord> idealStatesArray = view.getPropertyList(PropertyType.IDEALSTATES);
- for (ZNRecord idealStateRecord : idealStatesArray)
- {
- // IdealState idealState = new IdealState(idealStateRecord);
-
- List<Message> messages = computeMessagesForSimpleTransition(idealStateRecord);
-
- for (Message message : messages)
- {
- // logger.info("Sending message to " + message.getTgtName() +
- // " transition "
- // + message.getStateUnitKey() + " from:" +
- // message.getFromState() +
- // " to:"
- // + message.getToState());
- // client.addMessage(message, message.getTgtName());
- String instance = message.getTgtName();
- List<ZNRecord> msgList = msgListForInstance.get(instance);
- if (msgList == null)
- {
- msgList = new ArrayList<ZNRecord>();
- msgListForInstance.put(instance, msgList);
- }
- // msgList.add(message);
- addMessageInOrder(msgList, message);
- }
- }
-
- // set INSTANCES
- // put message lists into cluster view
- List<ClusterView.MemberInstance> insList = new ArrayList<ClusterView.MemberInstance>();
- for (Map.Entry<String, List<ZNRecord>> entry : msgListForInstance.entrySet())
- {
- String instance = entry.getKey();
- List<ZNRecord> msgList = entry.getValue();
-
- ClusterView.MemberInstance ins = view.getMemberInstance(instance, true);
- ins.setInstanceProperty(PropertyType.MESSAGES, msgList);
- // ins.setInstanceProperty(InstancePropertyType.CURRENTSTATES,
- // null);
- // ins.setInstanceProperty(InstancePropertyType.ERRORS, null);
- // ins.setInstanceProperty(InstancePropertyType.STATUSUPDATES,
- // null);
- insList.add(ins);
- }
-
- // sort it
- ClusterView.MemberInstance[] insArray = new ClusterView.MemberInstance[insList.size()];
- insArray = insList.toArray(insArray);
- Arrays.sort(insArray, new Comparator<ClusterView.MemberInstance>() {
-
- @Override
- public int compare(ClusterView.MemberInstance ins1, ClusterView.MemberInstance ins2)
- {
- return ins1.getInstanceName().compareTo(ins2.getInstanceName());
- }
-
- });
-
- insList = Arrays.asList(insArray);
- view.setInstances(insList);
-
- return view;
- }
-
- @Override
- public void disconnect()
- {
- _isConnected = false;
- }
-
- @Override
- public void addIdealStateChangeListener(IdealStateChangeListener listener)
- {
-
- NotificationContext context = new NotificationContext(this);
- context.setType(NotificationContext.Type.INIT);
- List<ZNRecord> idealStates = _clusterView.getPropertyList(PropertyType.IDEALSTATES);
- listener.onIdealStateChange(
- HelixProperty.convertToTypedList(IdealState.class, idealStates), context);
- }
-
- @Override
- public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addLiveInstanceChangeListener is not supported by File Based cluster manager");
- }
-
- @Override
- public void addConfigChangeListener(ConfigChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addConfigChangeListener() is NOT supported by File Based cluster manager");
- }
-
- @Override
- public void addMessageListener(MessageListener listener, String instanceName)
- {
- NotificationContext context = new NotificationContext(this);
- context.setType(NotificationContext.Type.INIT);
- List<ZNRecord> messages;
- messages = _clusterView.getMemberInstance(instanceName, true).getInstanceProperty(
- PropertyType.MESSAGES);
- listener.onMessage(instanceName, HelixProperty.convertToTypedList(Message.class, messages),
- context);
- }
-
- @Override
- public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName, String sessionId)
- {
- throw new UnsupportedOperationException(
- "addCurrentStateChangeListener is not supported by File Based cluster manager");
- }
-
- @Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addExternalViewChangeListener() is NOT supported by File Based cluster manager");
- }
-
- @Override
- public DataAccessor getDataAccessor()
- {
- return null;
- }
-
- @Override
- public String getClusterName()
- {
- return _clusterName;
- }
-
- @Override
- public String getInstanceName()
- {
- return _instanceName;
- }
-
- @Override
- public void connect()
- {
- _isConnected = true;
- }
-
- @Override
- public String getSessionId()
- {
- return _sessionId;
- }
-
- public static ClusterView convertStateModelMapToClusterView(String outFile, String instanceName,
- StateModelFactory<StateModel> stateModelFactory)
- {
- Map<String, StateModel> currentStateMap = stateModelFactory.getStateModelMap();
- ClusterView curView = new ClusterView();
-
- ClusterView.MemberInstance memberInstance = curView.getMemberInstance(instanceName, true);
- List<ZNRecord> curStateList = new ArrayList<ZNRecord>();
-
- for (Map.Entry<String, StateModel> entry : currentStateMap.entrySet())
- {
- String stateUnitKey = entry.getKey();
- String curState = entry.getValue().getCurrentState();
- ZNRecord record = new ZNRecord(stateUnitKey);
- record.setSimpleField(stateUnitKey, curState);
- curStateList.add(record);
- }
-
- memberInstance.setInstanceProperty(PropertyType.CURRENTSTATES, curStateList);
-
- // serialize to file
- // String outFile = "/tmp/curClusterView_" + instanceName +".json";
- if (outFile != null)
- {
- // ClusterViewSerializer serializer = new
- // ClusterViewSerializer(outFile);
- // serializer.serialize(curView);
- ClusterViewSerializer.serialize(curView, new File(outFile));
- }
-
- return curView;
- }
-
- public static boolean verifyFileBasedClusterStates(String instanceName, String expectedFile,
- String curFile)
- {
- boolean ret = true;
- ClusterView expectedView = ClusterViewSerializer.deserialize(new File(expectedFile));
- ClusterView curView = ClusterViewSerializer.deserialize(new File(curFile));
-
- // ideal_state for instance with the given instanceName
- Map<String, String> idealStates = new HashMap<String, String>();
- for (ZNRecord idealStateItem : expectedView.getPropertyList(PropertyType.IDEALSTATES))
- {
- Map<String, Map<String, String>> allIdealStates = idealStateItem.getMapFields();
-
- for (Map.Entry<String, Map<String, String>> entry : allIdealStates.entrySet())
- {
- if (entry.getValue().containsKey(instanceName))
- {
- String state = entry.getValue().get(instanceName);
- idealStates.put(entry.getKey(), state);
- }
- }
- }
-
- ClusterView.MemberInstance memberInstance = curView.getMemberInstance(instanceName, false);
- List<ZNRecord> curStateList = memberInstance.getInstanceProperty(PropertyType.CURRENTSTATES);
-
- if (curStateList == null && idealStates.size() > 0)
- {
- LOG.info("current state is null");
- return false;
- } else if (curStateList == null && idealStates.size() == 0)
- {
- LOG.info("empty current state and ideal state");
- return true;
- } else if (curStateList.size() != idealStates.size())
- {
- LOG.info("Number of current states (" + curStateList.size() + ") mismatch "
- + "number of ideal states (" + idealStates.size() + ")");
- return false;
- }
-
- for (ZNRecord record : curStateList)
- {
- String stateUnitKey = record.getId();
- String curState = record.getSimpleField(stateUnitKey);
-
- // if (!curState.equalsIgnoreCase("offline"))
- // nonOfflineNr++;
-
- if (!idealStates.containsKey(stateUnitKey))
- {
- LOG.error("Current state does not contain " + stateUnitKey);
- ret = false;
- continue;
- }
-
- String idealState = idealStates.get(stateUnitKey);
- if (!curState.equalsIgnoreCase(idealState))
- {
- LOG.error("State mismatch--unit_key:" + stateUnitKey + " cur:" + curState + " ideal:"
- + idealState + " instance_name:" + instanceName);
- ret = false;
- continue;
- }
-
- }
-
- return ret;
- }
-
- @Override
- public boolean isConnected()
- {
- return _isConnected;
- }
-
- @Override
- public long getLastNotificationTime()
- {
- return 0;
- }
-
- @Override
- public void addControllerListener(ControllerChangeListener listener)
- {
- throw new UnsupportedOperationException(
- "addControllerListener() is NOT supported by File Based cluster manager");
- }
-
- @Override
- public boolean removeListener(Object listener)
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public HelixAdmin getClusterManagmentTool()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public PropertyStore<ZNRecord> getPropertyStore()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ClusterMessagingService getMessagingService()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public InstanceType getInstanceType()
- {
- return _instanceType;
- }
-
- @Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
- throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public String getVersion()
- {
- throw new UnsupportedOperationException("getVersion() not implemented in StaticFileClusterManager");
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine()
- {
- throw new UnsupportedOperationException("getStateMachineEngine() not implemented in StaticFileClusterManager");
- }
-
- @Override
- public boolean isLeader()
- {
- throw new UnsupportedOperationException("isLeader() not implemented in StaticFileClusterManager");
- }
-
- @Override
- public ConfigAccessor getConfigAccessor()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void startTimerTasks()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void stopTimerTasks()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void addPreConnectCallback(PreConnectCallback callback)
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
- {
- // TODO Auto-generated method stub
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/file/package-info.java b/helix-core/src/main/java/com/linkedin/helix/manager/file/package-info.java
deleted file mode 100644
index 3d701b2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/file/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * A file-based implementation of Helix cluster manager (Deprecated)
- *
- */
-package com.linkedin.helix.manager.file;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/BasicZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/BasicZkSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/BasicZkSerializer.java
deleted file mode 100644
index 2f7dfd7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/BasicZkSerializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-/**
- * Basic path based serializer which ignores the path and delegates
- * serialization into a regular {@link ZkSerializer}
- */
-public class BasicZkSerializer implements PathBasedZkSerializer
-{
- private final ZkSerializer _delegate;
-
- public BasicZkSerializer(ZkSerializer delegate)
- {
- _delegate = delegate;
- }
-
- public byte[] serialize(Object data, String path)
- {
- return _delegate.serialize(data);
- }
-
- @Override
- public Object deserialize(byte[] bytes, String path)
- {
- return _delegate.deserialize(bytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ByteArraySerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ByteArraySerializer.java
deleted file mode 100644
index ac756d4..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ByteArraySerializer.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-public class ByteArraySerializer implements ZkSerializer
-{
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError
- {
- return (byte[])data;
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError
- {
- return bytes;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/Cache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/Cache.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/Cache.java
deleted file mode 100644
index fdcf909..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/Cache.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.store.zk.ZNode;
-
-public abstract class Cache<T>
-{
- final ReadWriteLock _lock;
- final ConcurrentHashMap<String, ZNode> _cache;
-
- public Cache()
- {
- _lock = new ReentrantReadWriteLock();
- _cache = new ConcurrentHashMap<String, ZNode>();
- }
-
- public void addToParentChildSet(String parentPath, String childName)
- {
- ZNode znode = _cache.get(parentPath);
- if (znode != null)
- {
- znode.addChild(childName);
- }
- }
-
- public void addToParentChildSet(String parentPath, List<String> childNames)
- {
- if (childNames != null && !childNames.isEmpty())
- {
- ZNode znode = _cache.get(parentPath);
- if (znode != null)
- {
- znode.addChildren(childNames);
- }
- }
- }
-
- public void removeFromParentChildSet(String parentPath, String name)
- {
- ZNode zNode = _cache.get(parentPath);
- if (zNode != null)
- {
- zNode.removeChild(name);
- }
- }
-
- public boolean exists(String path)
- {
- return _cache.containsKey(path);
- }
-
- public ZNode get(String path)
- {
- try
- {
- _lock.readLock().lock();
- return _cache.get(path);
- }
- finally
- {
- _lock.readLock().unlock();
- }
- }
-
- public void lockWrite()
- {
- _lock.writeLock().lock();
- }
-
- public void unlockWrite()
- {
- _lock.writeLock().unlock();
- }
-
- public void lockRead()
- {
- _lock.readLock().lock();
- }
-
- public void unlockRead()
- {
- _lock.readLock().unlock();
- }
-
- public void purgeRecursive(String path)
- {
- try
- {
- _lock.writeLock().lock();
-
- String parentPath = new File(path).getParent();
- String name = new File(path).getName();
- removeFromParentChildSet(parentPath, name);
-
- ZNode znode = _cache.remove(path);
- if (znode != null)
- {
- // recursively remove children nodes
- Set<String> childNames = znode.getChildSet();
- for (String childName : childNames)
- {
- String childPath = path + "/" + childName;
- purgeRecursive(childPath);
- }
- }
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
-
- public void reset()
- {
- try
- {
- _lock.writeLock().lock();
- _cache.clear();
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
-
- public abstract void update(String path, T data, Stat stat);
-
- public abstract void updateRecursive(String path);
-
-
- // debug
- public Map<String, ZNode> getCache()
- {
- return _cache;
- }
-
-}