You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[17/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
new file mode 100644
index 0000000..702270e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.Message;
+
+/**
+ * Provides the base class for all message handlers.
+ *
+ */
+public abstract class MessageHandler
+{
+ public enum ErrorType
+ {
+ FRAMEWORK, INTERNAL
+ }
+
+ public enum ErrorCode
+ {
+ ERROR, CANCEL, TIMEOUT
+ }
+ /**
+ * The message to be handled
+ */
+ protected final Message _message;
+
+ /**
+ * The context for handling the message. The cluster manager interface can be
+ * accessed from NotificationContext
+ */
+ protected final NotificationContext _notificationContext;
+
+ /**
+ * The constructor. The message and notification context must be provided at
+ * creation time; both are stored as given, without validation.
+ */
+ public MessageHandler(Message message, NotificationContext context)
+ {
+ _message = message;
+ _notificationContext = context;
+ }
+
+ /**
+ * Message handling routine. The function is called in a thread pool task in
+ * CMTaskExecutor
+ *
+ * @return the HelixTaskResult which contains info about the message processing.
+ */
+ public abstract HelixTaskResult handleMessage() throws InterruptedException;
+
+ /**
+ * Callback when error happens in the message handling pipeline.
+ *
+ * @param e - the exception associated with the error
+ * @param code - the kind of failure, one of {@link ErrorCode} (ERROR, CANCEL, TIMEOUT)
+ * @param type - denote if the exception happens in framework or happens in the
+ * customer's code
+ */
+ public abstract void onError(Exception e, ErrorCode code, ErrorType type);
+
+ /**
+ * Callback when the framework is about to interrupt the message handler
+ */
+ public void onTimeout()
+ {
+ // Intentionally empty: the default implementation does nothing on timeout.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
new file mode 100644
index 0000000..473e618
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+/**
+ * Factory for {@link MessageHandler} instances.
+ */
+public interface MessageHandlerFactory
+{
+  /**
+   * Creates the handler for a message.
+   *
+   * @param message the message to be handled
+   * @param context the notification context for the message
+   * @return the handler created for the message
+   */
+  MessageHandler createHandler(Message message, NotificationContext context);
+
+  /**
+   * @return the message type associated with this factory (presumably the type of
+   *         messages it creates handlers for -- TODO confirm against the callers)
+   */
+  String getMessageType();
+
+  /**
+   * Resets the factory. What state is reset is up to the implementation.
+   */
+  void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/package-info.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/package-info.java
new file mode 100644
index 0000000..0a39f5a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix default implementation for handling state transition messages and controller messages
+ *
+ */
+package org.apache.helix.messaging.handling;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/package-info.java b/helix-core/src/main/java/org/apache/helix/messaging/package-info.java
new file mode 100644
index 0000000..97e0397
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix message handling classes
+ *
+ */
+package org.apache.helix.messaging;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/AlertHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/AlertHistory.java b/helix-core/src/main/java/org/apache/helix/model/AlertHistory.java
new file mode 100644
index 0000000..e62dee1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/AlertHistory.java
@@ -0,0 +1,22 @@
+package org.apache.helix.model;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * HelixProperty wrapper around a ZNRecord (presumably one holding alert history --
+ * the class adds no accessors of its own).
+ */
+public class AlertHistory extends HelixProperty
+{
+  /**
+   * Wraps the given record.
+   *
+   * @param record the backing record, handed to the HelixProperty constructor
+   */
+  public AlertHistory(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * No validation is implemented; every instance is reported as valid.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
new file mode 100644
index 0000000..aef339e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/AlertStatus.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.Map;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * HelixProperty view over an alert status record: exposes its session id,
+ * its record id and its map fields.
+ */
+public class AlertStatus extends HelixProperty
+{
+  public final static String nodeName = "AlertStatus";
+
+  public enum AlertsProperty
+  {
+    SESSION_ID, FIELDS
+  }
+
+  /**
+   * @param id the id handed to the HelixProperty constructor
+   */
+  public AlertStatus(String id)
+  {
+    super(id);
+  }
+
+  /**
+   * @param record the backing record, handed to the HelixProperty constructor
+   */
+  public AlertStatus(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * Stores the session id in the SESSION_ID simple field of the record.
+   *
+   * @param sessionId the value to store
+   */
+  public void setSessionId(String sessionId)
+  {
+    final String sessionKey = AlertsProperty.SESSION_ID.toString();
+    _record.setSimpleField(sessionKey, sessionId);
+  }
+
+  /**
+   * @return the value of the SESSION_ID simple field of the record
+   */
+  public String getSessionId()
+  {
+    final String sessionKey = AlertsProperty.SESSION_ID.toString();
+    return _record.getSimpleField(sessionKey);
+  }
+
+  /**
+   * @return the id of the backing record, which this class treats as the instance name
+   */
+  public String getInstanceName()
+  {
+    return _record.getId();
+  }
+
+  /**
+   * @return the map fields of the backing record, exactly as the record returns them
+   */
+  public Map<String, Map<String, String>> getMapFields()
+  {
+    return _record.getMapFields();
+  }
+
+  /**
+   * @param statName key of the map field to look up
+   * @return the map field stored under {@code statName}, as returned by the record
+   */
+  public Map<String, String> getStatFields(String statName)
+  {
+    return _record.getMapField(statName);
+  }
+
+  /**
+   * No validation is implemented; every instance is reported as valid.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/Alerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Alerts.java b/helix-core/src/main/java/org/apache/helix/model/Alerts.java
new file mode 100644
index 0000000..9b4e1d2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/Alerts.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.Map;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * HelixProperty view over an alerts record: exposes its session id,
+ * its record id and its map fields.
+ */
+public class Alerts extends HelixProperty
+{
+  public final static String nodeName = "Alerts";
+
+  public enum AlertsProperty
+  {
+    SESSION_ID, FIELDS
+  }
+
+  /**
+   * @param id the id handed to the HelixProperty constructor
+   */
+  public Alerts(String id)
+  {
+    super(id);
+  }
+
+  /**
+   * @param record the backing record, handed to the HelixProperty constructor
+   */
+  public Alerts(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * Stores the session id in the SESSION_ID simple field of the record.
+   *
+   * @param sessionId the value to store
+   */
+  public void setSessionId(String sessionId)
+  {
+    final String sessionKey = AlertsProperty.SESSION_ID.toString();
+    _record.setSimpleField(sessionKey, sessionId);
+  }
+
+  /**
+   * @return the value of the SESSION_ID simple field of the record
+   */
+  public String getSessionId()
+  {
+    final String sessionKey = AlertsProperty.SESSION_ID.toString();
+    return _record.getSimpleField(sessionKey);
+  }
+
+  /**
+   * @return the id of the backing record, which this class treats as the instance name
+   */
+  public String getInstanceName()
+  {
+    return _record.getId();
+  }
+
+  /**
+   * @return the map fields of the backing record, exactly as the record returns them
+   */
+  public Map<String, Map<String, String>> getMapFields()
+  {
+    return _record.getMapFields();
+  }
+
+  /**
+   * @param statName key of the map field to look up
+   * @return the map field stored under {@code statName}, as returned by the record
+   */
+  public Map<String, String> getStatFields(String statName)
+  {
+    return _record.getMapField(statName);
+  }
+
+  /**
+   * No validation is implemented; every instance is reported as valid.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
new file mode 100644
index 0000000..7ff0818
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -0,0 +1,217 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+public class ClusterConstraints extends HelixProperty
+{
+ private static Logger LOG = Logger.getLogger(ClusterConstraints.class);
+
+ public enum ConstraintAttribute
+ {
+ STATE, MESSAGE_TYPE, TRANSITION, RESOURCE, INSTANCE, CONSTRAINT_VALUE
+ }
+
+ public enum ConstraintValue
+ {
+ ANY
+ }
+
+ public enum ConstraintType
+ {
+ STATE_CONSTRAINT, MESSAGE_CONSTRAINT
+ }
+
+ static public class ConstraintItem
+ {
+ // attributes e.g. {STATE:MASTER, RESOURCEG:TestDB, INSTANCE:localhost_12918}
+ final Map<ConstraintAttribute, String> _attributes;
+ String _constraintValue;
+
+ public ConstraintItem(Map<String, String> attributes)
+ {
+ _attributes = new TreeMap<ConstraintAttribute, String>();
+ _constraintValue = null;
+
+ if (attributes != null)
+ {
+ for (String key : attributes.keySet())
+ {
+ try
+ {
+ ConstraintAttribute attr = ConstraintAttribute.valueOf(key);
+ if (attr == ConstraintAttribute.CONSTRAINT_VALUE)
+ {
+ String value = attributes.get(key);
+ try
+ {
+ ConstraintValue.valueOf(value);
+ } catch (Exception e)
+ {
+ try
+ {
+ Integer.parseInt(value);
+ }
+ catch (NumberFormatException ne)
+ {
+ LOG.error("Invalid constraintValue " + key + ":" + value);
+ continue;
+ }
+ }
+ _constraintValue = attributes.get(key);
+ } else
+ {
+ _attributes.put(attr, attributes.get(key));
+ }
+ } catch (Exception e)
+ {
+ LOG.error("Invalid constraintAttribute " + key + ":" + attributes.get(key));
+ continue;
+ }
+ }
+ }
+ }
+
+ public boolean match(Map<ConstraintAttribute, String> attributes)
+ {
+ for (ConstraintAttribute key : _attributes.keySet())
+ {
+ if (!attributes.containsKey(key))
+ {
+ return false;
+ }
+
+ if (!attributes.get(key).matches(_attributes.get(key)))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // filter out attributes that are not specified by this constraint
+ public Map<ConstraintAttribute, String> filter(Map<ConstraintAttribute, String> attributes)
+ {
+ Map<ConstraintAttribute, String> ret = new HashMap<ConstraintAttribute, String>();
+ for (ConstraintAttribute key : _attributes.keySet())
+ {
+ // TODO: what if attributes.get(key)==null? might need match function at constrait level
+ ret.put(key, attributes.get(key));
+ }
+
+ return ret;
+ }
+
+ public String getConstraintValue()
+ {
+ return _constraintValue;
+ }
+
+ public Map<ConstraintAttribute, String> getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(_attributes + ":" + _constraintValue);
+ return sb.toString();
+ }
+ }
+
+ private final List<ConstraintItem> _constraints = new ArrayList<ConstraintItem>();
+
+ public ClusterConstraints(ZNRecord record)
+ {
+ super(record);
+
+ for (String key : _record.getMapFields().keySet())
+ {
+ ConstraintItem item = new ConstraintItem(_record.getMapField(key));
+ if (item.getAttributes().size() > 0 && item.getConstraintValue() != null)
+ {
+ _constraints.add(item);
+ } else
+ {
+ LOG.error("Invalid constraint " + key + ":" + _record.getMapField(key));
+ }
+ }
+ }
+
+ /**
+ * return a set of constraints that match the attribute pairs
+ */
+ public Set<ConstraintItem> match(Map<ConstraintAttribute, String> attributes)
+ {
+ Set<ConstraintItem> matches = new HashSet<ConstraintItem>();
+ for (ConstraintItem item : _constraints)
+ {
+ if (item.match(attributes))
+ {
+ matches.add(item);
+ }
+ }
+ return matches;
+ }
+
+ // convert a message to constraint attribute pairs
+ public static Map<ConstraintAttribute, String> toConstraintAttributes(Message msg)
+ {
+ Map<ConstraintAttribute, String> attributes = new TreeMap<ConstraintAttribute, String>();
+ String msgType = msg.getMsgType();
+ attributes.put(ConstraintAttribute.MESSAGE_TYPE, msgType);
+ if (MessageType.STATE_TRANSITION.toString().equals(msgType))
+ {
+ if (msg.getFromState() != null && msg.getToState() != null)
+ {
+ attributes.put(ConstraintAttribute.TRANSITION,
+ msg.getFromState() + "-" + msg.getToState());
+ }
+ if (msg.getResourceName() != null)
+ {
+ attributes.put(ConstraintAttribute.RESOURCE, msg.getResourceName());
+ }
+ if (msg.getTgtName() != null)
+ {
+ attributes.put(ConstraintAttribute.INSTANCE, msg.getTgtName());
+ }
+ }
+ return attributes;
+ }
+
+ @Override
+ public boolean isValid()
+ {
+ // TODO Auto-generated method stub
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
new file mode 100644
index 0000000..3250dfd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Current states of partitions in a resource
+ */
+public class CurrentState extends HelixProperty
+{
+  private static final Logger LOG = Logger.getLogger(CurrentState.class);
+
+  public enum CurrentStateProperty
+  {
+    SESSION_ID, CURRENT_STATE, STATE_MODEL_DEF, STATE_MODEL_FACTORY_NAME, RESOURCE
+  }
+
+  /**
+   * @param resourceName the id handed to the HelixProperty constructor
+   */
+  public CurrentState(String resourceName)
+  {
+    super(resourceName);
+  }
+
+  /**
+   * @param record the backing record, handed to the HelixProperty constructor
+   */
+  public CurrentState(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * @return the id of the backing record, which is used as the resource name
+   */
+  public String getResourceName()
+  {
+    return _record.getId();
+  }
+
+  /**
+   * Collects the CURRENT_STATE entry of every map field into a fresh map.
+   *
+   * @return partition name to current state; partitions whose map field is null are
+   *         left out, partitions without a CURRENT_STATE entry map to null
+   */
+  public Map<String, String> getPartitionStateMap()
+  {
+    final String stateKey = CurrentStateProperty.CURRENT_STATE.toString();
+    Map<String, String> partitionStates = new HashMap<String, String>();
+    for (Map.Entry<String, Map<String, String>> entry : _record.getMapFields().entrySet())
+    {
+      Map<String, String> partitionFields = entry.getValue();
+      if (partitionFields == null)
+      {
+        continue;
+      }
+      partitionStates.put(entry.getKey(), partitionFields.get(stateKey));
+    }
+    return partitionStates;
+  }
+
+  /**
+   * @return the value of the SESSION_ID simple field
+   */
+  public String getSessionId()
+  {
+    return _record.getSimpleField(CurrentStateProperty.SESSION_ID.toString());
+  }
+
+  /**
+   * @param sessionId value to store in the SESSION_ID simple field
+   */
+  public void setSessionId(String sessionId)
+  {
+    _record.setSimpleField(CurrentStateProperty.SESSION_ID.toString(), sessionId);
+  }
+
+  /**
+   * @param partitionName partition to look up
+   * @return the CURRENT_STATE entry of the partition's map field, or null when the
+   *         partition has no map field
+   */
+  public String getState(String partitionName)
+  {
+    Map<String, String> partitionFields = _record.getMapFields().get(partitionName);
+    if (partitionFields == null)
+    {
+      return null;
+    }
+    return partitionFields.get(CurrentStateProperty.CURRENT_STATE.toString());
+  }
+
+  /**
+   * @param stateModelName value to store in the STATE_MODEL_DEF simple field
+   */
+  public void setStateModelDefRef(String stateModelName)
+  {
+    final String defKey = CurrentStateProperty.STATE_MODEL_DEF.toString();
+    _record.setSimpleField(defKey, stateModelName);
+  }
+
+  /**
+   * @return the value of the STATE_MODEL_DEF simple field
+   */
+  public String getStateModelDefRef()
+  {
+    return _record.getSimpleField(CurrentStateProperty.STATE_MODEL_DEF.toString());
+  }
+
+  /**
+   * Sets the CURRENT_STATE entry of a partition, creating the partition's map field
+   * (a TreeMap) when it does not exist yet.
+   *
+   * @param partitionName partition to update
+   * @param state the state to record
+   */
+  public void setState(String partitionName, String state)
+  {
+    Map<String, Map<String, String>> allFields = _record.getMapFields();
+    Map<String, String> partitionFields = allFields.get(partitionName);
+    if (partitionFields == null)
+    {
+      partitionFields = new TreeMap<String, String>();
+      allFields.put(partitionName, partitionFields);
+    }
+    partitionFields.put(CurrentStateProperty.CURRENT_STATE.toString(), state);
+  }
+
+  /**
+   * @param factoryName value to store in the STATE_MODEL_FACTORY_NAME simple field
+   */
+  public void setStateModelFactoryName(String factoryName)
+  {
+    final String factoryKey = CurrentStateProperty.STATE_MODEL_FACTORY_NAME.toString();
+    _record.setSimpleField(factoryKey, factoryName);
+  }
+
+  /**
+   * @return the value of the STATE_MODEL_FACTORY_NAME simple field
+   */
+  public String getStateModelFactoryName()
+  {
+    final String factoryKey = CurrentStateProperty.STATE_MODEL_FACTORY_NAME.toString();
+    return _record.getSimpleField(factoryKey);
+  }
+
+  // NOTE: getBucketSize()/setBucketSize(int) overrides backed by a BUCKET_SIZE simple
+  // field (and the matching enum constant) were left commented out at this point.
+
+  /**
+   * A current state is valid when it carries both a state model definition reference
+   * and a session id; the first missing one is logged.
+   *
+   * @return true when both fields are present
+   */
+  @Override
+  public boolean isValid()
+  {
+    boolean hasStateModelRef = getStateModelDefRef() != null;
+    if (!hasStateModelRef)
+    {
+      LOG.error("Current state does not contain state model ref. id:" + getResourceName());
+      return false;
+    }
+
+    boolean hasSessionId = getSessionId() != null;
+    if (!hasSessionId)
+    {
+      LOG.error("CurrentState does not contain session id, id : " + getResourceName());
+      return false;
+    }
+
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/Error.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Error.java b/helix-core/src/main/java/org/apache/helix/model/Error.java
new file mode 100644
index 0000000..6905d16
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/Error.java
@@ -0,0 +1,20 @@
+package org.apache.helix.model;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * HelixProperty wrapper around a ZNRecord (presumably one describing an error --
+ * the class adds no accessors of its own).
+ */
+public class Error extends HelixProperty
+{
+  /**
+   * Wraps the given record.
+   *
+   * @param record the backing record, handed to the HelixProperty constructor
+   */
+  public Error(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * No validation is implemented; every instance is reported as valid.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
new file mode 100644
index 0000000..1e0eaaf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * External view is an aggregation (across all instances)
+ * of current states for the partitions in a resource
+ */
+public class ExternalView extends HelixProperty
+{
+  /**
+   * Creates an external view backed by a new ZNRecord whose id is the resource name.
+   *
+   * @param resource id of the new backing record
+   */
+  public ExternalView(String resource)
+  {
+    super(new ZNRecord(resource));
+  }
+
+  /**
+   * @param record the backing record, handed to the HelixProperty constructor
+   */
+  public ExternalView(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * Records the state of one instance for a partition, creating the partition's
+   * map field (a TreeMap) when it does not exist yet.
+   *
+   * @param partition partition whose map field is updated
+   * @param instance key within the partition's map field
+   * @param state value stored for the instance
+   */
+  public void setState(String partition, String instance, String state)
+  {
+    Map<String, String> instanceStates = _record.getMapField(partition);
+    if (instanceStates == null)
+    {
+      _record.setMapField(partition, new TreeMap<String, String>());
+      // re-read from the record so the update lands in whatever map the record holds
+      instanceStates = _record.getMapField(partition);
+    }
+    instanceStates.put(instance, state);
+  }
+
+  /**
+   * Replaces the whole map field of a partition.
+   *
+   * @param partitionName partition whose map field is set
+   * @param currentStateMap the map to store for the partition
+   */
+  public void setStateMap(String partitionName,
+                          Map<String, String> currentStateMap)
+  {
+    _record.setMapField(partitionName, currentStateMap);
+  }
+
+  /**
+   * @return the key set of the record's map fields, i.e. the partition names
+   */
+  public Set<String> getPartitionSet()
+  {
+    Map<String, Map<String, String>> allPartitions = _record.getMapFields();
+    return allPartitions.keySet();
+  }
+
+  /**
+   * @param partitionName partition to look up
+   * @return the map field stored for the partition, as returned by the record
+   */
+  public Map<String, String> getStateMap(String partitionName)
+  {
+    return _record.getMapField(partitionName);
+  }
+
+  /**
+   * @return the id of the backing record, which is used as the resource name
+   */
+  public String getResourceName()
+  {
+    return _record.getId();
+  }
+
+  /**
+   * No validation is implemented; every instance is reported as valid.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/HealthStat.java b/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
new file mode 100644
index 0000000..40ea4d2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.alerts.ExpressionParser;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.log4j.Logger;
+
+
+public class HealthStat extends HelixProperty
+{
+ public enum HealthStatProperty
+ {
+ FIELDS
+ }
+ private static final Logger _logger = Logger.getLogger(HealthStat.class.getName());
+
+ public HealthStat(String id)
+ {
+ super(id);
+ }
+
+ public HealthStat(ZNRecord record)
+ {
+ super(record);
+ if(getCreateTimeStamp() == 0)
+ {
+ _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(), ""
+ + new Date().getTime());
+ }
+ }
+
+ public long getLastModifiedTimeStamp()
+ {
+ return _record.getModifiedTime();
+ }
+
+ public long getCreateTimeStamp()
+ {
+ if (_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()) == null)
+ {
+ return 0;
+ }
+ try
+ {
+ return Long.parseLong(_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()));
+ } catch (Exception e)
+ {
+ return 0;
+ }
+ }
+
+ public String getTestField()
+ {
+ return _record.getSimpleField("requestCountStat");
+ }
+
+ public void setHealthFields(Map<String, Map<String, String>> healthFields)
+ {
+ _record.setMapFields(healthFields);
+ }
+
+ public String buildCompositeKey(String instance, String parentKey, String statName ) {
+ String delim = ExpressionParser.statFieldDelim;
+ return instance+delim+parentKey+delim+statName;
+ }
+
+ public Map<String, Map<String, String>> getHealthFields(String instanceName) // ,
+ // String
+ // timestamp)
+ {
+ // XXX: need to do some conversion of input format to the format that stats
+ // computation wants
+ Map<String, Map<String, String>> currMapFields = _record.getMapFields();
+ Map<String, Map<String, String>> convertedMapFields = new HashMap<String, Map<String, String>>();
+ for (String key : currMapFields.keySet())
+ {
+ Map<String, String> currMap = currMapFields.get(key);
+ String timestamp = "-1";
+ if (_record.getSimpleFields().keySet().contains(StatsHolder.TIMESTAMP_NAME))
+ {
+ timestamp = _record.getSimpleField(StatsHolder.TIMESTAMP_NAME);
+ }
+ for (String subKey : currMap.keySet())
+ {
+ if (subKey.equals("StatsHolder.TIMESTAMP_NAME"))
+ { // don't want to get timestamp again
+ continue;
+ }
+ String compositeKey = buildCompositeKey(instanceName, key, subKey);
+ String value = currMap.get(subKey);
+ Map<String, String> convertedMap = new HashMap<String, String>();
+ convertedMap.put(StatsHolder.VALUE_NAME, value);
+ convertedMap.put(StatsHolder.TIMESTAMP_NAME, timestamp);
+ convertedMapFields.put(compositeKey, convertedMap);
+ }
+ }
+ return convertedMapFields;
+ }
+
+ @Override
+ public boolean isValid() {
+ // TODO Auto-generated method stub
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
new file mode 100644
index 0000000..57b0592
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -0,0 +1,312 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+/**
+ * The ideal states of all partitions in a resource
+ */
+public class IdealState extends HelixProperty
+{
+ public enum IdealStateProperty
+ {
+ NUM_PARTITIONS,
+ STATE_MODEL_DEF_REF,
+ STATE_MODEL_FACTORY_NAME,
+ REPLICAS,
+ IDEAL_STATE_MODE,
+ REBALANCE_TIMER_PERIOD
+ }
+
+ public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
+
+ public enum IdealStateModeProperty
+ {
+ AUTO, CUSTOMIZED, AUTO_REBALANCE
+ }
+
+ private static final Logger logger = Logger.getLogger(IdealState.class.getName());
+
+ public IdealState(String resourceName)
+ {
+ super(resourceName);
+ }
+
+ public IdealState(ZNRecord record)
+ {
+ super(record);
+ }
+
+ public String getResourceName()
+ {
+ return _record.getId();
+ }
+
+ public void setIdealStateMode(String mode)
+ {
+ _record.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(), mode);
+ }
+
+ /**
+  * Reads the mode stored in the IDEAL_STATE_MODE simple field.
+  *
+  * @return the parsed mode, or {@link IdealStateModeProperty#AUTO} when the
+  *         field is missing or does not name a known mode
+  */
+ public IdealStateModeProperty getIdealStateMode()
+ {
+   final String rawMode =
+       _record.getSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString());
+   if (rawMode == null)
+   {
+     // no mode recorded: fall back to the default
+     return IdealStateModeProperty.AUTO;
+   }
+
+   IdealStateModeProperty parsedMode;
+   try
+   {
+     parsedMode = IdealStateModeProperty.valueOf(rawMode);
+   }
+   catch (IllegalArgumentException e)
+   {
+     // unknown mode name: fall back to the default
+     parsedMode = IdealStateModeProperty.AUTO;
+   }
+   return parsedMode;
+ }
+
+ public void setPartitionState(String partitionName, String instanceName, String state)
+ {
+ Map<String, String> mapField = _record.getMapField(partitionName);
+ if (mapField == null)
+ {
+ _record.setMapField(partitionName, new TreeMap<String, String>());
+ }
+ _record.getMapField(partitionName).put(instanceName, state);
+ }
+
+ public Set<String> getPartitionSet()
+ {
+ if (getIdealStateMode() == IdealStateModeProperty.AUTO
+ || getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
+ {
+ return _record.getListFields().keySet();
+ }
+ else if (getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+ {
+ return _record.getMapFields().keySet();
+ }
+ else
+ {
+ logger.error("Invalid ideal state mode:" + getResourceName());
+ return Collections.emptySet();
+ }
+ }
+
+ public Map<String, String> getInstanceStateMap(String partitionName)
+ {
+ return _record.getMapField(partitionName);
+ }
+
+ public Set<String> getInstanceSet(String partitionName)
+ {
+ if (getIdealStateMode() == IdealStateModeProperty.AUTO
+ || getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
+ {
+ List<String> prefList = _record.getListField(partitionName);
+ if (prefList != null)
+ {
+ return new TreeSet<String>(prefList);
+ }
+ else
+ {
+ logger.warn(partitionName + " does NOT exist");
+ return Collections.emptySet();
+ }
+ }
+ else if (getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
+ {
+ Map<String, String> stateMap = _record.getMapField(partitionName);
+ if (stateMap != null)
+ {
+ return new TreeSet<String>(stateMap.keySet());
+ }
+ else
+ {
+ logger.warn(partitionName + " does NOT exist");
+ return Collections.emptySet();
+ }
+ }
+ else
+ {
+ logger.error("Invalid ideal state mode: " + getResourceName());
+ return Collections.emptySet();
+ }
+
+ }
+
+ /**
+  * Returns the list field stored for the given partition.
+  *
+  * @param partitionName partition to look up
+  * @return the stored list, or {@code null} (after logging a warning) when the
+  *         record has no list field for the partition
+  */
+ public List<String> getPreferenceList(String partitionName)
+ {
+   final List<String> preferenceList = _record.getListField(partitionName);
+   if (preferenceList == null)
+   {
+     logger.warn("Resource key:" + partitionName
+         + " does not have a pre-computed preference list.");
+   }
+   return preferenceList;
+ }
+
+ public String getStateModelDefRef()
+ {
+ return _record.getSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString());
+ }
+
+ public void setStateModelDefRef(String stateModel)
+ {
+ _record.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), stateModel);
+ }
+
+ public void setNumPartitions(int numPartitions)
+ {
+ _record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+ String.valueOf(numPartitions));
+ }
+
+ /**
+  * Reads the NUM_PARTITIONS simple field as an integer.
+  *
+  * @return the parsed number of partitions, or -1 (after logging an error)
+  *         when the field is missing or not a valid integer
+  */
+ public int getNumPartitions()
+ {
+   final String rawCount =
+       _record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString());
+
+   int numPartitions;
+   try
+   {
+     // Integer.parseInt reports a null input as NumberFormatException as well
+     numPartitions = Integer.parseInt(rawCount);
+   }
+   catch (NumberFormatException e)
+   {
+     logger.error("Can't parse number of partitions: " + rawCount, e);
+     numPartitions = -1;
+   }
+   return numPartitions;
+ }
+
+ public void setReplicas(String replicas)
+ {
+ _record.setSimpleField(IdealStateProperty.REPLICAS.toString(), replicas);
+ }
+
+ /**
+  * Returns the REPLICAS simple field. When the field is absent the value is
+  * derived instead: in AUTO mode from the size of the first list field, in
+  * CUSTOMIZED mode from the size of the first map field, and "0" for every
+  * other mode or when the record has no such fields.
+  *
+  * @return the replica count as a string
+  */
+ public String getReplicas()
+ {
+   // HACK: if replica doesn't exists, use the length of the first list field instead
+   // TODO: remove it when Dbus fixed the IdealState writer
+   String replica = _record.getSimpleField(IdealStateProperty.REPLICAS.toString());
+   if (replica != null)
+   {
+     return replica;
+   }
+
+   String firstPartition = null;
+   switch (getIdealStateMode())
+   {
+   case AUTO:
+     if (_record.getListFields().isEmpty())
+     {
+       replica = "0";
+     }
+     else
+     {
+       firstPartition = _record.getListFields().keySet().iterator().next();
+       replica =
+           firstPartition == null ? "0"
+               : Integer.toString(_record.getListField(firstPartition).size());
+     }
+     logger.warn("could NOT found replicas in idealState. Use size of the first list instead. replica: "
+         + replica + ", 1st partition: " + firstPartition);
+     break;
+   case CUSTOMIZED:
+     if (_record.getMapFields().isEmpty())
+     {
+       replica = "0";
+     }
+     else
+     {
+       firstPartition = _record.getMapFields().keySet().iterator().next();
+       replica =
+           firstPartition == null ? "0"
+               : Integer.toString(_record.getMapField(firstPartition).size());
+     }
+     logger.warn("could NOT found replicas in idealState. Use size of the first map instead. replica: "
+         + replica + ", 1st partition: " + firstPartition);
+     break;
+   default:
+     replica = "0";
+     logger.error("could NOT determine replicas. set to 0");
+     break;
+   }
+
+   return replica;
+ }
+
+ public void setStateModelFactoryName(String name)
+ {
+ _record.setSimpleField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString(), name);
+ }
+
+ public String getStateModelFactoryName()
+ {
+ String ftyName = _record.getSimpleField(IdealStateProperty.STATE_MODEL_FACTORY_NAME.toString());
+ if (ftyName == null)
+ {
+ ftyName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+ }
+ return ftyName;
+ }
+
+ /**
+  * Reads the REBALANCE_TIMER_PERIOD simple field as an integer.
+  *
+  * @return the parsed period, or -1 when the field is absent or cannot be
+  *         parsed (the parse failure is logged)
+  */
+ public int getRebalanceTimerPeriod()
+ {
+   final String periodKey = IdealStateProperty.REBALANCE_TIMER_PERIOD.toString();
+   if (!_record.getSimpleFields().containsKey(periodKey))
+   {
+     return -1;
+   }
+
+   try
+   {
+     return Integer.parseInt(_record.getSimpleField(periodKey));
+   }
+   catch (Exception e)
+   {
+     logger.error("", e);
+     return -1;
+   }
+ }
+
+ @Override
+ public boolean isValid()
+ {
+ if (getNumPartitions() < 0)
+ {
+ logger.error("idealState:" + _record + " does not have number of partitions (was "
+ + getNumPartitions() + ").");
+ return false;
+ }
+
+ if (getStateModelDefRef() == null)
+ {
+ logger.error("idealStates:" + _record + " does not have state model definition.");
+ return false;
+ }
+
+ if (getIdealStateMode() == IdealStateModeProperty.AUTO && getReplicas() == null)
+ {
+ logger.error("idealStates:" + _record + " does not have replica.");
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
new file mode 100644
index 0000000..fefa5d9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -0,0 +1,190 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Instance configurations
+ */
+public class InstanceConfig extends HelixProperty
+{
+ public enum InstanceConfigProperty
+ {
+ HELIX_HOST,
+ HELIX_PORT,
+ HELIX_ENABLED,
+ HELIX_DISABLED_PARTITION
+ }
+ private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
+
+ public InstanceConfig(String id)
+ {
+ super(id);
+ }
+
+ public InstanceConfig(ZNRecord record)
+ {
+ super(record);
+ }
+
+ public String getHostName()
+ {
+ return _record.getSimpleField(InstanceConfigProperty.HELIX_HOST.toString());
+ }
+
+ public void setHostName(String hostName)
+ {
+ _record.setSimpleField(InstanceConfigProperty.HELIX_HOST.toString(), hostName);
+ }
+
+ public String getPort()
+ {
+ return _record.getSimpleField(InstanceConfigProperty.HELIX_PORT.toString());
+ }
+
+ public void setPort(String port)
+ {
+ _record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
+ }
+
+ public boolean getInstanceEnabled()
+ {
+ String isEnabled = _record.getSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString());
+ return Boolean.parseBoolean(isEnabled);
+ }
+
+ public void setInstanceEnabled(boolean enabled)
+ {
+ _record.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(), Boolean.toString(enabled));
+ }
+
+
+ public boolean getInstanceEnabledForPartition(String partition)
+ {
+ // Map<String, String> disabledPartitionMap = _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ List<String> disabledPartitions = _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ if (disabledPartitions != null && disabledPartitions.contains(partition))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+  * Returns the map field stored under HELIX_DISABLED_PARTITION.
+  *
+  * NOTE(review): getInstanceEnabledForPartition and
+  * setInstanceEnabledForPartition in this class read and write the disabled
+  * partitions as a list field under the same key, so this map field is
+  * presumably never populated by this class -- confirm against callers.
+  *
+  * @return the map field for HELIX_DISABLED_PARTITION as returned by the record
+  */
+ public Map<String, String> getDisabledPartitionMap()
+ {
+ return _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ }
+
+ /**
+  * Enables or disables a partition on this instance by updating the
+  * HELIX_DISABLED_PARTITION list field. The stored list is rewritten as a
+  * sorted, duplicate-free list.
+  *
+  * @param partitionName partition to enable or disable
+  * @param enabled       {@code true} removes the partition from the disabled
+  *                      list, {@code false} adds it
+  */
+ public void setInstanceEnabledForPartition(String partitionName, boolean enabled)
+ {
+   final String disabledKey = InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString();
+   final List<String> current = _record.getListField(disabledKey);
+
+   // work on a set so that repeated disables do not create duplicates
+   final Set<String> disabled =
+       current == null ? new HashSet<String>() : new HashSet<String>(current);
+   if (enabled)
+   {
+     disabled.remove(partitionName);
+   }
+   else
+   {
+     disabled.add(partitionName);
+   }
+
+   final List<String> sorted = new ArrayList<String>(disabled);
+   Collections.sort(sorted);
+   _record.setListField(disabledKey, sorted);
+ }
+
+ /**
+  * Two instance configs are equal when both their host name and their port
+  * are equal. An unset (null) host or port only equals another unset one, so
+  * comparing a partially populated config no longer throws
+  * NullPointerException. This stays consistent with {@link #hashCode()},
+  * which is derived from the same two fields.
+  *
+  * @param obj object to compare with
+  * @return {@code true} if {@code obj} is an InstanceConfig with the same host
+  *         name and port
+  */
+ @Override
+ public boolean equals(Object obj)
+ {
+   if (this == obj)
+   {
+     return true;
+   }
+   if (!(obj instanceof InstanceConfig))
+   {
+     return false;
+   }
+
+   InstanceConfig that = (InstanceConfig) obj;
+   return nullSafeEquals(this.getHostName(), that.getHostName())
+       && nullSafeEquals(this.getPort(), that.getPort());
+ }
+
+ /** Null-tolerant string equality: two nulls are equal, null never equals non-null. */
+ private static boolean nullSafeEquals(String left, String right)
+ {
+   return left == null ? right == null : left.equals(right);
+ }
+
+ /**
+  * Hash code derived from the host name and port, the same fields used by
+  * {@link #equals(Object)}.
+  *
+  * @return the hash of the string {@code hostName + "_" + port}
+  */
+ @Override
+ public int hashCode()
+ {
+   final String identity = this.getHostName() + "_" + this.getPort();
+   return identity.hashCode();
+ }
+
+ public String getInstanceName()
+ {
+ return _record.getId();
+ }
+
+ @Override
+ public boolean isValid()
+ {
+ if(getHostName() == null)
+ {
+ _logger.error("instanceconfig does not have host name. id:" + _record.getId());
+ return false;
+ }
+ if(getPort() == null)
+ {
+ _logger.error("instanceconfig does not have host port. id:" + _record.getId());
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/LeaderHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/LeaderHistory.java b/helix-core/src/main/java/org/apache/helix/model/LeaderHistory.java
new file mode 100644
index 0000000..d3db260
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/LeaderHistory.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+
+/**
+ * Keeps, per cluster, a bounded list of the most recent leader instance names
+ * in a list field of the underlying record.
+ */
+public class LeaderHistory extends HelixProperty
+{
+  /** Maximum number of leader entries kept per cluster. */
+  private final static int HISTORY_SIZE = 8;
+
+  /**
+   * Creates an empty history with the given record id.
+   * @param id record id
+   */
+  public LeaderHistory(String id)
+  {
+    super(id);
+  }
+
+  /**
+   * Wraps an existing record.
+   * @param record record holding the history list fields
+   */
+  public LeaderHistory(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * Save up to HISTORY_SIZE number of leaders in FIFO order
+   * @param clusterName key of the list field that holds the cluster's history
+   * @param instanceName leader instance name to append
+   */
+  public void updateHistory(String clusterName, String instanceName)
+  {
+    List<String> list = _record.getListField(clusterName);
+    if (list == null)
+    {
+      list = new ArrayList<String>();
+      _record.setListField(clusterName, list);
+    }
+
+    // Evict oldest entries until there is room for the new one. Using ">="
+    // in a loop (instead of a single "== HISTORY_SIZE" check) also shrinks a
+    // list that is already longer than the limit, which the exact-size check
+    // would leave growing forever.
+    while (list.size() >= HISTORY_SIZE)
+    {
+      list.remove(0);
+    }
+    list.add(instanceName);
+  }
+
+  /**
+   * No validation is performed for the leader history.
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
new file mode 100644
index 0000000..7bf5348
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.Date;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Instance that connects to zookeeper
+ */
+public class LiveInstance extends HelixProperty
+{
+ public enum LiveInstanceProperty
+ {
+ SESSION_ID,
+ HELIX_VERSION,
+ LIVE_INSTANCE,
+ ZKPROPERTYTRANSFERURL
+ }
+
+ private static final Logger _logger = Logger.getLogger(LiveInstance.class.getName());
+
+ public LiveInstance(String id)
+ {
+ super(id);
+ }
+
+ public LiveInstance(ZNRecord record)
+ {
+ super(record);
+ }
+
+ public void setSessionId(String sessionId)
+ {
+ _record.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), sessionId);
+ }
+
+ public String getSessionId()
+ {
+ return _record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
+ }
+
+ public String getInstanceName()
+ {
+ return _record.getId();
+ }
+
+ public String getHelixVersion()
+ {
+ return _record.getSimpleField(LiveInstanceProperty.HELIX_VERSION.toString());
+ }
+
+ public void setHelixVersion(String helixVersion)
+ {
+ _record.setSimpleField(LiveInstanceProperty.HELIX_VERSION.toString(), helixVersion);
+ }
+
+ public String getLiveInstance()
+ {
+ return _record.getSimpleField(LiveInstanceProperty.LIVE_INSTANCE.toString());
+ }
+
+ public void setLiveInstance(String leader)
+ {
+ _record.setSimpleField(LiveInstanceProperty.LIVE_INSTANCE.toString(), leader);
+ }
+
+ public long getModifiedTime()
+ {
+ return _record.getModifiedTime();
+ }
+
+ public String getWebserviceUrl()
+ {
+ return _record.getSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString());
+ }
+
+ public void setWebserviceUrl(String url)
+ {
+ _record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), url);
+ }
+ /**
+  * A live instance is valid when it carries both a session id and a Helix
+  * version; the missing field is logged otherwise.
+  *
+  * @return {@code true} if both fields are present
+  */
+ @Override
+ public boolean isValid()
+ {
+   if (getSessionId() == null)
+   {
+     _logger.error("liveInstance does not have session id. id:" + _record.getId());
+     return false;
+   }
+   if (getHelixVersion() == null)
+   {
+     // message fixed: it used to read "CLM verion" although the field checked
+     // here is HELIX_VERSION
+     _logger.error("liveInstance does not have Helix version. id:" + _record.getId());
+     return false;
+   }
+   return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
new file mode 100644
index 0000000..8c2f839
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -0,0 +1,584 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+
+
+/**
+ * Message class wraps a ZNRecord (it extends HelixProperty) and provides typed accessors for the message fields
+ *
+ * @author kgopalak
+ */
+
+public class Message extends HelixProperty
+{
+ public enum MessageType
+ {
+ STATE_TRANSITION,
+ SCHEDULER_MSG,
+ USER_DEFINE_MSG,
+ CONTROLLER_MSG,
+ TASK_REPLY,
+ NO_OP,
+ PARTICIPANT_ERROR_REPORT
+ };
+
+ public enum Attributes
+ {
+ MSG_ID,
+ SRC_SESSION_ID,
+ TGT_SESSION_ID,
+ SRC_NAME,
+ TGT_NAME,
+ SRC_INSTANCE_TYPE,
+ MSG_STATE,
+ PARTITION_NAME,
+ RESOURCE_NAME,
+ FROM_STATE,
+ TO_STATE,
+ STATE_MODEL_DEF,
+ CREATE_TIMESTAMP,
+ READ_TIMESTAMP,
+ EXECUTE_START_TIMESTAMP,
+ MSG_TYPE,
+ MSG_SUBTYPE,
+ CORRELATION_ID,
+ MESSAGE_RESULT,
+ EXE_SESSION_ID,
+ TIMEOUT,
+ RETRY_COUNT,
+ STATE_MODEL_FACTORY_NAME,
+ BUCKET_SIZE,
+ PARENT_MSG_ID // used for group message mode
+ }
+
+ public enum MessageState
+ {
+ NEW,
+ READ, // not used
+ UNPROCESSABLE // get exception when create handler
+ }
+
+ public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>(){
+ @Override
+ public int compare(Message m1, Message m2)
+ {
+// long t1 = m1.getCreateTimeStamp();
+// long t2 = m2.getCreateTimeStamp();
+ return (int) (m1.getCreateTimeStamp() - m2.getCreateTimeStamp());
+ }
+ };
+
+ // AtomicInteger _groupMsgCountDown = new AtomicInteger(1);
+
+ public Message(MessageType type, String msgId)
+ {
+ this(type.toString(), msgId);
+ }
+
+ public Message(String type, String msgId)
+ {
+ super(new ZNRecord(msgId));
+ _record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
+ setMsgId(msgId);
+ setMsgState(MessageState.NEW);
+ _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
+ "" + new Date().getTime());
+ }
+
+ public Message(ZNRecord record)
+ {
+ super(record);
+ if (getMsgState() == null)
+ {
+ setMsgState(MessageState.NEW);
+ }
+ if (getCreateTimeStamp() == 0)
+ {
+ _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
+ "" + new Date().getTime());
+ }
+ }
+
+ public void setCreateTimeStamp(long timestamp)
+ {
+ _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(), "" + timestamp);
+ }
+
+ public Message(ZNRecord record, String id)
+ {
+ super(new ZNRecord(record, id));
+ setMsgId(id);
+ }
+
+ public void setMsgSubType(String subType)
+ {
+ _record.setSimpleField(Attributes.MSG_SUBTYPE.toString(), subType);
+ }
+
+ public String getMsgSubType()
+ {
+ return _record.getSimpleField(Attributes.MSG_SUBTYPE.toString());
+ }
+
+ void setMsgType(MessageType type)
+ {
+ _record.setSimpleField(Attributes.MSG_TYPE.toString(), type.toString());
+ }
+
+ public String getMsgType()
+ {
+ return _record.getSimpleField(Attributes.MSG_TYPE.toString());
+ }
+
+ public String getTgtSessionId()
+ {
+ return _record.getSimpleField(Attributes.TGT_SESSION_ID.toString());
+ }
+
+ public void setTgtSessionId(String tgtSessionId)
+ {
+ _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId);
+ }
+
+ public String getSrcSessionId()
+ {
+ return _record.getSimpleField(Attributes.SRC_SESSION_ID.toString());
+ }
+
+ public void setSrcSessionId(String srcSessionId)
+ {
+ _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId);
+ }
+
+ public String getExecutionSessionId()
+ {
+ return _record.getSimpleField(Attributes.EXE_SESSION_ID.toString());
+ }
+
+ public void setExecuteSessionId(String exeSessionId)
+ {
+ _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId);
+ }
+
+ public String getMsgSrc()
+ {
+ return _record.getSimpleField(Attributes.SRC_NAME.toString());
+ }
+
+ public void setSrcInstanceType(InstanceType type)
+ {
+ _record.setSimpleField(Attributes.SRC_INSTANCE_TYPE.toString(), type.toString());
+ }
+
+ public InstanceType getSrcInstanceType()
+ {
+ if (_record.getSimpleFields().containsKey(Attributes.SRC_INSTANCE_TYPE.toString()))
+ {
+ return InstanceType.valueOf(_record.getSimpleField(Attributes.SRC_INSTANCE_TYPE.toString()));
+ }
+ return InstanceType.PARTICIPANT;
+ }
+
+ public void setSrcName(String msgSrc)
+ {
+ _record.setSimpleField(Attributes.SRC_NAME.toString(), msgSrc);
+ }
+
+ public String getTgtName()
+ {
+ return _record.getSimpleField(Attributes.TGT_NAME.toString());
+ }
+
+ public void setMsgState(MessageState msgState)
+ { // HACK: The "tolowerCase()" call is to make the change backward compatible
+ _record.setSimpleField(Attributes.MSG_STATE.toString(), msgState.toString()
+ .toLowerCase());
+ }
+
+ /**
+  * Reads the MSG_STATE simple field.
+  *
+  * @return the message state, or {@code null} when the record has no
+  *         MSG_STATE field (the {@code Message(ZNRecord)} constructor checks
+  *         for null to default the state to NEW)
+  * @throws IllegalArgumentException if the stored value names no known state
+  */
+ public MessageState getMsgState()
+ {
+   String state = _record.getSimpleField(Attributes.MSG_STATE.toString());
+   if (state == null)
+   {
+     // previously this dereferenced null and threw NullPointerException,
+     // so the null check in the constructor could never succeed
+     return null;
+   }
+   // HACK: The "toUpperCase()" call is to make the change backward compatible
+   return MessageState.valueOf(state.toUpperCase());
+ }
+
+ public void setPartitionName(String partitionName)
+ {
+ _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionName);
+ }
+
+ public String getMsgId()
+ {
+ return _record.getSimpleField(Attributes.MSG_ID.toString());
+ }
+
+ public void setMsgId(String msgId)
+ {
+ _record.setSimpleField(Attributes.MSG_ID.toString(), msgId);
+ }
+
+ public void setFromState(String state)
+ {
+ _record.setSimpleField(Attributes.FROM_STATE.toString(), state);
+ }
+
+ public String getFromState()
+ {
+ return _record.getSimpleField(Attributes.FROM_STATE.toString());
+ }
+
+ public void setToState(String state)
+ {
+ _record.setSimpleField(Attributes.TO_STATE.toString(), state);
+ }
+
+ public String getToState()
+ {
+ return _record.getSimpleField(Attributes.TO_STATE.toString());
+ }
+
+ public void setTgtName(String msgTgt)
+ {
+ _record.setSimpleField(Attributes.TGT_NAME.toString(), msgTgt);
+ }
+
+ public Boolean getDebug()
+ {
+ return false;
+ }
+
+ public Integer getGeneration()
+ {
+ return 1;
+ }
+
+ public void setResourceName(String resourceName)
+ {
+ _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
+ }
+
+ public String getResourceName()
+ {
+ return _record.getSimpleField(Attributes.RESOURCE_NAME.toString());
+ }
+
+ public String getPartitionName()
+ {
+ return _record.getSimpleField(Attributes.PARTITION_NAME.toString());
+ }
+
+ public String getStateModelDef()
+ {
+ return _record.getSimpleField(Attributes.STATE_MODEL_DEF.toString());
+ }
+
+ public void setStateModelDef(String stateModelDefName)
+ {
+ _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefName);
+ }
+
+ public void setReadTimeStamp(long time)
+ {
+ _record.setSimpleField(Attributes.READ_TIMESTAMP.toString(), "" + time);
+ }
+
+ public void setExecuteStartTimeStamp(long time)
+ {
+ _record.setSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString(), "" + time);
+ }
+
+ /**
+  * Reads the READ_TIMESTAMP simple field.
+  *
+  * @return the stored timestamp, or 0 when it is missing or not a valid long
+  */
+ public long getReadTimeStamp()
+ {
+   final String rawTime = _record.getSimpleField(Attributes.READ_TIMESTAMP.toString());
+   long readTime = 0;
+   if (rawTime != null)
+   {
+     try
+     {
+       readTime = Long.parseLong(rawTime);
+     }
+     catch (NumberFormatException ignored)
+     {
+       // unparsable value: keep the default of 0
+     }
+   }
+   return readTime;
+ }
+
+ /**
+  * Reads the EXECUTE_START_TIMESTAMP simple field.
+  *
+  * @return the stored timestamp, or 0 when it is missing or not a valid long
+  */
+ public long getExecuteStartTimeStamp()
+ {
+   final String rawTime =
+       _record.getSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString());
+   long startTime = 0;
+   if (rawTime != null)
+   {
+     try
+     {
+       startTime = Long.parseLong(rawTime);
+     }
+     catch (NumberFormatException ignored)
+     {
+       // unparsable value: keep the default of 0
+     }
+   }
+   return startTime;
+ }
+
+ /**
+  * Reads the CREATE_TIMESTAMP simple field.
+  *
+  * @return the stored timestamp, or 0 when it is missing or not a valid long
+  */
+ public long getCreateTimeStamp()
+ {
+   final String rawTime = _record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString());
+   long createTime = 0;
+   if (rawTime != null)
+   {
+     try
+     {
+       createTime = Long.parseLong(rawTime);
+     }
+     catch (NumberFormatException ignored)
+     {
+       // unparsable value: keep the default of 0
+     }
+   }
+   return createTime;
+ }
+
+ public void setCorrelationId(String correlationId)
+ {
+ _record.setSimpleField(Attributes.CORRELATION_ID.toString(), correlationId);
+ }
+
+ public String getCorrelationId()
+ {
+ return _record.getSimpleField(Attributes.CORRELATION_ID.toString());
+ }
+
+ /**
+  * Reads the TIMEOUT simple field.
+  *
+  * @return the stored timeout, or -1 when the field is absent or cannot be
+  *         parsed as an integer
+  */
+ public int getExecutionTimeout()
+ {
+   final String timeoutKey = Attributes.TIMEOUT.toString();
+   if (_record.getSimpleFields().containsKey(timeoutKey))
+   {
+     try
+     {
+       return Integer.parseInt(_record.getSimpleField(timeoutKey));
+     }
+     catch (Exception ignored)
+     {
+       // unparsable value: fall through to the "no timeout" result
+     }
+   }
+   return -1;
+ }
+
+ public void setExecutionTimeout(int timeout)
+ {
+ _record.setSimpleField(Attributes.TIMEOUT.toString(), "" + timeout);
+ }
+
+ public void setRetryCount(int retryCount)
+ {
+ _record.setSimpleField(Attributes.RETRY_COUNT.toString(), "" + retryCount);
+ }
+
+ /**
+  * Reads the RETRY_COUNT simple field.
+  *
+  * @return the stored retry count, or 0 when it is missing or cannot be parsed
+  */
+ public int getRetryCount()
+ {
+   // Default to 0, and there is no retry if timeout happens
+   int retryCount = 0;
+   try
+   {
+     retryCount = Integer.parseInt(_record.getSimpleField(Attributes.RETRY_COUNT.toString()));
+   }
+   catch (Exception ignored)
+   {
+     // missing or unparsable value: keep the default
+   }
+   return retryCount;
+ }
+
+ public Map<String, String> getResultMap()
+ {
+ return _record.getMapField(Attributes.MESSAGE_RESULT.toString());
+ }
+
+ public void setResultMap(Map<String, String> resultMap)
+ {
+ _record.setMapField(Attributes.MESSAGE_RESULT.toString(), resultMap);
+ }
+
+ public String getStateModelFactoryName()
+ {
+ return _record.getSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString());
+ }
+
+ public void setStateModelFactoryName(String factoryName)
+ {
+ _record.setSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString(), factoryName);
+ }
+
+ // TODO: remove this. impl in HelixProperty
+ @Override
+ public int getBucketSize()
+ {
+ String bucketSizeStr = _record.getSimpleField(Attributes.BUCKET_SIZE.toString());
+ int bucketSize = 0;
+ if (bucketSizeStr != null)
+ {
+ try
+ {
+ bucketSize = Integer.parseInt(bucketSizeStr);
+ }
+ catch (NumberFormatException e)
+ {
+ // OK
+ }
+ }
+ return bucketSize;
+ }
+
+ @Override
+ public void setBucketSize(int bucketSize)
+ {
+ if (bucketSize > 0)
+ {
+ _record.setSimpleField(Attributes.BUCKET_SIZE.toString(), "" + bucketSize);
+ }
+ }
+
+ public void setAttribute(Attributes attr, String val)
+ {
+ _record.setSimpleField(attr.toString(), val);
+ }
+
+ public String getAttribute(Attributes attr)
+ {
+ return _record.getSimpleField(attr.toString());
+ }
+
+ public static Message createReplyMessage(Message srcMessage,
+ String instanceName,
+ Map<String, String> taskResultMap)
+ {
+ if (srcMessage.getCorrelationId() == null)
+ {
+ throw new HelixException("Message " + srcMessage.getMsgId()
+ + " does not contain correlation id");
+ }
+ Message replyMessage =
+ new Message(MessageType.TASK_REPLY, UUID.randomUUID().toString());
+ replyMessage.setCorrelationId(srcMessage.getCorrelationId());
+ replyMessage.setResultMap(taskResultMap);
+ replyMessage.setTgtSessionId("*");
+ replyMessage.setMsgState(MessageState.NEW);
+ replyMessage.setSrcName(instanceName);
+ if (srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER)
+ {
+ replyMessage.setTgtName("Controller");
+ }
+ else
+ {
+ replyMessage.setTgtName(srcMessage.getMsgSrc());
+ }
+ return replyMessage;
+ }
+
+ public void addPartitionName(String partitionName)
+ {
+ if (_record.getListField(Attributes.PARTITION_NAME.toString()) == null)
+ {
+ _record.setListField(Attributes.PARTITION_NAME.toString(), new ArrayList<String>());
+ }
+
+ List<String> partitionNames = _record.getListField(Attributes.PARTITION_NAME.toString());
+ if (!partitionNames.contains(partitionName))
+ {
+ partitionNames.add(partitionName);
+ }
+ }
+
+ public List<String> getPartitionNames()
+ {
+ List<String> partitionNames =
+ _record.getListField(Attributes.PARTITION_NAME.toString());
+ if (partitionNames == null)
+ {
+ return Collections.emptyList();
+ }
+
+ return partitionNames;
+ }
+
+// public AtomicInteger getGroupMsgCountDown()
+// {
+// return _groupMsgCountDown;
+// }
+//
+// public void setGroupMsgCountDown(AtomicInteger countDown)
+// {
+// _groupMsgCountDown = countDown;
+// }
+
+ /**
+  * Tells whether this message targets the controller, i.e. its target name
+  * equals "controller" ignoring case.
+  *
+  * @return {@code true} for a controller-bound message; {@code false}
+  *         otherwise, including when no target name is set
+  */
+ public boolean isControlerMsg()
+ {
+   // literal on the left: a message without TGT_NAME yields false instead of
+   // a NullPointerException
+   return "controller".equalsIgnoreCase(getTgtName());
+ }
+
+ public PropertyKey getKey(Builder keyBuilder, String instanceName)
+ {
+ if (isControlerMsg())
+ {
+ return keyBuilder.controllerMessage(getId());
+ }
+ else
+ {
+ return keyBuilder.message(instanceName, getId());
+ }
+ }
+
+ // TODO replace with util from espresso or linkedin
+ /**
+  * @param data string to check
+  * @return {@code true} if {@code data} is null, empty, or whitespace only
+  */
+ private boolean isNullOrEmpty(String data)
+ {
+   if (data == null)
+   {
+     return true;
+   }
+   // an empty string also trims to length 0, so one check covers both cases
+   return data.trim().length() == 0;
+ }
+
+ /**
+  * Validates the message. Only STATE_TRANSITION messages are checked: they
+  * need a non-blank target name, partition name, resource name, state model
+  * definition, to-state, state model factory name and from-state. Every other
+  * message type is reported as valid.
+  *
+  * @return {@code true} if the message passes the check for its type
+  */
+ @Override
+ public boolean isValid()
+ {
+   // TODO: refactor message to state transition message and task-message and
+   // implement this function separately
+
+   if (!getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
+   {
+     return true;
+   }
+
+   return !isNullOrEmpty(getTgtName())
+       && !isNullOrEmpty(getPartitionName())
+       && !isNullOrEmpty(getResourceName())
+       && !isNullOrEmpty(getStateModelDef())
+       && !isNullOrEmpty(getToState())
+       && !isNullOrEmpty(getStateModelFactoryName())
+       && !isNullOrEmpty(getFromState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/Partition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Partition.java b/helix-core/src/main/java/org/apache/helix/model/Partition.java
new file mode 100644
index 0000000..5512c6b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/Partition.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+/**
+ * A partition, identified by its name.
+ * <p>
+ * Equality, hash code and string form are all derived from the partition name.
+ */
+public class Partition
+{
+  private final String _partitionName;
+
+  /**
+   * @return the partition name this instance was created with (may be null)
+   */
+  public String getPartitionName()
+  {
+    return _partitionName;
+  }
+
+  /**
+   * @param partitionName name of the partition; stored as given, no validation
+   */
+  public Partition(String partitionName)
+  {
+    this._partitionName = partitionName;
+  }
+
+  /**
+   * Two partitions are equal when their names are equal; two partitions that
+   * both have a null name are equal as well.
+   *
+   * @param obj object to compare with, may be null
+   * @return true when {@code obj} is a Partition with an equal name
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+    {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed
+    if (!(obj instanceof Partition))
+    {
+      return false;
+    }
+
+    String otherName = ((Partition) obj).getPartitionName();
+    // Null-safe comparison: the constructor does not reject a null name
+    return (_partitionName == null) ? (otherName == null) : _partitionName.equals(otherName);
+  }
+
+  /**
+   * @return the hash code of the partition name, or 0 when the name is null
+   */
+  @Override
+  public int hashCode()
+  {
+    return (_partitionName == null) ? 0 : _partitionName.hashCode();
+  }
+
+  /**
+   * @return the partition name
+   */
+  @Override
+  public String toString()
+  {
+    return _partitionName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java b/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java
new file mode 100644
index 0000000..a68af6b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/PauseSignal.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * A {@link HelixProperty} representing a pause signal. It adds no fields of its
+ * own and is always reported as valid.
+ */
+public class PauseSignal extends HelixProperty
+{
+  /**
+   * Creates a pause signal with the given id.
+   *
+   * @param id identifier passed on to {@link HelixProperty}
+   */
+  public PauseSignal(String id)
+  {
+    super(id);
+  }
+
+  /**
+   * Creates a pause signal backed by an existing record.
+   *
+   * @param record record passed on to {@link HelixProperty}
+   */
+  public PauseSignal(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * @return always true; no validation is performed
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/PersistentStats.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/PersistentStats.java b/helix-core/src/main/java/org/apache/helix/model/PersistentStats.java
new file mode 100644
index 0000000..41adecf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/PersistentStats.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+
+import java.util.Map;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A {@link HelixProperty} that exposes the session id, the record id and the
+ * map fields of its underlying record.
+ */
+public class PersistentStats extends HelixProperty
+{
+  private static final Logger _logger = Logger.getLogger(PersistentStats.class.getName());
+
+  /**
+   * Names of the record fields used by this property.
+   */
+  public enum PersistentStatsProperty
+  {
+    SESSION_ID,
+    FIELDS
+  }
+
+  public final static String nodeName = "PersistentStats";
+
+  /**
+   * Creates an instance with the given id.
+   *
+   * @param id identifier passed on to {@link HelixProperty}
+   */
+  public PersistentStats(String id)
+  {
+    super(id);
+  }
+
+  /**
+   * Creates an instance backed by an existing record.
+   *
+   * @param record record passed on to {@link HelixProperty}
+   */
+  public PersistentStats(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * Stores the session id in the SESSION_ID simple field of the record.
+   *
+   * @param sessionId value to store
+   */
+  public void setSessionId(String sessionId)
+  {
+    _record.setSimpleField(PersistentStatsProperty.SESSION_ID.toString(), sessionId);
+  }
+
+  /**
+   * @return the value of the SESSION_ID simple field of the record
+   */
+  public String getSessionId()
+  {
+    return _record.getSimpleField(PersistentStatsProperty.SESSION_ID.toString());
+  }
+
+  /**
+   * @return the id of the underlying record
+   */
+  public String getInstanceName()
+  {
+    return _record.getId();
+  }
+
+  /**
+   * @return the map fields of the underlying record
+   */
+  public Map<String, Map<String, String>> getMapFields()
+  {
+    return _record.getMapFields();
+  }
+
+  /**
+   * @param statName key of the map field to look up
+   * @return the map field stored in the record under {@code statName}
+   */
+  public Map<String, String> getStatFields(String statName)
+  {
+    return _record.getMapField(statName);
+  }
+
+  /**
+   * @return always true; no validation is performed
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java
new file mode 100644
index 0000000..247cbeb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixConstants;
+import org.apache.log4j.Logger;
+
+
+/**
+ * A named resource holding a set of partitions, together with its state model
+ * definition reference, state model factory name, bucket size and group
+ * message mode.
+ */
+public class Resource
+{
+  private static Logger LOG = Logger.getLogger(Resource.class);
+
+  private final String _resourceName;
+  // LinkedHashMap: partitions are kept in the order they were first added
+  private final Map<String, Partition> _partitionMap;
+  private String _stateModelDefRef;
+  private String _stateModelFactoryName;
+  private int _bucketSize = 0;
+  private boolean _groupMessageMode = false;
+
+  /**
+   * Creates a resource without any partitions.
+   *
+   * @param resourceName name of the resource
+   */
+  public Resource(String resourceName)
+  {
+    this._resourceName = resourceName;
+    this._partitionMap = new LinkedHashMap<String, Partition>();
+  }
+
+  /**
+   * @return the state model definition reference, or null when none was set
+   */
+  public String getStateModelDefRef()
+  {
+    return _stateModelDefRef;
+  }
+
+  /**
+   * @param stateModelDefRef state model definition reference to store
+   */
+  public void setStateModelDefRef(String stateModelDefRef)
+  {
+    _stateModelDefRef = stateModelDefRef;
+  }
+
+  /**
+   * Sets the state model factory name.
+   *
+   * @param factoryName factory name to store; null selects
+   *                    {@code HelixConstants.DEFAULT_STATE_MODEL_FACTORY}
+   */
+  public void setStateModelFactoryName(String factoryName)
+  {
+    _stateModelFactoryName =
+        (factoryName != null) ? factoryName : HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+  }
+
+  /**
+   * @return the state model factory name, or null when the setter was never called
+   */
+  public String getStateModelFactoryname()
+  {
+    return _stateModelFactoryName;
+  }
+
+  /**
+   * @return the name given at construction
+   */
+  public String getResourceName()
+  {
+    return _resourceName;
+  }
+
+  /**
+   * @return the values view of the internal partition map
+   */
+  public Collection<Partition> getPartitions()
+  {
+    return _partitionMap.values();
+  }
+
+  /**
+   * Adds a partition with the given name, replacing any partition previously
+   * stored under the same name.
+   *
+   * @param partitionName name of the partition to add
+   */
+  public void addPartition(String partitionName)
+  {
+    final Partition partition = new Partition(partitionName);
+    _partitionMap.put(partitionName, partition);
+  }
+
+  /**
+   * @param partitionName name to look up
+   * @return the partition stored under that name, or null when there is none
+   */
+  public Partition getPartition(String partitionName)
+  {
+    return _partitionMap.get(partitionName);
+  }
+
+  /**
+   * @return the bucket size; 0 unless changed through the setter
+   */
+  public int getBucketSize()
+  {
+    return _bucketSize;
+  }
+
+  /**
+   * @param bucketSize bucket size to store
+   */
+  public void setBucketSize(int bucketSize)
+  {
+    _bucketSize = bucketSize;
+  }
+
+  /**
+   * @param mode group message mode flag to store
+   */
+  public void setGroupMessageMode(boolean mode)
+  {
+    _groupMessageMode = mode;
+  }
+
+  /**
+   * @return the group message mode flag; false unless changed through the setter
+   */
+  public boolean getGroupMessageMode()
+  {
+    return _groupMessageMode;
+  }
+
+  /**
+   * @return a one-line description with resource name, state model definition
+   *         reference, bucket size and the partition map
+   */
+  @Override
+  public String toString()
+  {
+    return new StringBuilder()
+        .append("resourceName:").append(_resourceName)
+        .append(", stateModelDef:").append(_stateModelDefRef)
+        .append(", bucketSize:").append(_bucketSize)
+        .append(", partitionStateMap:").append(_partitionMap)
+        .toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
new file mode 100644
index 0000000..4681726
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -0,0 +1,21 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+/**
+ * Currently an empty class: it declares no fields, constructors or methods.
+ */
+public class ResourceAssignment
+{
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
new file mode 100644
index 0000000..693863f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Describes a state model: the states in priority order, the instance count
+ * configured for each state, the state transition priority list and the
+ * transition table, all read from the backing record at construction time.
+ */
+public class StateModelDefinition extends HelixProperty
+{
+  /**
+   * Names of the record fields used by a state model definition.
+   */
+  public enum StateModelDefinitionProperty
+  {
+    INITIAL_STATE,
+    STATE_TRANSITION_PRIORITYLIST,
+    STATE_PRIORITY_LIST
+  }
+
+  private static final Logger _logger = Logger.getLogger(StateModelDefinition.class.getName());
+
+  /**
+   * State Names in priority order. Indicates the order in which states are
+   * fulfilled
+   */
+  private final List<String> _statesPriorityList;
+
+  /**
+   * Specifies the number of instances for a given state <br>
+   * -1 don't care, don't try to keep any resource in this state on any instance <br>
+   * &gt;0 any integer number greater than 0 specifies the number of instances
+   * needed to be in this state <br>
+   * R all instances in the preference list can be in this state <br>
+   * N all instances in the cluster will be put in this state.PreferenceList
+   * must be denoted as '*'
+   */
+  private final Map<String, String> _statesCountMap;
+
+  private final List<String> _stateTransitionPriorityList;
+
+  /**
+   * StateTransition which is used to find the nextState given StartState and
+   * FinalState
+   */
+  private final Map<String, Map<String, String>> _stateTransitionTable;
+
+  /**
+   * Builds the definition from a record.
+   * <p>
+   * For every state in the STATE_PRIORITY_LIST list field, the "count" entry of
+   * the map field "&lt;state&gt;.meta" (if any) becomes the state's instance
+   * count, and the map field "&lt;state&gt;.next" (if any) becomes the state's
+   * row of the transition table.
+   *
+   * @param record record holding the state model definition
+   */
+  public StateModelDefinition(ZNRecord record)
+  {
+    super(record);
+
+    _statesPriorityList =
+        record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString());
+    _stateTransitionPriorityList =
+        record.getListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString());
+    _stateTransitionTable = new HashMap<String, Map<String, String>>();
+    _statesCountMap = new HashMap<String, String>();
+    if (_statesPriorityList != null)
+    {
+      for (String state : _statesPriorityList)
+      {
+        Map<String, String> metaData = record.getMapField(state + ".meta");
+        String count = (metaData == null) ? null : metaData.get("count");
+        if (count != null)
+        {
+          _statesCountMap.put(state, count);
+        }
+
+        // Only keep rows that exist; a state without a ".next" map field simply
+        // has no entry, which getNextStateForTransition() reports as null.
+        Map<String, String> nextData = record.getMapField(state + ".next");
+        if (nextData != null)
+        {
+          _stateTransitionTable.put(state, nextData);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return the STATE_TRANSITION_PRIORITYLIST list field read at construction,
+   *         or null when the record did not contain it
+   */
+  public List<String> getStateTransitionPriorityList()
+  {
+    return _stateTransitionPriorityList;
+  }
+
+  /**
+   * @return the STATE_PRIORITY_LIST list field read at construction, or null
+   *         when the record did not contain it
+   */
+  public List<String> getStatesPriorityList()
+  {
+    return _statesPriorityList;
+  }
+
+  /**
+   * Looks up the transition table.
+   *
+   * @param fromState state the transition starts from
+   * @param toState   state the transition should eventually reach
+   * @return the entry stored for {@code toState} in the row of
+   *         {@code fromState}, or null when there is no such row or entry
+   */
+  public String getNextStateForTransition(String fromState, String toState)
+  {
+    Map<String, String> map = _stateTransitionTable.get(fromState);
+    if (map != null)
+    {
+      return map.get(toState);
+    }
+    return null;
+  }
+
+  /**
+   * @return the INITIAL_STATE simple field of the record
+   */
+  public String getInitialState()
+  {
+    return _record.getSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString());
+  }
+
+  /**
+   * @param state state name
+   * @return the "count" value configured for the state, or null when none was
+   *         configured
+   */
+  public String getNumInstancesPerState(String state)
+  {
+    return _statesCountMap.get(state);
+  }
+
+  /**
+   * Checks that the record carries an initial state and a state priority list.
+   * Each missing item is logged as an error.
+   *
+   * @return true when both are present, false otherwise
+   */
+  @Override
+  public boolean isValid()
+  {
+    if (getInitialState() == null)
+    {
+      _logger.error("State model does not contain init state, statemodel:" + _record.getId());
+      return false;
+    }
+    if (_record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString()) == null)
+    {
+      // This class describes a state model, so the message names it as such
+      _logger.error("State model does not contain StatesPriorityList, state model : "
+          + _record.getId());
+      return false;
+    }
+
+    // STATE_TRANSITION_PRIORITYLIST is optional and therefore not checked
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/StatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StatusUpdate.java b/helix-core/src/main/java/org/apache/helix/model/StatusUpdate.java
new file mode 100644
index 0000000..a79e9ee
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/StatusUpdate.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.model;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * A {@link HelixProperty} wrapping a status update record. It adds no fields of
+ * its own and is always reported as valid.
+ */
+public class StatusUpdate extends HelixProperty
+{
+  /**
+   * Creates a status update backed by an existing record.
+   *
+   * @param record record passed on to {@link HelixProperty}
+   */
+  public StatusUpdate(ZNRecord record)
+  {
+    super(record);
+  }
+
+  /**
+   * @return always true; no validation is performed
+   */
+  @Override
+  public boolean isValid()
+  {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/model/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/package-info.java b/helix-core/src/main/java/org/apache/helix/model/package-info.java
new file mode 100644
index 0000000..91e6e73
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix classes for all types of znodes
+ *
+ */
+package org.apache.helix.model;
\ No newline at end of file