You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[12/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java
new file mode 100644
index 0000000..9867d13
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/StaticFileHelixManager.java
@@ -0,0 +1,569 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ClusterView;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.ClusterViewSerializer;
+import org.apache.helix.tools.IdealStateCalculatorByShuffling;
+import org.apache.log4j.Logger;
+
+
+@Deprecated
+public class StaticFileHelixManager implements HelixManager
+{
+ private static final Logger LOG = Logger.getLogger(StaticFileHelixManager.class.getName());
+ // for backward compatibility
+ // TODO remove it later
+ private final ClusterView _clusterView;
+ private final String _clusterName;
+ private final InstanceType _instanceType;
+ private final String _instanceName;
+ private boolean _isConnected;
+ public static final String _sessionId = "12345";
+ public static final String configFile = "configFile";
+
+ public StaticFileHelixManager(String clusterName, String instanceName, InstanceType instanceType,
+ String clusterViewFile)
+ {
+ _clusterName = clusterName;
+ _instanceName = instanceName;
+ _instanceType = instanceType;
+ _clusterView = ClusterViewSerializer.deserialize(new File(clusterViewFile));
+ }
+
+ // FIXIT
+ // reorder the messages to reduce the possibility that a S->M message for a
+ // given
+ // db partition gets executed before a O->S message
+ private static void addMessageInOrder(List<ZNRecord> msgList, Message newMsg)
+ {
+ String toState = newMsg.getToState();
+ if (toState.equals("MASTER"))
+ {
+ msgList.add(newMsg.getRecord());
+ }
+ if (toState.equals("SLAVE"))
+ {
+ msgList.add(0, newMsg.getRecord());
+ }
+ }
+
+ private static List<Message> computeMessagesForSimpleTransition(ZNRecord idealStateRecord)
+ {
+ List<Message> msgList = new ArrayList<Message>();
+
+ IdealState idealState = new IdealState(idealStateRecord);
+ for (String stateUnitKey : idealState.getPartitionSet())
+ {
+ Map<String, String> instanceStateMap;
+ instanceStateMap = idealState.getInstanceStateMap(stateUnitKey);
+ }
+
+ return msgList;
+ }
+
+ public static class DBParam
+ {
+ public String name;
+ public int partitions;
+
+ public DBParam(String n, int p)
+ {
+ name = n;
+ partitions = p;
+ }
+ }
+
+ public static ClusterView generateStaticConfigClusterView(String[] nodesInfo,
+ List<DBParam> dbParams, int replica)
+ {
+ // create mock cluster view
+ ClusterView view = new ClusterView();
+
+ // add nodes
+ List<ZNRecord> nodeConfigList = new ArrayList<ZNRecord>();
+ List<String> instanceNames = new ArrayList<String>();
+
+ Arrays.sort(nodesInfo, new Comparator<String>() {
+
+ @Override
+ public int compare(String str1, String str2)
+ {
+ return str1.compareTo(str2);
+ }
+
+ });
+
+ // set CONFIGS
+ for (String nodeInfo : nodesInfo)
+ {
+ int lastPos = nodeInfo.lastIndexOf(":");
+ if (lastPos == -1)
+ {
+ throw new IllegalArgumentException("nodeInfo should be in format of host:port, " + nodeInfo);
+ }
+
+ String host = nodeInfo.substring(0, lastPos);
+ String port = nodeInfo.substring(lastPos + 1);
+ String nodeId = host + "_" + port;
+ ZNRecord nodeConfig = new ZNRecord(nodeId);
+
+ nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(),
+ Boolean.toString(true));
+ nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_HOST.toString(), host);
+ nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
+
+ instanceNames.add(nodeId);
+
+ nodeConfigList.add(nodeConfig);
+ }
+ view.setClusterPropertyList(PropertyType.CONFIGS, nodeConfigList);
+
+ // set IDEALSTATES
+ // compute ideal states for each db
+ List<ZNRecord> idealStates = new ArrayList<ZNRecord>();
+ for (DBParam dbParam : dbParams)
+ {
+ ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames,
+ dbParam.partitions, replica, dbParam.name);
+
+ idealStates.add(result);
+ }
+ view.setClusterPropertyList(PropertyType.IDEALSTATES, idealStates);
+
+ // calculate messages for transition using naive algorithm
+ Map<String, List<ZNRecord>> msgListForInstance = new HashMap<String, List<ZNRecord>>();
+ List<ZNRecord> idealStatesArray = view.getPropertyList(PropertyType.IDEALSTATES);
+ for (ZNRecord idealStateRecord : idealStatesArray)
+ {
+ // IdealState idealState = new IdealState(idealStateRecord);
+
+ List<Message> messages = computeMessagesForSimpleTransition(idealStateRecord);
+
+ for (Message message : messages)
+ {
+ // logger.info("Sending message to " + message.getTgtName() +
+ // " transition "
+ // + message.getStateUnitKey() + " from:" +
+ // message.getFromState() +
+ // " to:"
+ // + message.getToState());
+ // client.addMessage(message, message.getTgtName());
+ String instance = message.getTgtName();
+ List<ZNRecord> msgList = msgListForInstance.get(instance);
+ if (msgList == null)
+ {
+ msgList = new ArrayList<ZNRecord>();
+ msgListForInstance.put(instance, msgList);
+ }
+ // msgList.add(message);
+ addMessageInOrder(msgList, message);
+ }
+ }
+
+ // set INSTANCES
+ // put message lists into cluster view
+ List<ClusterView.MemberInstance> insList = new ArrayList<ClusterView.MemberInstance>();
+ for (Map.Entry<String, List<ZNRecord>> entry : msgListForInstance.entrySet())
+ {
+ String instance = entry.getKey();
+ List<ZNRecord> msgList = entry.getValue();
+
+ ClusterView.MemberInstance ins = view.getMemberInstance(instance, true);
+ ins.setInstanceProperty(PropertyType.MESSAGES, msgList);
+ // ins.setInstanceProperty(InstancePropertyType.CURRENTSTATES,
+ // null);
+ // ins.setInstanceProperty(InstancePropertyType.ERRORS, null);
+ // ins.setInstanceProperty(InstancePropertyType.STATUSUPDATES,
+ // null);
+ insList.add(ins);
+ }
+
+ // sort it
+ ClusterView.MemberInstance[] insArray = new ClusterView.MemberInstance[insList.size()];
+ insArray = insList.toArray(insArray);
+ Arrays.sort(insArray, new Comparator<ClusterView.MemberInstance>() {
+
+ @Override
+ public int compare(ClusterView.MemberInstance ins1, ClusterView.MemberInstance ins2)
+ {
+ return ins1.getInstanceName().compareTo(ins2.getInstanceName());
+ }
+
+ });
+
+ insList = Arrays.asList(insArray);
+ view.setInstances(insList);
+
+ return view;
+ }
+
+ @Override
+ public void disconnect()
+ {
+ _isConnected = false;
+ }
+
+ @Override
+ public void addIdealStateChangeListener(IdealStateChangeListener listener)
+ {
+
+ NotificationContext context = new NotificationContext(this);
+ context.setType(NotificationContext.Type.INIT);
+ List<ZNRecord> idealStates = _clusterView.getPropertyList(PropertyType.IDEALSTATES);
+ listener.onIdealStateChange(
+ HelixProperty.convertToTypedList(IdealState.class, idealStates), context);
+ }
+
+ @Override
+ public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener)
+ {
+ throw new UnsupportedOperationException(
+ "addLiveInstanceChangeListener is not supported by File Based cluster manager");
+ }
+
+ @Override
+ public void addConfigChangeListener(ConfigChangeListener listener)
+ {
+ throw new UnsupportedOperationException(
+ "addConfigChangeListener() is NOT supported by File Based cluster manager");
+ }
+
+ @Override
+ public void addMessageListener(MessageListener listener, String instanceName)
+ {
+ NotificationContext context = new NotificationContext(this);
+ context.setType(NotificationContext.Type.INIT);
+ List<ZNRecord> messages;
+ messages = _clusterView.getMemberInstance(instanceName, true).getInstanceProperty(
+ PropertyType.MESSAGES);
+ listener.onMessage(instanceName, HelixProperty.convertToTypedList(Message.class, messages),
+ context);
+ }
+
+ @Override
+ public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+ String instanceName, String sessionId)
+ {
+ throw new UnsupportedOperationException(
+ "addCurrentStateChangeListener is not supported by File Based cluster manager");
+ }
+
+ @Override
+ public void addExternalViewChangeListener(ExternalViewChangeListener listener)
+ {
+ throw new UnsupportedOperationException(
+ "addExternalViewChangeListener() is NOT supported by File Based cluster manager");
+ }
+
+ @Override
+ public DataAccessor getDataAccessor()
+ {
+ return null;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+
+ @Override
+ public String getInstanceName()
+ {
+ return _instanceName;
+ }
+
+ @Override
+ public void connect()
+ {
+ _isConnected = true;
+ }
+
+ @Override
+ public String getSessionId()
+ {
+ return _sessionId;
+ }
+
+ public static ClusterView convertStateModelMapToClusterView(String outFile, String instanceName,
+ StateModelFactory<StateModel> stateModelFactory)
+ {
+ Map<String, StateModel> currentStateMap = stateModelFactory.getStateModelMap();
+ ClusterView curView = new ClusterView();
+
+ ClusterView.MemberInstance memberInstance = curView.getMemberInstance(instanceName, true);
+ List<ZNRecord> curStateList = new ArrayList<ZNRecord>();
+
+ for (Map.Entry<String, StateModel> entry : currentStateMap.entrySet())
+ {
+ String stateUnitKey = entry.getKey();
+ String curState = entry.getValue().getCurrentState();
+ ZNRecord record = new ZNRecord(stateUnitKey);
+ record.setSimpleField(stateUnitKey, curState);
+ curStateList.add(record);
+ }
+
+ memberInstance.setInstanceProperty(PropertyType.CURRENTSTATES, curStateList);
+
+ // serialize to file
+ // String outFile = "/tmp/curClusterView_" + instanceName +".json";
+ if (outFile != null)
+ {
+ // ClusterViewSerializer serializer = new
+ // ClusterViewSerializer(outFile);
+ // serializer.serialize(curView);
+ ClusterViewSerializer.serialize(curView, new File(outFile));
+ }
+
+ return curView;
+ }
+
+ public static boolean verifyFileBasedClusterStates(String instanceName, String expectedFile,
+ String curFile)
+ {
+ boolean ret = true;
+ ClusterView expectedView = ClusterViewSerializer.deserialize(new File(expectedFile));
+ ClusterView curView = ClusterViewSerializer.deserialize(new File(curFile));
+
+ // ideal_state for instance with the given instanceName
+ Map<String, String> idealStates = new HashMap<String, String>();
+ for (ZNRecord idealStateItem : expectedView.getPropertyList(PropertyType.IDEALSTATES))
+ {
+ Map<String, Map<String, String>> allIdealStates = idealStateItem.getMapFields();
+
+ for (Map.Entry<String, Map<String, String>> entry : allIdealStates.entrySet())
+ {
+ if (entry.getValue().containsKey(instanceName))
+ {
+ String state = entry.getValue().get(instanceName);
+ idealStates.put(entry.getKey(), state);
+ }
+ }
+ }
+
+ ClusterView.MemberInstance memberInstance = curView.getMemberInstance(instanceName, false);
+ List<ZNRecord> curStateList = memberInstance.getInstanceProperty(PropertyType.CURRENTSTATES);
+
+ if (curStateList == null && idealStates.size() > 0)
+ {
+ LOG.info("current state is null");
+ return false;
+ } else if (curStateList == null && idealStates.size() == 0)
+ {
+ LOG.info("empty current state and ideal state");
+ return true;
+ } else if (curStateList.size() != idealStates.size())
+ {
+ LOG.info("Number of current states (" + curStateList.size() + ") mismatch "
+ + "number of ideal states (" + idealStates.size() + ")");
+ return false;
+ }
+
+ for (ZNRecord record : curStateList)
+ {
+ String stateUnitKey = record.getId();
+ String curState = record.getSimpleField(stateUnitKey);
+
+ // if (!curState.equalsIgnoreCase("offline"))
+ // nonOfflineNr++;
+
+ if (!idealStates.containsKey(stateUnitKey))
+ {
+ LOG.error("Current state does not contain " + stateUnitKey);
+ ret = false;
+ continue;
+ }
+
+ String idealState = idealStates.get(stateUnitKey);
+ if (!curState.equalsIgnoreCase(idealState))
+ {
+ LOG.error("State mismatch--unit_key:" + stateUnitKey + " cur:" + curState + " ideal:"
+ + idealState + " instance_name:" + instanceName);
+ ret = false;
+ continue;
+ }
+
+ }
+
+ return ret;
+ }
+
+ @Override
+ public boolean isConnected()
+ {
+ return _isConnected;
+ }
+
+ @Override
+ public long getLastNotificationTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public void addControllerListener(ControllerChangeListener listener)
+ {
+ throw new UnsupportedOperationException(
+ "addControllerListener() is NOT supported by File Based cluster manager");
+ }
+
+ @Override
+ public boolean removeListener(Object listener)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public HelixAdmin getClusterManagmentTool()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public PropertyStore<ZNRecord> getPropertyStore()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ParticipantHealthReportCollector getHealthReportCollector()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public InstanceType getInstanceType()
+ {
+ return _instanceType;
+ }
+
+ @Override
+ public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+ throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getVersion()
+ {
+ throw new UnsupportedOperationException("getVersion() not implemented in StaticFileClusterManager");
+ }
+
+ @Override
+ public StateMachineEngine getStateMachineEngine()
+ {
+ throw new UnsupportedOperationException("getStateMachineEngine() not implemented in StaticFileClusterManager");
+ }
+
+ @Override
+ public boolean isLeader()
+ {
+ throw new UnsupportedOperationException("isLeader() not implemented in StaticFileClusterManager");
+ }
+
+ @Override
+ public ConfigAccessor getConfigAccessor()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void startTimerTasks()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stopTimerTasks()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public HelixDataAccessor getHelixDataAccessor()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void addPreConnectCallback(PreConnectCallback callback)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java b/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java
new file mode 100644
index 0000000..d490d98
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/file/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * A file-based implementation of Helix cluster manager (Deprecated)
+ *
+ */
+package org.apache.helix.manager.file;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java
new file mode 100644
index 0000000..006fd3f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/BasicZkSerializer.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+/**
+ * Basic path based serializer which ignores the path and delegates
+ * serialization into a regular {@link ZkSerializer}
+ */
+public class BasicZkSerializer implements PathBasedZkSerializer
+{
+ private final ZkSerializer _delegate;
+
+ public BasicZkSerializer(ZkSerializer delegate)
+ {
+ _delegate = delegate;
+ }
+
+ public byte[] serialize(Object data, String path)
+ {
+ return _delegate.serialize(data);
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes, String path)
+ {
+ return _delegate.deserialize(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java
new file mode 100644
index 0000000..a22d6be
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ByteArraySerializer.java
@@ -0,0 +1,20 @@
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+public class ByteArraySerializer implements ZkSerializer
+{
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError
+ {
+ return (byte[])data;
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError
+ {
+ return bytes;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
new file mode 100644
index 0000000..0906685
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
@@ -0,0 +1,146 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.helix.store.zk.ZNode;
+import org.apache.zookeeper.data.Stat;
+
+
+public abstract class Cache<T>
+{
+ final ReadWriteLock _lock;
+ final ConcurrentHashMap<String, ZNode> _cache;
+
+ public Cache()
+ {
+ _lock = new ReentrantReadWriteLock();
+ _cache = new ConcurrentHashMap<String, ZNode>();
+ }
+
+ public void addToParentChildSet(String parentPath, String childName)
+ {
+ ZNode znode = _cache.get(parentPath);
+ if (znode != null)
+ {
+ znode.addChild(childName);
+ }
+ }
+
+ public void addToParentChildSet(String parentPath, List<String> childNames)
+ {
+ if (childNames != null && !childNames.isEmpty())
+ {
+ ZNode znode = _cache.get(parentPath);
+ if (znode != null)
+ {
+ znode.addChildren(childNames);
+ }
+ }
+ }
+
+ public void removeFromParentChildSet(String parentPath, String name)
+ {
+ ZNode zNode = _cache.get(parentPath);
+ if (zNode != null)
+ {
+ zNode.removeChild(name);
+ }
+ }
+
+ public boolean exists(String path)
+ {
+ return _cache.containsKey(path);
+ }
+
+ public ZNode get(String path)
+ {
+ try
+ {
+ _lock.readLock().lock();
+ return _cache.get(path);
+ }
+ finally
+ {
+ _lock.readLock().unlock();
+ }
+ }
+
+ public void lockWrite()
+ {
+ _lock.writeLock().lock();
+ }
+
+ public void unlockWrite()
+ {
+ _lock.writeLock().unlock();
+ }
+
+ public void lockRead()
+ {
+ _lock.readLock().lock();
+ }
+
+ public void unlockRead()
+ {
+ _lock.readLock().unlock();
+ }
+
+ public void purgeRecursive(String path)
+ {
+ try
+ {
+ _lock.writeLock().lock();
+
+ String parentPath = new File(path).getParent();
+ String name = new File(path).getName();
+ removeFromParentChildSet(parentPath, name);
+
+ ZNode znode = _cache.remove(path);
+ if (znode != null)
+ {
+ // recursively remove children nodes
+ Set<String> childNames = znode.getChildSet();
+ for (String childName : childNames)
+ {
+ String childPath = path + "/" + childName;
+ purgeRecursive(childPath);
+ }
+ }
+ }
+ finally
+ {
+ _lock.writeLock().unlock();
+ }
+ }
+
+ public void reset()
+ {
+ try
+ {
+ _lock.writeLock().lock();
+ _cache.clear();
+ }
+ finally
+ {
+ _lock.writeLock().unlock();
+ }
+ }
+
+ public abstract void update(String path, T data, Stat stat);
+
+ public abstract void updateRecursive(String path);
+
+
+ // debug
+ public Map<String, ZNode> getCache()
+ {
+ return _cache;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
new file mode 100644
index 0000000..a67a79e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -0,0 +1,399 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+
+public class CallbackHandler implements IZkChildListener, IZkDataListener
+
+{
+
+ private static Logger logger = Logger.getLogger(CallbackHandler.class);
+
+ private final String _path;
+ private final Object _listener;
+ private final EventType[] _eventTypes;
+ private final HelixDataAccessor _accessor;
+ private final ChangeType _changeType;
+ private final ZkClient _zkClient;
+ private final AtomicLong lastNotificationTimeStamp;
+ private final HelixManager _manager;
+
+ public CallbackHandler(HelixManager manager, ZkClient client, String path,
+ Object listener, EventType[] eventTypes, ChangeType changeType)
+ {
+ this._manager = manager;
+ this._accessor = manager.getHelixDataAccessor();
+ this._zkClient = client;
+ this._path = path;
+ this._listener = listener;
+ this._eventTypes = eventTypes;
+ this._changeType = changeType;
+ lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+ init();
+ }
+
+ public Object getListener()
+ {
+ return _listener;
+ }
+
+ public String getPath()
+ {
+ return _path;
+ }
+
+ public void invoke(NotificationContext changeContext) throws Exception
+ {
+ // This allows the listener to work with one change at a time
+ synchronized (_manager)
+ {
+ Builder keyBuilder = _accessor.keyBuilder();
+ long start = System.currentTimeMillis();
+ if (logger.isInfoEnabled())
+ {
+ logger.info(Thread.currentThread().getId() + " START:INVOKE "
+ // + changeContext.getPathChanged()
+ + _path + " listener:" + _listener.getClass().getCanonicalName());
+ }
+
+ if (_changeType == IDEAL_STATE)
+ {
+
+ IdealStateChangeListener idealStateChangeListener =
+ (IdealStateChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<IdealState> idealStates = _accessor.getChildValues(keyBuilder.idealStates());
+
+ idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
+
+ }
+ else if (_changeType == CONFIG)
+ {
+
+ ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<InstanceConfig> configs =
+ _accessor.getChildValues(keyBuilder.instanceConfigs());
+
+ configChangeListener.onConfigChange(configs, changeContext);
+
+ }
+ else if (_changeType == LIVE_INSTANCE)
+ {
+ LiveInstanceChangeListener liveInstanceChangeListener =
+ (LiveInstanceChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<LiveInstance> liveInstances =
+ _accessor.getChildValues(keyBuilder.liveInstances());
+
+ liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
+
+ }
+ else if (_changeType == CURRENT_STATE)
+ {
+ CurrentStateChangeListener currentStateChangeListener;
+ currentStateChangeListener = (CurrentStateChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+ String[] pathParts = _path.split("/");
+
+ // TODO: fix this
+ List<CurrentState> currentStates =
+ _accessor.getChildValues(keyBuilder.currentStates(instanceName,
+ pathParts[pathParts.length - 1]));
+
+ currentStateChangeListener.onStateChange(instanceName,
+ currentStates,
+ changeContext);
+
+ }
+ else if (_changeType == MESSAGE)
+ {
+ MessageListener messageListener = (MessageListener) _listener;
+ subscribeForChanges(changeContext, _path, true, false);
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+ List<Message> messages =
+ _accessor.getChildValues(keyBuilder.messages(instanceName));
+
+ messageListener.onMessage(instanceName, messages, changeContext);
+
+ }
+ else if (_changeType == MESSAGES_CONTROLLER)
+ {
+ MessageListener messageListener = (MessageListener) _listener;
+ subscribeForChanges(changeContext, _path, true, false);
+ List<Message> messages =
+ _accessor.getChildValues(keyBuilder.controllerMessages());
+
+ messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
+
+ }
+ else if (_changeType == EXTERNAL_VIEW)
+ {
+ ExternalViewChangeListener externalViewListener =
+ (ExternalViewChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true);
+ List<ExternalView> externalViewList =
+ _accessor.getChildValues(keyBuilder.externalViews());
+
+ externalViewListener.onExternalViewChange(externalViewList, changeContext);
+ }
+ else if (_changeType == ChangeType.CONTROLLER)
+ {
+ ControllerChangeListener controllerChangelistener =
+ (ControllerChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, false);
+ controllerChangelistener.onControllerChange(changeContext);
+ }
+ else if (_changeType == ChangeType.HEALTH)
+ {
+ HealthStateChangeListener healthStateChangeListener =
+ (HealthStateChangeListener) _listener;
+ subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
+ // settings here
+ String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+
+ List<HealthStat> healthReportList =
+ _accessor.getChildValues(keyBuilder.healthReports(instanceName));
+
+ healthStateChangeListener.onHealthChange(instanceName,
+ healthReportList,
+ changeContext);
+ }
+ long end = System.currentTimeMillis();
+ if (logger.isInfoEnabled())
+ {
+ logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
+ + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
+ + (end - start));
+ }
+ }
+ }
+
+ private void subscribeForChanges(NotificationContext context,
+ String path,
+ boolean watchParent,
+ boolean watchChild)
+ {
+ NotificationContext.Type type = context.getType();
+ if (watchParent && type == NotificationContext.Type.INIT)
+ {
+ logger.info(_manager.getInstanceName() + " subscribe child change@" + path);
+ _zkClient.subscribeChildChanges(path, this);
+ }
+ else if (watchParent && type == NotificationContext.Type.FINALIZE)
+ {
+ logger.info(_manager.getInstanceName() + " UNsubscribe child change@" + path);
+ _zkClient.unsubscribeChildChanges(path, this);
+ }
+
+ if (watchChild)
+ {
+ try
+ {
+ List<String> childNames = _zkClient.getChildren(path);
+ if (childNames == null || childNames.size() == 0)
+ {
+ return;
+ }
+
+ for (String childName : childNames)
+ {
+ String childPath = path + "/" + childName;
+ if (type == NotificationContext.Type.INIT
+ || type == NotificationContext.Type.CALLBACK)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug(_manager.getInstanceName() + " subscribe data change@" + path);
+ }
+ _zkClient.subscribeDataChanges(childPath, this);
+
+ }
+ else if (type == NotificationContext.Type.FINALIZE)
+ {
+ logger.info(_manager.getInstanceName() + " UNsubscribe data change@" + path);
+ _zkClient.unsubscribeDataChanges(childPath, this);
+ }
+
+ subscribeForChanges(context, childPath, watchParent, watchChild);
+ }
+ }
+ catch (ZkNoNodeException e)
+ {
+ logger.warn("fail to subscribe data change@" + path);
+ }
+ }
+
+ }
+
+ public EventType[] getEventTypes()
+ {
+ return _eventTypes;
+ }
+
+ /**
+ * Invoke the listener so that it sets up the initial values from the zookeeper if any
+ * exists
+ *
+ */
+ public void init()
+ {
+ updateNotificationTime(System.nanoTime());
+ try
+ {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.INIT);
+ invoke(changeContext);
+ }
+ catch (Exception e)
+ {
+ ZKExceptionHandler.getInstance().handle(e);
+ }
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ {
+ try
+ {
+ updateNotificationTime(System.nanoTime());
+ if (dataPath != null && dataPath.startsWith(_path))
+ {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ invoke(changeContext);
+ }
+ }
+ catch (Exception e)
+ {
+ ZKExceptionHandler.getInstance().handle(e);
+ }
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ {
+ try
+ {
+ updateNotificationTime(System.nanoTime());
+ if (dataPath != null && dataPath.startsWith(_path))
+ {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ _zkClient.unsubscribeChildChanges(dataPath, this);
+ invoke(changeContext);
+ }
+ }
+ catch (Exception e)
+ {
+ ZKExceptionHandler.getInstance().handle(e);
+ }
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds)
+ {
+ try
+ {
+ updateNotificationTime(System.nanoTime());
+ if (parentPath != null && parentPath.startsWith(_path))
+ {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ invoke(changeContext);
+ }
+ }
+ catch (Exception e)
+ {
+ ZKExceptionHandler.getInstance().handle(e);
+ }
+ }
+
+ /**
+ * Invoke the listener for the last time so that the listener could clean up resources
+ *
+ */
+ public void reset()
+ {
+ try
+ {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.FINALIZE);
+ invoke(changeContext);
+ }
+ catch (Exception e)
+ {
+ ZKExceptionHandler.getInstance().handle(e);
+ }
+ }
+
+ private void updateNotificationTime(long nanoTime)
+ {
+ long l = lastNotificationTimeStamp.get();
+ while (nanoTime > l)
+ {
+ boolean b = lastNotificationTimeStamp.compareAndSet(l, nanoTime);
+ if (b)
+ {
+ break;
+ }
+ else
+ {
+ l = lastNotificationTimeStamp.get();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
new file mode 100644
index 0000000..cfd0be0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ChainedPathZkSerializer.java
@@ -0,0 +1,131 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+public class ChainedPathZkSerializer implements PathBasedZkSerializer
+{
+
+ public static class Builder
+ {
+ private final ZkSerializer _defaultSerializer;
+ private List<ChainItem> _items = new ArrayList<ChainItem>();
+
+ private Builder(ZkSerializer defaultSerializer)
+ {
+ _defaultSerializer = defaultSerializer;
+ }
+
+ /**
+ * Add a serializing strategy for the given path prefix
+ * The most specific path will triumph over a more generic (shorter)
+ * one regardless of the ordering of the calls.
+ */
+ public Builder serialize(String path, ZkSerializer withSerializer)
+ {
+ _items.add(new ChainItem(normalize(path), withSerializer));
+ return this;
+ }
+
+ /**
+ * Builds the serializer with the given strategies and default serializer.
+ */
+ public ChainedPathZkSerializer build() {
+ return new ChainedPathZkSerializer(_defaultSerializer, _items);
+ }
+ }
+
+ /**
+ * Create a builder that will use the given serializer by default
+ * if no other strategy is given to solve the path in question.
+ */
+ public static Builder builder(ZkSerializer defaultSerializer)
+ {
+ return new Builder(defaultSerializer);
+ }
+
+ private final List<ChainItem> _items;
+ private final ZkSerializer _defaultSerializer;
+
+ private ChainedPathZkSerializer(ZkSerializer defaultSerializer, List<ChainItem> items)
+ {
+ _items = items;
+ // sort by longest paths first
+ // if two items would match one would be prefix of the other
+ // and the longest must be more specific
+ Collections.sort(_items);
+ _defaultSerializer = defaultSerializer;
+ }
+
+ @Override
+ public byte[] serialize(Object data, String path) throws ZkMarshallingError
+ {
+ for (ChainItem item : _items)
+ {
+ if (item.matches(path)) return item._serializer.serialize(data);
+ }
+ return _defaultSerializer.serialize(data);
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes, String path)
+ throws ZkMarshallingError
+ {
+ for (ChainItem item : _items)
+ {
+ if (item.matches(path)) return item._serializer.deserialize(bytes);
+ }
+ return _defaultSerializer.deserialize(bytes);
+ }
+
+ private static class ChainItem implements Comparable<ChainItem>
+ {
+ final String _path;
+ final ZkSerializer _serializer;
+
+ ChainItem(String path, ZkSerializer serializer)
+ {
+ _path = path;
+ _serializer = serializer;
+ }
+
+ boolean matches(String path)
+ {
+ if (_path.equals(path))
+ {
+ return true;
+ }
+ else if (path.length() > _path.length())
+ {
+ if (path.startsWith(_path) && path.charAt(_path.length()) == '/')
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(ChainItem o)
+ {
+ return o._path.length() - _path.length();
+ }
+ }
+
+ private static String normalize(String path) {
+ if (!path.startsWith("/")) {
+ // ensure leading slash
+ path = "/" + path;
+ }
+ if (path.endsWith("/")) {
+ // remove trailing slash
+ path = path.substring(0, path.length()-1);
+ }
+ return path;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
new file mode 100644
index 0000000..7e2a48f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+public class DefaultControllerMessageHandlerFactory implements
+ MessageHandlerFactory
+{
+ private static Logger _logger = Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ String type = message.getMsgType();
+
+ if(!type.equals(getMessageType()))
+ {
+ throw new HelixException("Unexpected msg type for message "+message.getMsgId()
+ +" type:" + message.getMsgType());
+ }
+
+ return new DefaultControllerMessageHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return MessageType.CONTROLLER_MSG.toString();
+ }
+
+ @Override
+ public void reset()
+ {
+
+ }
+
+ public static class DefaultControllerMessageHandler extends MessageHandler
+ {
+ public DefaultControllerMessageHandler(Message message,
+ NotificationContext context)
+ {
+ super(message, context);
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ String type = _message.getMsgType();
+ HelixTaskResult result = new HelixTaskResult();
+ if(!type.equals(MessageType.CONTROLLER_MSG.toString()))
+ {
+ throw new HelixException("Unexpected msg type for message "+_message.getMsgId()
+ +" type:" + _message.getMsgType());
+ }
+ result.getTaskResultMap().put("ControllerResult", "msg "+ _message.getMsgId() + " from "+_message.getMsgSrc() + " processed");
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
new file mode 100644
index 0000000..307f219
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
@@ -0,0 +1,128 @@
+package org.apache.helix.manager.zk;
+
+
+import java.util.Arrays;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+/**
+ * DefaultParticipantErrorMessageHandlerFactory works on controller side.
+ * When the participant detects a critical error, it will send the PARTICIPANT_ERROR_REPORT
+ * Message to the controller, specifying whether it want to disable the instance or
+ * disable the partition. The controller have a chance to do whatever make sense at that point,
+ * and then disable the corresponding partition or the instance. More configs per resource will
+ * be added to customize the controller behavior.
+ * */
+public class DefaultParticipantErrorMessageHandlerFactory implements
+ MessageHandlerFactory
+{
+ public enum ActionOnError
+ {
+ DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
+ }
+
+ public static final String ACTIONKEY = "ActionOnError";
+
+ private static Logger _logger = Logger
+ .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
+ final HelixManager _manager;
+
+ public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
+ {
+ _manager = manager;
+ }
+
+ public static class DefaultParticipantErrorMessageHandler extends MessageHandler
+ {
+ final HelixManager _manager;
+ public DefaultParticipantErrorMessageHandler(Message message,
+ NotificationContext context, HelixManager manager)
+ {
+ super(message, context);
+ _manager = manager;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ // TODO : consider unify this with StatsAggregationStage.executeAlertActions()
+ try
+ {
+ ActionOnError actionOnError
+ = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
+
+ if(actionOnError == ActionOnError.DISABLE_INSTANCE)
+ {
+ _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
+ _logger.info("Instance " + _message.getMsgSrc() + " disabled");
+ }
+ else if(actionOnError == ActionOnError.DISABLE_PARTITION)
+ {
+ _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
+ _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
+ _logger.info("partition " + _message.getPartitionName() + " disabled");
+ }
+ else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
+ {
+ // NOT IMPLEMENTED, or we can disable all partitions
+ //_manager.getClusterManagmentTool().en(_manager.getClusterName(), _manager.getInstanceName(),
+ // _message.getResourceName(), _message.getPartitionName(), false);
+ _logger.info("resource " + _message.getResourceName() + " disabled");
+ }
+ }
+ catch(Exception e)
+ {
+ _logger.error("", e);
+ result.setSuccess(false);
+ result.setException(e);
+ }
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ _logger.error("Message handling pipeline get an exception. MsgId:"
+ + _message.getMsgId(), e);
+ }
+
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ String type = message.getMsgType();
+
+ if(!type.equals(getMessageType()))
+ {
+ throw new HelixException("Unexpected msg type for message "+message.getMsgId()
+ +" type:" + message.getMsgType());
+ }
+
+ return new DefaultParticipantErrorMessageHandler(message, context, _manager);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
+ }
+
+ @Override
+ public void reset()
+ {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
new file mode 100644
index 0000000..e964f44
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -0,0 +1,277 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/*
+ * TODO: The current implementation is temporary for backup handler testing only and it does not
+ * do any throttling.
+ *
+ */
+public class DefaultSchedulerMessageHandlerFactory implements
+ MessageHandlerFactory
+{
+ public static final String WAIT_ALL = "WAIT_ALL";
+ public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
+ public static class SchedulerAsyncCallback extends AsyncCallback
+ {
+ StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
+ Message _originalMessage;
+ HelixManager _manager;
+ final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap<String, Map<String, String>>();
+
+ public SchedulerAsyncCallback(Message originalMessage, HelixManager manager)
+ {
+ _originalMessage = originalMessage;
+ _manager = manager;
+ }
+
+ @Override
+ public void onTimeOut()
+ {
+ _logger.info("Scheduler msg timeout " + _originalMessage.getMsgId()
+ + " timout with " + _timeout + " Ms");
+
+ _statusUpdateUtil.logError(_originalMessage,
+ SchedulerAsyncCallback.class, "Task timeout",
+ _manager.getHelixDataAccessor());
+ addSummary(_resultSummaryMap, _originalMessage, _manager, true);
+ }
+
+ @Override
+ public void onReplyMessage(Message message)
+ {
+ _logger.info("Update for scheduler msg " + _originalMessage.getMsgId()
+ + " Message " + message.getMsgSrc() + " id "
+ + message.getCorrelationId() + " completed");
+ String key = "MessageResult " + message.getMsgSrc() + " "
+ + UUID.randomUUID();
+ _resultSummaryMap.put(key, message.getResultMap());
+
+ if (this.isDone())
+ {
+ _logger.info("Scheduler msg " + _originalMessage.getMsgId()
+ + " completed");
+ _statusUpdateUtil.logInfo(_originalMessage,
+ SchedulerAsyncCallback.class, "Scheduler task completed",
+ _manager.getHelixDataAccessor());
+ addSummary(_resultSummaryMap, _originalMessage, _manager, false);
+ }
+ }
+
+ private void addSummary(Map<String, Map<String, String>> _resultSummaryMap,
+ Message originalMessage, HelixManager manager, boolean timeOut)
+ {
+ Map<String, String> summary = new TreeMap<String, String>();
+ summary.put("TotalMessages:", "" + _resultSummaryMap.size());
+ summary.put("Timeout", "" + timeOut);
+ _resultSummaryMap.put("Summary", summary);
+
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+ ZNRecord statusUpdate = accessor.getProperty(
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+ originalMessage.getMsgId())).getRecord();
+
+ statusUpdate.getMapFields().putAll(_resultSummaryMap);
+ accessor.setProperty(keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), originalMessage.getMsgId()),
+ new StatusUpdate(statusUpdate));
+
+ }
+ }
+
+ private static Logger _logger = Logger
+ .getLogger(DefaultSchedulerMessageHandlerFactory.class);
+ HelixManager _manager;
+
+ public DefaultSchedulerMessageHandlerFactory(HelixManager manager)
+ {
+ _manager = manager;
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ String type = message.getMsgType();
+
+ if (!type.equals(getMessageType()))
+ {
+ throw new HelixException("Unexpected msg type for message "
+ + message.getMsgId() + " type:" + message.getMsgType());
+ }
+
+ return new DefaultSchedulerMessageHandler(message, context, _manager);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return MessageType.SCHEDULER_MSG.toString();
+ }
+
+ @Override
+ public void reset()
+ {
+ }
+
+ public static class DefaultSchedulerMessageHandler extends MessageHandler
+ {
+ HelixManager _manager;
+
+ public DefaultSchedulerMessageHandler(Message message,
+ NotificationContext context, HelixManager manager)
+ {
+ super(message, context);
+ _manager = manager;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ String type = _message.getMsgType();
+ HelixTaskResult result = new HelixTaskResult();
+ if (!type.equals(MessageType.SCHEDULER_MSG.toString()))
+ {
+ throw new HelixException("Unexpected msg type for message "
+ + _message.getMsgId() + " type:" + _message.getMsgType());
+ }
+ // Parse timeout value
+ int timeOut = -1;
+ if (_message.getRecord().getSimpleFields().containsKey("TIMEOUT"))
+ {
+ try
+ {
+ timeOut = Integer.parseInt(_message.getRecord().getSimpleFields()
+ .get("TIMEOUT"));
+ } catch (Exception e)
+ {
+ }
+ }
+
+ // Parse the message template
+ ZNRecord record = new ZNRecord("templateMessage");
+ record.getSimpleFields().putAll(
+ _message.getRecord().getMapField("MessageTemplate"));
+ Message messageTemplate = new Message(record);
+
+ // Parse the criteria
+ StringReader sr = new StringReader(_message.getRecord().getSimpleField(
+ "Criteria"));
+ ObjectMapper mapper = new ObjectMapper();
+ Criteria recipientCriteria;
+ try
+ {
+ recipientCriteria = mapper.readValue(sr, Criteria.class);
+ } catch (Exception e)
+ {
+ _logger.error("", e);
+ result.setException(e);
+ result.setSuccess(false);
+ return result;
+ }
+ _logger.info("Scheduler sending message, criteria:" + recipientCriteria);
+
+ boolean waitAll = false;
+ if(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) !=null)
+ {
+ try
+ {
+ waitAll = Boolean.parseBoolean(_message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
+ }
+ catch(Exception e)
+ {
+ _logger.warn("",e);
+ }
+ }
+ // Send all messages.
+
+ int nMsgsSent = 0;
+ SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
+ if(waitAll)
+ {
+ nMsgsSent = _manager.getMessagingService().sendAndWait(recipientCriteria,
+ messageTemplate,
+ callback,
+ timeOut);
+ }
+ else
+ {
+ nMsgsSent = _manager.getMessagingService().send(recipientCriteria,
+ messageTemplate,
+ callback,
+ timeOut);
+ }
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ // Record the number of messages sent into status updates
+ Map<String, String> sendSummary = new HashMap<String, String>();
+ sendSummary.put("MessageCount", "" + nMsgsSent);
+
+ ZNRecord statusUpdate = accessor.getProperty(
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+ _message.getMsgId())).getRecord();
+
+ statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
+
+ accessor.setProperty(keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
+ new StatusUpdate(statusUpdate));
+
+ result.getTaskResultMap().put(
+ "ControllerResult",
+ "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
+ + " processed");
+ result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ _logger.error("Message handling pipeline get an exception. MsgId:"
+ + _message.getMsgId(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
new file mode 100644
index 0000000..d1e9402
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
@@ -0,0 +1,174 @@
+package org.apache.helix.manager.zk;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+public class HelixGroupCommit<T>
+{
+ private static Logger LOG = Logger.getLogger(HelixGroupCommit.class);
+
+ private static class Queue<T>
+ {
+ final AtomicReference<Thread> _running = new AtomicReference<Thread>();
+ final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
+ }
+
+ private static class Entry<T>
+ {
+ final String _key;
+ final DataUpdater<T> _updater;
+ AtomicBoolean _sent = new AtomicBoolean(false);
+
+ Entry(String key, DataUpdater<T> updater)
+ {
+ _key = key;
+ _updater = updater;
+ }
+ }
+
+ private final Queue<T>[] _queues = new Queue[100];
+
+ public HelixGroupCommit()
+ {
+ // Don't use Arrays.fill();
+ for (int i = 0; i < _queues.length; ++i)
+ {
+ _queues[i] = new Queue<T>();
+ }
+ }
+
+ private Queue<T> getQueue(String key)
+ {
+ return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
+ }
+
+ public boolean commit(ZkBaseDataAccessor<T> accessor,
+ int options,
+ String key,
+ DataUpdater<T> updater)
+ {
+ Queue<T> queue = getQueue(key);
+ Entry<T> entry = new Entry<T>(key, updater);
+
+ queue._pending.add(entry);
+
+ while (!entry._sent.get())
+ {
+ if (queue._running.compareAndSet(null, Thread.currentThread()))
+ {
+ ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
+ try
+ {
+ Entry<T> first = queue._pending.peek();
+ if (first == null)
+ {
+ return true;
+ }
+
+ // remove from queue
+ // Entry first = queue._pending.poll();
+ // processed.add(first);
+
+ String mergedKey = first._key;
+
+ boolean retry;
+ do
+ {
+ retry = false;
+
+ try
+ {
+ T merged = null;
+
+ Stat readStat = new Stat();
+ try
+ {
+ // accessor will fallback to zk if not found in cache
+ merged = accessor.get(mergedKey, readStat, options);
+ }
+ catch (ZkNoNodeException e)
+ {
+ // OK.
+ }
+
+ // updater should handler merged == null
+ // merged = first._updater.update(merged);
+
+ // iterate over processed if we are retrying
+ Iterator<Entry<T>> it = processed.iterator();
+ while (it.hasNext())
+ {
+ Entry<T> ent = it.next();
+ if (!ent._key.equals(mergedKey))
+ {
+ continue;
+ }
+ merged = ent._updater.update(merged);
+ // System.out.println("After merging:" + merged);
+ }
+
+ // iterate over queue._pending for newly coming requests
+ it = queue._pending.iterator();
+ while (it.hasNext())
+ {
+ Entry<T> ent = it.next();
+ if (!ent._key.equals(mergedKey))
+ {
+ continue;
+ }
+ processed.add(ent);
+ merged = ent._updater.update(merged);
+ // System.out.println("After merging:" + merged);
+ it.remove();
+ }
+ // System.out.println("size:"+ processed.size());
+ accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
+ }
+ catch (ZkBadVersionException e)
+ {
+ retry = true;
+ }
+ }
+ while (retry);
+ }
+ finally
+ {
+ queue._running.set(null);
+ for (Entry<T> e : processed)
+ {
+ synchronized (e)
+ {
+ e._sent.set(true);
+ e.notify();
+ }
+ }
+ }
+ }
+ else
+ {
+ synchronized (entry)
+ {
+ try
+ {
+ entry.wait(10);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java
new file mode 100644
index 0000000..5a0aec7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/PathBasedZkSerializer.java
@@ -0,0 +1,28 @@
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+
+public interface PathBasedZkSerializer
+{
+
+ /**
+ * Serialize data differently according to different paths
+ *
+ * @param data
+ * @param path
+ * @return
+ * @throws ZkMarshallingError
+ */
+ public byte[] serialize(Object data, String path) throws ZkMarshallingError;
+
+ /**
+ * Deserialize data differently according to different paths
+ *
+ * @param bytes
+ * @param path
+ * @return
+ * @throws ZkMarshallingError
+ */
+ public Object deserialize(byte[] bytes, String path) throws ZkMarshallingError;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
new file mode 100644
index 0000000..9b7974e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
@@ -0,0 +1,114 @@
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+public class WriteThroughCache<T> extends Cache<T>
+{
+ private static Logger LOG = Logger.getLogger(WriteThroughCache.class);
+
+ final BaseDataAccessor<T> _accessor;
+
+ public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths)
+ {
+ super();
+ _accessor = accessor;
+
+ // init cache
+ if (paths != null && !paths.isEmpty())
+ {
+ for (String path : paths)
+ {
+ updateRecursive(path);
+ }
+ }
+ }
+
+ @Override
+ public void update(String path, T data, Stat stat)
+ {
+ String parentPath = new File(path).getParent();
+ String childName = new File(path).getName();
+ addToParentChildSet(parentPath, childName);
+
+ ZNode znode = _cache.get(path);
+ if (znode == null)
+ {
+ _cache.put(path, new ZNode(path, data, stat));
+ }
+ else
+ {
+ znode.setData(data);
+ znode.setStat(stat);
+ }
+ }
+
+ @Override
+ public void updateRecursive(String path)
+ {
+ if (path == null)
+ {
+ return;
+ }
+
+ try
+ {
+ _lock.writeLock().lock();
+
+// // update parent's childSet
+// String parentPath = new File(path).getParent();
+// String name = new File(path).getName();
+// addToParentChildSet(parentPath, name);
+
+ // update this node
+ Stat stat = new Stat();
+ T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+
+ update(path, readData, stat);
+
+// ZNode znode = _cache.get(path);
+// if (znode == null)
+// {
+// znode = new ZNode(path, readData, stat);
+// _cache.put(path, znode);
+// }
+// else
+// {
+// znode.setData(readData);
+// znode.setStat(stat);
+// }
+
+ // recursively update children nodes if not exists
+ ZNode znode = _cache.get(path);
+ List<String> childNames = _accessor.getChildNames(path, 0);
+ if (childNames != null && childNames.size() > 0)
+ {
+ for (String childName : childNames)
+ {
+ String childPath = path + "/" + childName;
+ if (!znode.hasChild(childName))
+ {
+ znode.addChild(childName);
+ updateRecursive(childPath);
+ }
+ }
+ }
+ }
+ catch (ZkNoNodeException e)
+ {
+ // OK. someone delete znode while we are updating cache
+ }
+ finally
+ {
+ _lock.writeLock().unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java
new file mode 100644
index 0000000..c4cdf78
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKDataAccessor.java
@@ -0,0 +1,329 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+
+@Deprecated
+public class ZKDataAccessor implements DataAccessor
+{
+ private static Logger logger = Logger.getLogger(ZKDataAccessor.class);
+
+ protected final String _clusterName;
+ protected final ZkClient _zkClient;
+
+ /**
+ * If a PropertyType has children (e.g. CONFIGS), then the parent path is the
+ * first key and child path is the second key; If a PropertyType has no child
+ * (e.g. LEADER), then no cache
+ */
+ private final Map<String, Map<String, ZNRecord>> _cache = new ConcurrentHashMap<String, Map<String, ZNRecord>>();
+
+ public ZKDataAccessor(String clusterName, ZkClient zkClient)
+ {
+ _clusterName = clusterName;
+ _zkClient = zkClient;
+ }
+
+ @Override
+ public boolean setProperty(PropertyType type, HelixProperty value, String... keys)
+ {
+ if (!value.isValid())
+ {
+ throw new HelixException("The ZNRecord for " + type + " is not valid.");
+ }
+ return setProperty(type, value.getRecord(), keys);
+ }
+
+ @Override
+ public boolean setProperty(PropertyType type, ZNRecord value, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ String parent = new File(path).getParent();
+ if (!_zkClient.exists(parent))
+ {
+ _zkClient.createPersistent(parent, true);
+ }
+
+ if (_zkClient.exists(path))
+ {
+ if (type.isCreateOnlyIfAbsent())
+ {
+ return false;
+ } else
+ {
+ ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), false);
+ }
+ } else
+ {
+ try
+ {
+ if (type.isPersistent())
+ {
+ _zkClient.createPersistent(path, value);
+ } else
+ {
+ _zkClient.createEphemeral(path, value);
+ }
+ } catch (Exception e)
+ {
+ logger.warn("Exception while creating path:" + path
+ + " Most likely due to race condition(Ignorable).", e);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean updateProperty(PropertyType type, HelixProperty value, String... keys)
+ {
+ return updateProperty(type, value.getRecord(), keys);
+ }
+
+ @Override
+ public boolean updateProperty(PropertyType type, ZNRecord value, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+ if (type.isUpdateOnlyOnExists())
+ {
+ ZKUtil.updateIfExists(_zkClient, path, value, type.isMergeOnUpdate());
+ } else
+ {
+ String parent = new File(path).getParent();
+
+ if (!_zkClient.exists(parent))
+ {
+ _zkClient.createPersistent(parent, true);
+ }
+
+ if (!type.usePropertyTransferServer())
+ {
+ ZKUtil.createOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
+ } else
+ {
+ ZKUtil.asyncCreateOrUpdate(_zkClient, path, value, type.isPersistent(), type.isMergeOnUpdate());
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public <T extends HelixProperty>
+ T getProperty(Class<T> clazz, PropertyType type, String... keys)
+ {
+ return HelixProperty.convertToTypedInstance(clazz, getProperty(type, keys));
+ }
+
+ @Override
+ public ZNRecord getProperty(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ if (!type.isCached())
+ {
+ return _zkClient.readData(path, true);
+ } else
+ {
+ int len = keys.length;
+ if (len == 0)
+ {
+ return _zkClient.readData(path, true);
+ } else
+ {
+ String[] subkeys = Arrays.copyOfRange(keys, 0, len - 1);
+ Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, subkeys);
+ return newChilds.get(keys[len - 1]);
+ }
+ }
+
+ }
+
+ @Override
+ public boolean removeProperty(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+ return _zkClient.delete(path);
+ }
+
+ @Override
+ public List<String> getChildNames(PropertyType type, String... keys)
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+ if (_zkClient.exists(path))
+ {
+ return _zkClient.getChildren(path);
+ } else
+ {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public <T extends HelixProperty>
+ List<T> getChildValues(Class<T> clazz, PropertyType type, String... keys)
+ {
+ List<ZNRecord> newChilds = getChildValues(type, keys);
+ if (newChilds.size() > 0)
+ {
+ return HelixProperty.convertToTypedList(clazz, newChilds);
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<ZNRecord> getChildValues(PropertyType type, String... keys)
+
+ {
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+ // if (path == null)
+ // {
+ // System.err.println("path is null");
+ // }
+
+ if (_zkClient.exists(path))
+ {
+ if (!type.isCached())
+ {
+ return ZKUtil.getChildren(_zkClient, path);
+ } else
+ {
+ Map<String, ZNRecord> newChilds = refreshChildValuesCache(type, keys);
+ return new ArrayList<ZNRecord>(newChilds.values());
+ }
+ }
+
+ return Collections.emptyList();
+ }
+
+ public void reset()
+ {
+ _cache.clear();
+ }
+
+ private Map<String, ZNRecord> refreshChildValuesCache(PropertyType type, String... keys)
+ {
+ if (!type.isCached())
+ {
+ throw new IllegalArgumentException("Type:" + type + " is NOT cached");
+ }
+
+ String path = PropertyPathConfig.getPath(type, _clusterName, keys);
+
+ Map<String, ZNRecord> newChilds = refreshChildValues(path, _cache.get(path));
+ if (newChilds != null && newChilds.size() > 0)
+ {
+ _cache.put(path, newChilds);
+ return newChilds;
+ } else
+ {
+ _cache.remove(path);
+ return Collections.emptyMap();
+ }
+ }
+
+ /**
+ * Read a zookeeper node only if it's data has been changed since last read
+ *
+ * @param parentPath
+ * @param oldChildRecords
+ * @return newChildRecords
+ */
+ private Map<String, ZNRecord> refreshChildValues(String parentPath,
+ Map<String, ZNRecord> oldChildRecords)
+ {
+ List<String> childs = _zkClient.getChildren(parentPath);
+ if (childs == null || childs.size() == 0)
+ {
+ return Collections.emptyMap();
+ }
+
+ Stat newStat = new Stat();
+ Map<String, ZNRecord> newChildRecords = new HashMap<String, ZNRecord>();
+ for (String child : childs)
+ {
+ String childPath = parentPath + "/" + child;
+
+ // assume record.id should be the last part of zookeeper path
+ if (oldChildRecords == null || !oldChildRecords.containsKey(child))
+ {
+ ZNRecord record = _zkClient.readDataAndStat(childPath, newStat, true);
+ if (record != null)
+ {
+ record.setVersion(newStat.getVersion());
+ newChildRecords.put(child, record);
+ }
+ } else
+ {
+ ZNRecord oldChild = oldChildRecords.get(child);
+
+ int oldVersion = oldChild.getVersion();
+ long oldCtime = oldChild.getCreationTime();
+ newStat = _zkClient.getStat(childPath);
+ if (newStat != null)
+ {
+ // System.out.print(child + " oldStat:" + oldStat);
+ // System.out.print(child + " newStat:" + newStat);
+
+ if (oldCtime < newStat.getCtime() ||
+ oldVersion < newStat.getVersion())
+ {
+ ZNRecord record = _zkClient.readDataAndStat(childPath, newStat, true);
+ if (record != null)
+ {
+ record.setVersion(newStat.getVersion());
+ record.setCreationTime(newStat.getCtime());
+ record.setModifiedTime(newStat.getMtime());
+ newChildRecords.put(child, record);
+ }
+ } else
+ {
+ newChildRecords.put(child, oldChild);
+ }
+ }
+ }
+ }
+
+ return Collections.unmodifiableMap(newChildRecords);
+ }
+
+ @Override
+ public <T extends HelixProperty>
+ Map<String, T> getChildValuesMap(Class<T> clazz, PropertyType type, String... keys)
+ {
+ List<T> list = getChildValues(clazz, type, keys);
+ return Collections.unmodifiableMap(HelixProperty.convertListToMap(list));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java
new file mode 100644
index 0000000..340c3e5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKExceptionHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager.zk;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.log4j.Logger;
+
+public class ZKExceptionHandler
+{
+ private static ZKExceptionHandler instance = new ZKExceptionHandler();
+ private static Logger logger = Logger.getLogger(ZKExceptionHandler.class);
+ private ZKExceptionHandler()
+ {
+
+ }
+
+ void handle(Exception e)
+ {
+ logger.error(Thread.currentThread().getName() + ". isThreadInterruped: " + Thread.currentThread().isInterrupted());
+
+ if (e instanceof ZkInterruptedException)
+ {
+ logger.error("zk connection is interrupted.", e);
+ }
+ else
+ {
+ logger.error(e.getMessage(), e);
+ // e.printStackTrace();
+ }
+ }
+
+ public static ZKExceptionHandler getInstance()
+ {
+ return instance;
+ }
+}