You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[15/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java
deleted file mode 100644
index 5cb6724..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/CallbackHandler.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import static com.linkedin.helix.HelixConstants.ChangeType.CONFIG;
-import static com.linkedin.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
-import static com.linkedin.helix.HelixConstants.ChangeType.IDEAL_STATE;
-import static com.linkedin.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGE;
-import static com.linkedin.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import com.linkedin.helix.ConfigChangeListener;
-import com.linkedin.helix.ControllerChangeListener;
-import com.linkedin.helix.CurrentStateChangeListener;
-import com.linkedin.helix.ExternalViewChangeListener;
-import com.linkedin.helix.HealthStateChangeListener;
-import com.linkedin.helix.HelixConstants.ChangeType;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.IdealStateChangeListener;
-import com.linkedin.helix.LiveInstanceChangeListener;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-
-public class CallbackHandler implements IZkChildListener, IZkDataListener
-
-{
-
- private static Logger logger = Logger.getLogger(CallbackHandler.class);
-
- private final String _path;
- private final Object _listener;
- private final EventType[] _eventTypes;
- private final HelixDataAccessor _accessor;
- private final ChangeType _changeType;
- private final ZkClient _zkClient;
- private final AtomicLong lastNotificationTimeStamp;
- private final HelixManager _manager;
-
- public CallbackHandler(HelixManager manager, ZkClient client, String path,
- Object listener, EventType[] eventTypes, ChangeType changeType)
- {
- this._manager = manager;
- this._accessor = manager.getHelixDataAccessor();
- this._zkClient = client;
- this._path = path;
- this._listener = listener;
- this._eventTypes = eventTypes;
- this._changeType = changeType;
- lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
- init();
- }
-
- public Object getListener()
- {
- return _listener;
- }
-
- public String getPath()
- {
- return _path;
- }
-
- public void invoke(NotificationContext changeContext) throws Exception
- {
- // This allows the listener to work with one change at a time
- synchronized (_manager)
- {
- Builder keyBuilder = _accessor.keyBuilder();
- long start = System.currentTimeMillis();
- if (logger.isInfoEnabled())
- {
- logger.info(Thread.currentThread().getId() + " START:INVOKE "
- // + changeContext.getPathChanged()
- + _path + " listener:" + _listener.getClass().getCanonicalName());
- }
-
- if (_changeType == IDEAL_STATE)
- {
-
- IdealStateChangeListener idealStateChangeListener =
- (IdealStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
-
- idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-
- }
- else if (_changeType == CONFIG)
- {
-
- ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<InstanceConfig> configs =
- _accessor.getChildValues(keyBuilder.instanceConfigs());
-
- configChangeListener.onConfigChange(configs, changeContext);
-
- }
- else if (_changeType == LIVE_INSTANCE)
- {
- LiveInstanceChangeListener liveInstanceChangeListener =
- (LiveInstanceChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<LiveInstance> liveInstances =
- _accessor.getChildValues(keyBuilder.liveInstances());
-
- liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
-
- }
- else if (_changeType == CURRENT_STATE)
- {
- CurrentStateChangeListener currentStateChangeListener;
- currentStateChangeListener = (CurrentStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
- String[] pathParts = _path.split("/");
-
- // TODO: fix this
- List<CurrentState> currentStates =
- _accessor.getChildValues(keyBuilder.currentStates(instanceName,
- pathParts[pathParts.length - 1]));
-
- currentStateChangeListener.onStateChange(instanceName,
- currentStates,
- changeContext);
-
- }
- else if (_changeType == MESSAGE)
- {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
- List<Message> messages =
- _accessor.getChildValues(keyBuilder.messages(instanceName));
-
- messageListener.onMessage(instanceName, messages, changeContext);
-
- }
- else if (_changeType == MESSAGES_CONTROLLER)
- {
- MessageListener messageListener = (MessageListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- List<Message> messages =
- _accessor.getChildValues(keyBuilder.controllerMessages());
-
- messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
-
- }
- else if (_changeType == EXTERNAL_VIEW)
- {
- ExternalViewChangeListener externalViewListener =
- (ExternalViewChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true);
- List<ExternalView> externalViewList =
- _accessor.getChildValues(keyBuilder.externalViews());
-
- externalViewListener.onExternalViewChange(externalViewList, changeContext);
- }
- else if (_changeType == ChangeType.CONTROLLER)
- {
- ControllerChangeListener controllerChangelistener =
- (ControllerChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, false);
- controllerChangelistener.onControllerChange(changeContext);
- }
- else if (_changeType == ChangeType.HEALTH)
- {
- HealthStateChangeListener healthStateChangeListener =
- (HealthStateChangeListener) _listener;
- subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
- // settings here
- String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
-
- List<HealthStat> healthReportList =
- _accessor.getChildValues(keyBuilder.healthReports(instanceName));
-
- healthStateChangeListener.onHealthChange(instanceName,
- healthReportList,
- changeContext);
- }
- long end = System.currentTimeMillis();
- if (logger.isInfoEnabled())
- {
- logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
- + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
- + (end - start));
- }
- }
- }
-
- private void subscribeForChanges(NotificationContext context,
- String path,
- boolean watchParent,
- boolean watchChild)
- {
- NotificationContext.Type type = context.getType();
- if (watchParent && type == NotificationContext.Type.INIT)
- {
- logger.info(_manager.getInstanceName() + " subscribe child change@" + path);
- _zkClient.subscribeChildChanges(path, this);
- }
- else if (watchParent && type == NotificationContext.Type.FINALIZE)
- {
- logger.info(_manager.getInstanceName() + " UNsubscribe child change@" + path);
- _zkClient.unsubscribeChildChanges(path, this);
- }
-
- if (watchChild)
- {
- try
- {
- List<String> childNames = _zkClient.getChildren(path);
- if (childNames == null || childNames.size() == 0)
- {
- return;
- }
-
- for (String childName : childNames)
- {
- String childPath = path + "/" + childName;
- if (type == NotificationContext.Type.INIT
- || type == NotificationContext.Type.CALLBACK)
- {
- if (logger.isDebugEnabled())
- {
- logger.debug(_manager.getInstanceName() + " subscribe data change@" + path);
- }
- _zkClient.subscribeDataChanges(childPath, this);
-
- }
- else if (type == NotificationContext.Type.FINALIZE)
- {
- logger.info(_manager.getInstanceName() + " UNsubscribe data change@" + path);
- _zkClient.unsubscribeDataChanges(childPath, this);
- }
-
- subscribeForChanges(context, childPath, watchParent, watchChild);
- }
- }
- catch (ZkNoNodeException e)
- {
- logger.warn("fail to subscribe data change@" + path);
- }
- }
-
- }
-
- public EventType[] getEventTypes()
- {
- return _eventTypes;
- }
-
- /**
- * Invoke the listener so that it sets up the initial values from the zookeeper if any
- * exists
- *
- */
- public void init()
- {
- updateNotificationTime(System.nanoTime());
- try
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.INIT);
- invoke(changeContext);
- }
- catch (Exception e)
- {
- ZKExceptionHandler.getInstance().handle(e);
- }
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data)
- {
- try
- {
- updateNotificationTime(System.nanoTime());
- if (dataPath != null && dataPath.startsWith(_path))
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
- }
- }
- catch (Exception e)
- {
- ZKExceptionHandler.getInstance().handle(e);
- }
- }
-
- @Override
- public void handleDataDeleted(String dataPath)
- {
- try
- {
- updateNotificationTime(System.nanoTime());
- if (dataPath != null && dataPath.startsWith(_path))
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- _zkClient.unsubscribeChildChanges(dataPath, this);
- invoke(changeContext);
- }
- }
- catch (Exception e)
- {
- ZKExceptionHandler.getInstance().handle(e);
- }
- }
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds)
- {
- try
- {
- updateNotificationTime(System.nanoTime());
- if (parentPath != null && parentPath.startsWith(_path))
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- invoke(changeContext);
- }
- }
- catch (Exception e)
- {
- ZKExceptionHandler.getInstance().handle(e);
- }
- }
-
- /**
- * Invoke the listener for the last time so that the listener could clean up resources
- *
- */
- public void reset()
- {
- try
- {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.FINALIZE);
- invoke(changeContext);
- }
- catch (Exception e)
- {
- ZKExceptionHandler.getInstance().handle(e);
- }
- }
-
- private void updateNotificationTime(long nanoTime)
- {
- long l = lastNotificationTimeStamp.get();
- while (nanoTime > l)
- {
- boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
- if (b)
- {
- break;
- }
- else
- {
- l = lastNotificationTimeStamp.get();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java
deleted file mode 100644
index 659eaec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ChainedPathZkSerializer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-public class ChainedPathZkSerializer implements PathBasedZkSerializer
-{
-
- public static class Builder
- {
- private final ZkSerializer _defaultSerializer;
- private List<ChainItem> _items = new ArrayList<ChainItem>();
-
- private Builder(ZkSerializer defaultSerializer)
- {
- _defaultSerializer = defaultSerializer;
- }
-
- /**
- * Add a serializing strategy for the given path prefix
- * The most specific path will triumph over a more generic (shorter)
- * one regardless of the ordering of the calls.
- */
- public Builder serialize(String path, ZkSerializer withSerializer)
- {
- _items.add(new ChainItem(normalize(path), withSerializer));
- return this;
- }
-
- /**
- * Builds the serializer with the given strategies and default serializer.
- */
- public ChainedPathZkSerializer build() {
- return new ChainedPathZkSerializer(_defaultSerializer, _items);
- }
- }
-
- /**
- * Create a builder that will use the given serializer by default
- * if no other strategy is given to solve the path in question.
- */
- public static Builder builder(ZkSerializer defaultSerializer)
- {
- return new Builder(defaultSerializer);
- }
-
- private final List<ChainItem> _items;
- private final ZkSerializer _defaultSerializer;
-
- private ChainedPathZkSerializer(ZkSerializer defaultSerializer, List<ChainItem> items)
- {
- _items = items;
- // sort by longest paths first
- // if two items would match one would be prefix of the other
- // and the longest must be more specific
- Collections.sort(_items);
- _defaultSerializer = defaultSerializer;
- }
-
- @Override
- public byte[] serialize(Object data, String path) throws ZkMarshallingError
- {
- for (ChainItem item : _items)
- {
- if (item.matches(path)) return item._serializer.serialize(data);
- }
- return _defaultSerializer.serialize(data);
- }
-
- @Override
- public Object deserialize(byte[] bytes, String path)
- throws ZkMarshallingError
- {
- for (ChainItem item : _items)
- {
- if (item.matches(path)) return item._serializer.deserialize(bytes);
- }
- return _defaultSerializer.deserialize(bytes);
- }
-
- private static class ChainItem implements Comparable<ChainItem>
- {
- final String _path;
- final ZkSerializer _serializer;
-
- ChainItem(String path, ZkSerializer serializer)
- {
- _path = path;
- _serializer = serializer;
- }
-
- boolean matches(String path)
- {
- if (_path.equals(path))
- {
- return true;
- }
- else if (path.length() > _path.length())
- {
- if (path.startsWith(_path) && path.charAt(_path.length()) == '/')
- {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int compareTo(ChainItem o)
- {
- return o._path.length() - _path.length();
- }
- }
-
- private static String normalize(String path) {
- if (!path.startsWith("/")) {
- // ensure leading slash
- path = "/" + path;
- }
- if (path.endsWith("/")) {
- // remove trailing slash
- path = path.substring(0, path.length()-1);
- }
- return path;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
deleted file mode 100644
index 4fc6b98..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-
-public class DefaultControllerMessageHandlerFactory implements
- MessageHandlerFactory
-{
- private static Logger _logger = Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- String type = message.getMsgType();
-
- if(!type.equals(getMessageType()))
- {
- throw new HelixException("Unexpected msg type for message "+message.getMsgId()
- +" type:" + message.getMsgType());
- }
-
- return new DefaultControllerMessageHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- return MessageType.CONTROLLER_MSG.toString();
- }
-
- @Override
- public void reset()
- {
-
- }
-
- public static class DefaultControllerMessageHandler extends MessageHandler
- {
- public DefaultControllerMessageHandler(Message message,
- NotificationContext context)
- {
- super(message, context);
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- String type = _message.getMsgType();
- HelixTaskResult result = new HelixTaskResult();
- if(!type.equals(MessageType.CONTROLLER_MSG.toString()))
- {
- throw new HelixException("Unexpected msg type for message "+_message.getMsgId()
- +" type:" + _message.getMsgType());
- }
- result.getTaskResultMap().put("ControllerResult", "msg "+ _message.getMsgId() + " from "+_message.getMsgSrc() + " processed");
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
deleted file mode 100644
index ec7c2f7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-
-import java.util.Arrays;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-
-/**
- * DefaultParticipantErrorMessageHandlerFactory works on controller side.
- * When the participant detects a critical error, it will send the PARTICIPANT_ERROR_REPORT
- * Message to the controller, specifying whether it want to disable the instance or
- * disable the partition. The controller have a chance to do whatever make sense at that point,
- * and then disable the corresponding partition or the instance. More configs per resource will
- * be added to customize the controller behavior.
- * */
-public class DefaultParticipantErrorMessageHandlerFactory implements
- MessageHandlerFactory
-{
- public enum ActionOnError
- {
- DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
- }
-
- public static final String ACTIONKEY = "ActionOnError";
-
- private static Logger _logger = Logger
- .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
- final HelixManager _manager;
-
- public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
- {
- _manager = manager;
- }
-
- public static class DefaultParticipantErrorMessageHandler extends MessageHandler
- {
- final HelixManager _manager;
- public DefaultParticipantErrorMessageHandler(Message message,
- NotificationContext context, HelixManager manager)
- {
- super(message, context);
- _manager = manager;
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- // TODO : consider unify this with StatsAggregationStage.executeAlertActions()
- try
- {
- ActionOnError actionOnError
- = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
-
- if(actionOnError == ActionOnError.DISABLE_INSTANCE)
- {
- _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
- _logger.info("Instance " + _message.getMsgSrc() + " disabled");
- }
- else if(actionOnError == ActionOnError.DISABLE_PARTITION)
- {
- _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
- _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
- _logger.info("partition " + _message.getPartitionName() + " disabled");
- }
- else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
- {
- // NOT IMPLEMENTED, or we can disable all partitions
- //_manager.getClusterManagmentTool().en(_manager.getClusterName(), _manager.getInstanceName(),
- // _message.getResourceName(), _message.getPartitionName(), false);
- _logger.info("resource " + _message.getResourceName() + " disabled");
- }
- }
- catch(Exception e)
- {
- _logger.error("", e);
- result.setSuccess(false);
- result.setException(e);
- }
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- _logger.error("Message handling pipeline get an exception. MsgId:"
- + _message.getMsgId(), e);
- }
-
- }
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- String type = message.getMsgType();
-
- if(!type.equals(getMessageType()))
- {
- throw new HelixException("Unexpected msg type for message "+message.getMsgId()
- +" type:" + message.getMsgType());
- }
-
- return new DefaultParticipantErrorMessageHandler(message, context, _manager);
- }
-
- @Override
- public String getMessageType()
- {
- return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
- }
-
- @Override
- public void reset()
- {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
deleted file mode 100644
index 0b19eca..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.StringReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.StatusUpdate;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-/*
- * TODO: The current implementation is temporary for backup handler testing only and it does not
- * do any throttling.
- *
- */
-public class DefaultSchedulerMessageHandlerFactory implements
- MessageHandlerFactory
-{
- public static final String WAIT_ALL = "WAIT_ALL";
- public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
- public static class SchedulerAsyncCallback extends AsyncCallback
- {
- StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
- Message _originalMessage;
- HelixManager _manager;
- final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap<String, Map<String, String>>();
-
- public SchedulerAsyncCallback(Message originalMessage, HelixManager manager)
- {
- _originalMessage = originalMessage;
- _manager = manager;
- }
-
- @Override
- public void onTimeOut()
- {
- _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId()
- + " timout with " + _timeout + " Ms");
-
- _statusUpdateUtil.logError(_originalMessage,
- SchedulerAsyncCallback.class, "Task timeout",
- _manager.getHelixDataAccessor());
- addSummary(_resultSummaryMap, _originalMessage, _manager, true);
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- _logger.info("Update for scheduler msg " + _originalMessage.getMsgId()
- + " Message " + message.getMsgSrc() + " id "
- + message.getCorrelationId() + " completed");
- String key = "MessageResult " + message.getMsgSrc() + " "
- + UUID.randomUUID();
- _resultSummaryMap.put(key, message.getResultMap());
-
- if (this.isDone())
- {
- _logger.info("Scheduler msg " + _originalMessage.getMsgId()
- + " completed");
- _statusUpdateUtil.logInfo(_originalMessage,
- SchedulerAsyncCallback.class, "Scheduler task completed",
- _manager.getHelixDataAccessor());
- addSummary(_resultSummaryMap, _originalMessage, _manager, false);
- }
- }
-
- private void addSummary(Map<String, Map<String, String>> _resultSummaryMap,
- Message originalMessage, HelixManager manager, boolean timeOut)
- {
- Map<String, String> summary = new TreeMap<String, String>();
- summary.put("TotalMessages:", "" + _resultSummaryMap.size());
- summary.put("Timeout", "" + timeOut);
- _resultSummaryMap.put("Summary", summary);
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- ZNRecord statusUpdate = accessor.getProperty(
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
- originalMessage.getMsgId())).getRecord();
-
- statusUpdate.getMapFields().putAll(_resultSummaryMap);
- accessor.setProperty(keyBuilder.controllerTaskStatus(
- MessageType.SCHEDULER_MSG.toString(), originalMessage.getMsgId()),
- new StatusUpdate(statusUpdate));
-
- }
- }
-
- private static Logger _logger = Logger
- .getLogger(DefaultSchedulerMessageHandlerFactory.class);
- HelixManager _manager;
-
- public DefaultSchedulerMessageHandlerFactory(HelixManager manager)
- {
- _manager = manager;
- }
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- String type = message.getMsgType();
-
- if (!type.equals(getMessageType()))
- {
- throw new HelixException("Unexpected msg type for message "
- + message.getMsgId() + " type:" + message.getMsgType());
- }
-
- return new DefaultSchedulerMessageHandler(message, context, _manager);
- }
-
- @Override
- public String getMessageType()
- {
- return MessageType.SCHEDULER_MSG.toString();
- }
-
- @Override
- public void reset()
- {
- }
-
- public static class DefaultSchedulerMessageHandler extends MessageHandler
- {
- HelixManager _manager;
-
- public DefaultSchedulerMessageHandler(Message message,
- NotificationContext context, HelixManager manager)
- {
- super(message, context);
- _manager = manager;
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- String type = _message.getMsgType();
- HelixTaskResult result = new HelixTaskResult();
- if (!type.equals(MessageType.SCHEDULER_MSG.toString()))
- {
- throw new HelixException("Unexpected msg type for message "
- + _message.getMsgId() + " type:" + _message.getMsgType());
- }
- // Parse timeout value
- int timeOut = -1;
- if (_message.getRecord().getSimpleFields().containsKey("TIMEOUT"))
- {
- try
- {
- timeOut = Integer.parseInt(_message.getRecord().getSimpleFields()
- .get("TIMEOUT"));
- } catch (Exception e)
- {
- }
- }
-
- // Parse the message template
- ZNRecord record = new ZNRecord("templateMessage");
- record.getSimpleFields().putAll(
- _message.getRecord().getMapField("MessageTemplate"));
- Message messageTemplate = new Message(record);
-
- // Parse the criteria
- StringReader sr = new StringReader(_message.getRecord().getSimpleField(
- "Criteria"));
- ObjectMapper mapper = new ObjectMapper();
- Criteria recipientCriteria;
- try
- {
- recipientCriteria = mapper.readValue(sr, Criteria.class);
- } catch (Exception e)
- {
- _logger.error("", e);
- result.setException(e);
- result.setSuccess(false);
- return result;
- }
- _logger.info("Scheduler sending message, criteria:" + recipientCriteria);
-
- boolean waitAll = false;
- if(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) !=null)
- {
- try
- {
- waitAll = Boolean.parseBoolean(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
- }
- catch(Exception e)
- {
- _logger.warn("",e);
- }
- }
- // Send all messages.
-
- int nMsgsSent = 0;
- SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
- if(waitAll)
- {
- nMsgsSent = _manager.getMessagingService().sendAndWait(recipientCriteria,
- messageTemplate,
- callback,
- timeOut);
- }
- else
- {
- nMsgsSent = _manager.getMessagingService().send(recipientCriteria,
- messageTemplate,
- callback,
- timeOut);
- }
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- // Record the number of messages sent into status updates
- Map<String, String> sendSummary = new HashMap<String, String>();
- sendSummary.put("MessageCount", "" + nMsgsSent);
-
- ZNRecord statusUpdate = accessor.getProperty(
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
- _message.getMsgId())).getRecord();
-
- statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
-
- accessor.setProperty(keyBuilder.controllerTaskStatus(
- MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
- new StatusUpdate(statusUpdate));
-
- result.getTaskResultMap().put(
- "ControllerResult",
- "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
- + " processed");
- result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- _logger.error("Message handling pipeline get an exception. MsgId:"
- + _message.getMsgId(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java
deleted file mode 100644
index 7edc8ab..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/HelixGroupCommit.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-public class HelixGroupCommit<T>
-{
- private static Logger LOG = Logger.getLogger(HelixGroupCommit.class);
-
- private static class Queue<T>
- {
- final AtomicReference<Thread> _running = new AtomicReference<Thread>();
- final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
- }
-
- private static class Entry<T>
- {
- final String _key;
- final DataUpdater<T> _updater;
- AtomicBoolean _sent = new AtomicBoolean(false);
-
- Entry(String key, DataUpdater<T> updater)
- {
- _key = key;
- _updater = updater;
- }
- }
-
- private final Queue<T>[] _queues = new Queue[100];
-
- public HelixGroupCommit()
- {
- // Don't use Arrays.fill();
- for (int i = 0; i < _queues.length; ++i)
- {
- _queues[i] = new Queue<T>();
- }
- }
-
- private Queue<T> getQueue(String key)
- {
- return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
- }
-
- public boolean commit(ZkBaseDataAccessor<T> accessor,
- int options,
- String key,
- DataUpdater<T> updater)
- {
- Queue<T> queue = getQueue(key);
- Entry<T> entry = new Entry<T>(key, updater);
-
- queue._pending.add(entry);
-
- while (!entry._sent.get())
- {
- if (queue._running.compareAndSet(null, Thread.currentThread()))
- {
- ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
- try
- {
- Entry<T> first = queue._pending.peek();
- if (first == null)
- {
- return true;
- }
-
- // remove from queue
- // Entry first = queue._pending.poll();
- // processed.add(first);
-
- String mergedKey = first._key;
-
- boolean retry;
- do
- {
- retry = false;
-
- try
- {
- T merged = null;
-
- Stat readStat = new Stat();
- try
- {
- // accessor will fallback to zk if not found in cache
- merged = accessor.get(mergedKey, readStat, options);
- }
- catch (ZkNoNodeException e)
- {
- // OK.
- }
-
- // updater should handler merged == null
- // merged = first._updater.update(merged);
-
- // iterate over processed if we are retrying
- Iterator<Entry<T>> it = processed.iterator();
- while (it.hasNext())
- {
- Entry<T> ent = it.next();
- if (!ent._key.equals(mergedKey))
- {
- continue;
- }
- merged = ent._updater.update(merged);
- // System.out.println("After merging:" + merged);
- }
-
- // iterate over queue._pending for newly coming requests
- it = queue._pending.iterator();
- while (it.hasNext())
- {
- Entry<T> ent = it.next();
- if (!ent._key.equals(mergedKey))
- {
- continue;
- }
- processed.add(ent);
- merged = ent._updater.update(merged);
- // System.out.println("After merging:" + merged);
- it.remove();
- }
- // System.out.println("size:"+ processed.size());
- accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
- }
- catch (ZkBadVersionException e)
- {
- retry = true;
- }
- }
- while (retry);
- }
- finally
- {
- queue._running.set(null);
- for (Entry<T> e : processed)
- {
- synchronized (e)
- {
- e._sent.set(true);
- e.notify();
- }
- }
- }
- }
- else
- {
- synchronized (entry)
- {
- try
- {
- entry.wait(10);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- return false;
- }
- }
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java
deleted file mode 100644
index f52047d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/PathBasedZkSerializer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-
-public interface PathBasedZkSerializer
-{
-
- /**
- * Serialize data differently according to different paths
- *
- * @param data
- * @param path
- * @return
- * @throws ZkMarshallingError
- */
- public byte[] serialize(Object data, String path) throws ZkMarshallingError;
-
- /**
- * Deserialize data differently according to different paths
- *
- * @param bytes
- * @param path
- * @return
- * @throws ZkMarshallingError
- */
- public Object deserialize(byte[] bytes, String path) throws ZkMarshallingError;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java
deleted file mode 100644
index 410c23c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/WriteThroughCache.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.List;
-
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.AccessOption;
-import com.linkedin.helix.BaseDataAccessor;
-import com.linkedin.helix.store.zk.ZNode;
-
-public class WriteThroughCache<T> extends Cache<T>
-{
- private static Logger LOG = Logger.getLogger(WriteThroughCache.class);
-
- final BaseDataAccessor<T> _accessor;
-
- public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths)
- {
- super();
- _accessor = accessor;
-
- // init cache
- if (paths != null && !paths.isEmpty())
- {
- for (String path : paths)
- {
- updateRecursive(path);
- }
- }
- }
-
- @Override
- public void update(String path, T data, Stat stat)
- {
- String parentPath = new File(path).getParent();
- String childName = new File(path).getName();
- addToParentChildSet(parentPath, childName);
-
- ZNode znode = _cache.get(path);
- if (znode == null)
- {
- _cache.put(path, new ZNode(path, data, stat));
- }
- else
- {
- znode.setData(data);
- znode.setStat(stat);
- }
- }
-
- @Override
- public void updateRecursive(String path)
- {
- if (path == null)
- {
- return;
- }
-
- try
- {
- _lock.writeLock().lock();
-
-// // update parent's childSet
-// String parentPath = new File(path).getParent();
-// String name = new File(path).getName();
-// addToParentChildSet(parentPath, name);
-
- // update this node
- Stat stat = new Stat();
- T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-
- update(path, readData, stat);
-
-// ZNode znode = _cache.get(path);
-// if (znode == null)
-// {
-// znode = new ZNode(path, readData, stat);
-// _cache.put(path, znode);
-// }
-// else
-// {
-// znode.setData(readData);
-// znode.setStat(stat);
-// }
-
- // recursively update children nodes if not exists
- ZNode znode = _cache.get(path);
- List<String> childNames = _accessor.getChildNames(path, 0);
- if (childNames != null && childNames.size() > 0)
- {
- for (String childName : childNames)
- {
- String childPath = path + "/" + childName;
- if (!znode.hasChild(childName))
- {
- znode.addChild(childName);
- updateRecursive(childPath);
- }
- }
- }
- }
- catch (ZkNoNodeException e)
- {
- // OK. someone delete znode while we are updating cache
- }
- finally
- {
- _lock.writeLock().unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java
deleted file mode 100644
index 14a7f6d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKDataAccessor.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-
-@Deprecated
-public class ZKDataAccessor implements DataAccessor
-{
- private static Logger logger = Logger.getLogger(ZKDataAccessor.class);
-
- protected final String _clusterName;
- protected final ZkClient _zkClient;
-
- /**
- * If a PropertyType has children (e.g. CONFIGS), then the parent path is the
- * first key and child path is the second key; If a PropertyType has no child
- * (e.g. LEADER), then no cache
- */
- private final Map<String, Map<String, ZNRecord>> _cache = new ConcurrentHashMap<String, Map<String, ZNRecord>>();
-
- public ZKDataAccessor(String clusterName, ZkClient zkClient)
- {
- _clusterName = clusterName;
- _zkClient = zkClient;
- }
-
- @Override
- public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
- {
- if (!value.isValid())
- {
- throw new HelixException("The ZNRecord for " + type + " is not valid.");
- }
- return setProperty(type, value.getRecord(), keys);
- }
-
- @Override
- public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- String parent = new File(path).getParent();
- if (!_zkClient.exists(parent))
- {
- _zkClient.createPersistent(parent, true);
- }
-
- if (_zkClient.exists(path))
- {
- if (type.isCreateOnlyIfAbsent())
- {
- return false;
- } else
- {
- ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), false);
- }
- } else
- {
- try
- {
- if (type.isPersistent())
- {
- _zkClient.createPersistent(path, value);
- } else
- {
- _zkClient.createEphemeral(path, value);
- }
- } catch (Exception e)
- {
- logger.warn("Exception while creating path:" + path
- + " Most likely due to race condition(Ignorable).", e);
- return false;
- }
- }
- return true;
- }
-
- @Override
- public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
- {
- return updateProperty(type, value.getRecord(), keys);
- }
-
- @Override
- public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
- if (type.isUpdateOnlyOnExists())
- {
- ZKUtil.updateIfExists(_zkClient, path, value, type.isMergeOnUpdate());
- } else
- {
- String parent = new File(path).getParent();
-
- if (!_zkClient.exists(parent))
- {
- _zkClient.createPersistent(parent, true);
- }
-
- if (!type.usePropertyTransferServer())
- {
- ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
- } else
- {
- ZKUtil.asyncCreateOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
- }
- }
-
- return true;
- }
-
- @Override
- public <T extends HelixProperty>
- T getProperty(Class<T> clazz, PropertyType type, String... keys)
- {
- return HelixProperty.convertToTypedInstance(clazz, getProperty(type, keys));
- }
-
- @Override
- public ZNRecord getProperty(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- if (!type.isCached())
- {
- return _zkClient.readData(path, true);
- } else
- {
- int len = keys.length;
- if (len == 0)
- {
- return _zkClient.readData(path, true);
- } else
- {
- String[] subkeys = Arrays.copyOfRange(keys, 0, len - 1);
- Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, subkeys);
- return newChilds.get(keys[len - 1]);
- }
- }
-
- }
-
- @Override
- public boolean removeProperty(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
- return _zkClient.delete(path);
- }
-
- @Override
- public List<String> getChildNames(PropertyType type, String... keys)
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
- if (_zkClient.exists(path))
- {
- return _zkClient.getChildren(path);
- } else
- {
- return Collections.emptyList();
- }
- }
-
- @Override
- public <T extends HelixProperty>
- List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
- {
- List<ZNRecord> newChilds = getChildValues(type, keys);
- if (newChilds.size() > 0)
- {
- return HelixProperty.convertToTypedList(clazz, newChilds);
- }
- return Collections.emptyList();
- }
-
- @Override
- public List<ZNRecord> getChildValues(PropertyType type, String... keys)
-
- {
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
- // if (path == null)
- // {
- // System.err.println("path is null");
- // }
-
- if (_zkClient.exists(path))
- {
- if (!type.isCached())
- {
- return ZKUtil.getChildren(_zkClient, path);
- } else
- {
- Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, keys);
- return new ArrayList<ZNRecord>(newChilds.values());
- }
- }
-
- return Collections.emptyList();
- }
-
- public void reset()
- {
- _cache.clear();
- }
-
- private Map<String, ZNRecord> refreshChildValuesCache(PropertyType type, String... keys)
- {
- if (!type.isCached())
- {
- throw new IllegalArgumentException("Type:" + type + " is NOT cached");
- }
-
- String path = PropertyPathConfig.getPath(type, _clusterName, keys);
-
- Map<String, ZNRecord> newChilds = refreshChildValues(path, _cache.get(path));
- if (newChilds != null && newChilds.size() > 0)
- {
- _cache.put(path, newChilds);
- return newChilds;
- } else
- {
- _cache.remove(path);
- return Collections.emptyMap();
- }
- }
-
- /**
- * Read a zookeeper node only if it's data has been changed since last read
- *
- * @param parentPath
- * @param oldChildRecords
- * @return newChildRecords
- */
- private Map<String, ZNRecord> refreshChildValues(String parentPath,
- Map<String, ZNRecord> oldChildRecords)
- {
- List<String> childs = _zkClient.getChildren(parentPath);
- if (childs == null || childs.size() == 0)
- {
- return Collections.emptyMap();
- }
-
- Stat newStat = new Stat();
- Map<String, ZNRecord> newChildRecords = new HashMap<String, ZNRecord>();
- for (String child : childs)
- {
- String childPath = parentPath + "/" + child;
-
- // assume record.id should be the last part of zookeeper path
- if (oldChildRecords == null || !oldChildRecords.containsKey(child))
- {
- ZNRecord record = _zkClient.readDataAndStat(childPath, newStat, true);
- if (record != null)
- {
- record.setVersion(newStat.getVersion());
- newChildRecords.put(child, record);
- }
- } else
- {
- ZNRecord oldChild = oldChildRecords.get(child);
-
- int oldVersion = oldChild.getVersion();
- long oldCtime = oldChild.getCreationTime();
- newStat = _zkClient.getStat(childPath);
- if (newStat != null)
- {
- // System.out.print(child + " oldStat:" + oldStat);
- // System.out.print(child + " newStat:" + newStat);
-
- if (oldCtime < newStat.getCtime() ||
- oldVersion < newStat.getVersion())
- {
- ZNRecord record = _zkClient.readDataAndStat(childPath, newStat, true);
- if (record != null)
- {
- record.setVersion(newStat.getVersion());
- record.setCreationTime(newStat.getCtime());
- record.setModifiedTime(newStat.getMtime());
- newChildRecords.put(child, record);
- }
- } else
- {
- newChildRecords.put(child, oldChild);
- }
- }
- }
- }
-
- return Collections.unmodifiableMap(newChildRecords);
- }
-
- @Override
- public <T extends HelixProperty>
- Map<String, T> getChildValuesMap(Class<T> clazz, PropertyType type, String... keys)
- {
- List<T> list = getChildValues(clazz, type, keys);
- return Collections.unmodifiableMap(HelixProperty.convertListToMap(list));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java
deleted file mode 100644
index 0a5b52b..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/ZKExceptionHandler.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.manager.zk;
-
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.log4j.Logger;
-
-public class ZKExceptionHandler
-{
- private static ZKExceptionHandler instance = new ZKExceptionHandler();
- private static Logger logger = Logger.getLogger(ZKExceptionHandler.class);
- private ZKExceptionHandler()
- {
-
- }
-
- void handle(Exception e)
- {
- logger.error(Thread.currentThread().getName() + ". isThreadInterruped: " + Thread.currentThread().isInterrupted());
-
- if (e instanceof ZkInterruptedException)
- {
- logger.error("zk connection is interrupted.", e);
- }
- else
- {
- logger.error(e.getMessage(), e);
- // e.printStackTrace();
- }
- }
-
- public static ZKExceptionHandler getInstance()
- {
- return instance;
- }
-}