You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[6/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
new file mode 100644
index 0000000..b17bc87
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -0,0 +1,364 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Generic record format used to store data at a node. A record carries an id
+ * and three groups of named fields: simple fields (String values), list fields
+ * (List of String values) and map fields (String-to-String map values).
+ * <p>
+ * The id and the three field groups are exposed as JSON properties; the delta
+ * list, version, creation time and modified time accessors are marked
+ * {@code @JsonIgnore}.
+ * <p>
+ * The field getters return the backing maps themselves, and the copy
+ * constructors copy only the top-level maps, so nested lists and maps remain
+ * shared with the source record.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ZNRecord
+{
+  static final Logger _logger = Logger.getLogger(ZNRecord.class);
+  private final String id;
+
+  @JsonIgnore(true)
+  public static final String LIST_FIELD_BOUND = "listField.bound";
+
+  @JsonIgnore(true)
+  public static final int SIZE_LIMIT = 1000 * 1024;  // leave a margin out of 1M
+
+  // We don't want the _deltaList to be serialized and deserialized
+  private List<ZNRecordDelta> _deltaList = new ArrayList<ZNRecordDelta>();
+
+  private Map<String, String> simpleFields;
+  private Map<String, Map<String, String>> mapFields;
+  private Map<String, List<String>> listFields;
+
+  // the version field of zookeeper Stat
+  private int _version;
+
+  private long _creationTime;
+
+  private long _modifiedTime;
+
+  /**
+   * Create an empty record.
+   *
+   * @param id identifier of the record
+   */
+  @JsonCreator
+  public ZNRecord(@JsonProperty("id") String id)
+  {
+    this.id = id;
+    simpleFields = new TreeMap<String, String>();
+    mapFields = new TreeMap<String, Map<String, String>>();
+    listFields = new TreeMap<String, List<String>>();
+  }
+
+  /**
+   * Copy a record, keeping its id.
+   *
+   * @param record record to copy
+   */
+  public ZNRecord(ZNRecord record)
+  {
+    this(record, record.getId());
+  }
+
+  /**
+   * Copy the fields, version and timestamps of a record under a new id. Only
+   * the top-level maps are copied: the nested lists and maps are the same
+   * instances the source record holds. The delta list is not copied.
+   *
+   * @param record record to copy
+   * @param id id of the new record
+   */
+  public ZNRecord(ZNRecord record, String id)
+  {
+    this(id);
+    simpleFields.putAll(record.getSimpleFields());
+    mapFields.putAll(record.getMapFields());
+    listFields.putAll(record.getListFields());
+    _version = record.getVersion();
+    _creationTime = record.getCreationTime();
+    _modifiedTime = record.getModifiedTime();
+  }
+
+  /**
+   * Copy a record, keeping its id but overriding its version.
+   *
+   * @param record record to copy
+   * @param version version to store in the copy
+   */
+  public ZNRecord(ZNRecord record, int version)
+  {
+    this(record);
+    _version = version;
+  }
+
+  @JsonIgnore(true)
+  public void setDeltaList(List<ZNRecordDelta> deltaList)
+  {
+    _deltaList = deltaList;
+  }
+
+  @JsonIgnore(true)
+  public List<ZNRecordDelta> getDeltaList()
+  {
+    return _deltaList;
+  }
+
+  @JsonProperty
+  public Map<String, String> getSimpleFields()
+  {
+    return simpleFields;
+  }
+
+  @JsonProperty
+  public void setSimpleFields(Map<String, String> simpleFields)
+  {
+    this.simpleFields = simpleFields;
+  }
+
+  @JsonProperty
+  public Map<String, Map<String, String>> getMapFields()
+  {
+    return mapFields;
+  }
+
+  @JsonProperty
+  public void setMapFields(Map<String, Map<String, String>> mapFields)
+  {
+    this.mapFields = mapFields;
+  }
+
+  @JsonProperty
+  public Map<String, List<String>> getListFields()
+  {
+    return listFields;
+  }
+
+  @JsonProperty
+  public void setListFields(Map<String, List<String>> listFields)
+  {
+    this.listFields = listFields;
+  }
+
+  @JsonProperty
+  public void setSimpleField(String k, String v)
+  {
+    simpleFields.put(k, v);
+  }
+
+  @JsonProperty
+  public String getId()
+  {
+    return id;
+  }
+
+  public void setMapField(String k, Map<String, String> v)
+  {
+    mapFields.put(k, v);
+  }
+
+  public void setListField(String k, List<String> v)
+  {
+    listFields.put(k, v);
+  }
+
+  public String getSimpleField(String k)
+  {
+    return simpleFields.get(k);
+  }
+
+  public Map<String, String> getMapField(String k)
+  {
+    return mapFields.get(k);
+  }
+
+  public List<String> getListField(String k)
+  {
+    return listFields.get(k);
+  }
+
+  /**
+   * @return the id followed by the non-null field groups, in the order simple,
+   *         map, list
+   */
+  @Override
+  public String toString()
+  {
+    // StringBuilder: the buffer never leaves this method, so no locking is needed
+    StringBuilder sb = new StringBuilder();
+    sb.append(id).append(", ");
+    if (simpleFields != null)
+    {
+      sb.append(simpleFields);
+    }
+    if (mapFields != null)
+    {
+      sb.append(mapFields);
+    }
+    if (listFields != null)
+    {
+      sb.append(listFields);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Merge another record into this one.
+   * <p>
+   * If the other record has a non-empty delta list, the deltas are applied one
+   * by one and the other record's own fields are ignored. Otherwise its simple
+   * fields overwrite the ones here, its map fields are merged key by key into
+   * existing maps, and its list fields are appended to existing lists. A map
+   * or list whose key is not present yet is stored by reference, not copied,
+   * so it stays shared with the other record.
+   *
+   * @param record record to merge in; null is ignored
+   */
+  public void merge(ZNRecord record)
+  {
+    if (record == null)
+    {
+      return;
+    }
+
+    // setDeltaList() accepts null, so guard before asking for the size
+    List<ZNRecordDelta> deltas = record.getDeltaList();
+    if (deltas != null && deltas.size() > 0)
+    {
+      _logger.info("Merging with delta list, recordId = " + id + " other:"
+          + record.getId());
+      merge(deltas);
+      return;
+    }
+
+    simpleFields.putAll(record.simpleFields);
+
+    for (Map.Entry<String, Map<String, String>> entry : record.mapFields.entrySet())
+    {
+      Map<String, String> existing = mapFields.get(entry.getKey());
+      if (existing != null)
+      {
+        existing.putAll(entry.getValue());
+      }
+      else
+      {
+        mapFields.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    for (Map.Entry<String, List<String>> entry : record.listFields.entrySet())
+    {
+      List<String> existing = listFields.get(entry.getKey());
+      if (existing != null)
+      {
+        existing.addAll(entry.getValue());
+      }
+      else
+      {
+        listFields.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Apply one delta: ADD merges the delta's record, SUBTRACT removes its keys.
+   */
+  void merge(ZNRecordDelta delta)
+  {
+    if (delta.getMergeOperation() == MergeOperation.ADD)
+    {
+      merge(delta.getRecord());
+    }
+    else if (delta.getMergeOperation() == MergeOperation.SUBTRACT)
+    {
+      subtract(delta.getRecord());
+    }
+  }
+
+  /**
+   * Apply the deltas in list order.
+   */
+  void merge(List<ZNRecordDelta> deltaList)
+  {
+    for (ZNRecordDelta delta : deltaList)
+    {
+      merge(delta);
+    }
+  }
+
+  /**
+   * Two records are equal when their simple, map and list fields are equal.
+   * The id, version, timestamps and delta list are not compared.
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+    {
+      return true;
+    }
+    if (!(obj instanceof ZNRecord))
+    {
+      return false;
+    }
+    ZNRecord that = (ZNRecord) obj;
+    // Map.equals already compares sizes, so no separate size checks are needed
+    return sameContent(this.getSimpleFields(), that.getSimpleFields())
+        && sameContent(this.getMapFields(), that.getMapFields())
+        && sameContent(this.getListFields(), that.getListFields());
+  }
+
+  /**
+   * Hash over the same three field groups that {@link #equals(Object)}
+   * compares, so equal records always hash alike.
+   */
+  @Override
+  public int hashCode()
+  {
+    int result = contentHash(getSimpleFields());
+    result = 31 * result + contentHash(getMapFields());
+    result = 31 * result + contentHash(getListFields());
+    return result;
+  }
+
+  // null-tolerant equality; the setters allow a field group to be null
+  private static boolean sameContent(Object left, Object right)
+  {
+    return left == null ? right == null : left.equals(right);
+  }
+
+  // null-tolerant hash matching sameContent()
+  private static int contentHash(Object value)
+  {
+    return value == null ? 0 : value.hashCode();
+  }
+
+  /**
+   * Remove every simple, list and map field whose key is present in the given
+   * record. Note: does not support subtract in each list in list fields or
+   * map in mapFields; matching keys are removed whole.
+   *
+   * @param value record whose keys are removed from this record
+   */
+  public void subtract(ZNRecord value)
+  {
+    removeKeys(simpleFields, value.getSimpleFields());
+    removeKeys(listFields, value.getListFields());
+    removeKeys(mapFields, value.getMapFields());
+  }
+
+  // Removes from target every key of source. The keys are snapshotted first so
+  // that subtracting a record from itself does not modify a map mid-iteration.
+  private static void removeKeys(Map<String, ?> target, Map<String, ?> source)
+  {
+    for (String key : new ArrayList<String>(source.keySet()))
+    {
+      target.remove(key);
+    }
+  }
+
+  @JsonIgnore(true)
+  public int getVersion()
+  {
+    return _version;
+  }
+
+  @JsonIgnore(true)
+  public void setVersion(int version)
+  {
+    _version = version;
+  }
+
+  @JsonIgnore(true)
+  public long getCreationTime()
+  {
+    return _creationTime;
+  }
+
+  @JsonIgnore(true)
+  public void setCreationTime(long creationTime)
+  {
+    _creationTime = creationTime;
+  }
+
+  @JsonIgnore(true)
+  public long getModifiedTime()
+  {
+    return _modifiedTime;
+  }
+
+  @JsonIgnore(true)
+  public void setModifiedTime(long modifiedTime)
+  {
+    _modifiedTime = modifiedTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java b/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java
new file mode 100644
index 0000000..4958f90
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java
@@ -0,0 +1,30 @@
+package org.apache.helix;
+
+import java.util.List;
+
+/**
+ * Folds a list of records into a single record by merging them in list order.
+ */
+public class ZNRecordAssembler
+{
+  /**
+   * Merge every non-null element of {@code records} into a new record whose id
+   * is taken from the first non-null element.
+   *
+   * @param records records to merge; may be null, empty or contain nulls
+   * @return the merged record, or null when there is no non-null element
+   */
+  public ZNRecord assemble(List<ZNRecord> records)
+  {
+    if (records == null || records.isEmpty())
+    {
+      return null;
+    }
+
+    ZNRecord merged = null;
+    for (ZNRecord current : records)
+    {
+      if (current != null)
+      {
+        // the target is created lazily so that leading nulls are skipped
+        if (merged == null)
+        {
+          merged = new ZNRecord(current.getId());
+        }
+        merged.merge(current);
+      }
+    }
+    return merged;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java b/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java
new file mode 100644
index 0000000..52704df
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java
@@ -0,0 +1,116 @@
+package org.apache.helix;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Splits the list and map fields of a record into several smaller records
+ * ("buckets"), grouping keys by the partition number that follows the last
+ * underscore of each key. A bucket size of 0 disables bucketizing.
+ */
+public class ZNRecordBucketizer
+{
+  private static final Logger LOG = Logger.getLogger(ZNRecordBucketizer.class);
+  final int             _bucketSize;
+
+  /**
+   * @param bucketSize number of partitions per bucket; any value <= 0 is
+   *          stored as 0, which turns bucketizing off
+   */
+  public ZNRecordBucketizer(int bucketSize)
+  {
+    if (bucketSize <= 0)
+    {
+      LOG.debug("bucketSize <= 0 (was " + bucketSize
+          + "). Set to 0 to use non-bucketized HelixProperty.");
+      bucketSize = 0;
+    }
+
+    _bucketSize = bucketSize;
+  }
+
+  /**
+   * Calculate bucketName in form of "resourceName_p{startPartition}-p{endPartition}".
+   *
+   * @param key partition name in the format resourceName_partition#
+   * @return the bucket name, or null when bucketizing is off (bucket size 0)
+   * @throws IllegalArgumentException if the key has no underscore or the text
+   *           after the last underscore is not an integer
+   */
+  public String getBucketName(String key)
+  {
+    if (_bucketSize == 0)
+    {
+      // no bucketize
+      return null;
+    }
+
+    int idx = key.lastIndexOf('_');
+    if (idx < 0)
+    {
+      throw new IllegalArgumentException("Could NOT find partition# in " + key
+          + ". partitionName should be in format of resourceName_partition#");
+    }
+
+    String partitionText = key.substring(idx + 1);
+    try
+    {
+      int partitionNb = Integer.parseInt(partitionText);
+      int startPartition = (partitionNb / _bucketSize) * _bucketSize;
+      int endPartition = startPartition + (_bucketSize - 1);
+      return key.substring(0, idx) + "_p" + startPartition + "-p" + endPartition;
+    }
+    catch (NumberFormatException e)
+    {
+      // keep the parse failure as the cause so the stack trace shows its origin
+      throw new IllegalArgumentException("Could NOT parse partition# ("
+          + partitionText + ") in " + key, e);
+    }
+  }
+
+  /**
+   * Distribute the list fields and map fields of a record over bucket records
+   * keyed by bucket name, then copy the record's simple fields into every
+   * bucket. Each bucket receives its own copy of the simple fields, so a
+   * change to one bucket does not leak into the others or into the source
+   * record. The list and map values themselves are stored by reference.
+   * <p>
+   * When bucketizing is off the result holds the record itself under its id.
+   *
+   * @param record record to split
+   * @return map from bucket name (or record id) to record
+   */
+  public Map<String, ZNRecord> bucketize(ZNRecord record)
+  {
+    Map<String, ZNRecord> buckets = new HashMap<String, ZNRecord>();
+    if (_bucketSize == 0)
+    {
+      buckets.put(record.getId(), record);
+      return buckets;
+    }
+
+    // bucketize list field
+    for (String partitionName : record.getListFields().keySet())
+    {
+      String bucketName = getBucketName(partitionName);
+      if (bucketName != null)
+      {
+        getOrCreateBucket(buckets, bucketName)
+            .setListField(partitionName, record.getListField(partitionName));
+      }
+      else
+      {
+        LOG.error("Can't bucketize " + partitionName + " in list field");
+      }
+    }
+
+    // bucketize map field
+    for (String partitionName : record.getMapFields().keySet())
+    {
+      String bucketName = getBucketName(partitionName);
+      if (bucketName != null)
+      {
+        getOrCreateBucket(buckets, bucketName)
+            .setMapField(partitionName, record.getMapField(partitionName));
+      }
+      else
+      {
+        LOG.error("Can't bucketize " + partitionName + " in map field");
+      }
+    }
+
+    // copy all simple fields
+    Map<String, String> simpleFields = record.getSimpleFields();
+    if (simpleFields != null)
+    {
+      for (ZNRecord bucket : buckets.values())
+      {
+        bucket.getSimpleFields().putAll(simpleFields);
+      }
+    }
+    return buckets;
+  }
+
+  // Returns the bucket record registered under bucketName, creating it on first use.
+  private static ZNRecord getOrCreateBucket(Map<String, ZNRecord> buckets,
+                                            String bucketName)
+  {
+    ZNRecord bucket = buckets.get(bucketName);
+    if (bucket == null)
+    {
+      bucket = new ZNRecord(bucketName);
+      buckets.put(bucketName, bucket);
+    }
+    return bucket;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
new file mode 100644
index 0000000..f2a4636
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+/**
+ * A record paired with the operation (ADD or SUBTRACT) to use when it is
+ * merged into another record.
+ */
+public class ZNRecordDelta
+{
+  /** How the wrapped record is applied during a merge. */
+  public enum MergeOperation
+  {
+    ADD,
+    SUBTRACT
+  }
+
+  public ZNRecord _record;
+  public MergeOperation _mergeOperation;
+
+  /**
+   * Wrap a copy of the given record together with a merge operation.
+   *
+   * @param record record to copy into this delta
+   * @param _mergeOperation operation to associate with the copy
+   */
+  public ZNRecordDelta(ZNRecord record, MergeOperation _mergeOperation)
+  {
+    this._record = new ZNRecord(record);
+    this._mergeOperation = _mergeOperation;
+  }
+
+  /**
+   * Wrap a copy of the given record as an ADD delta.
+   *
+   * @param record record to copy into this delta
+   */
+  public ZNRecordDelta(ZNRecord record)
+  {
+    this(record, MergeOperation.ADD);
+  }
+
+  /**
+   * Create an ADD delta around an empty record whose id is the empty string.
+   */
+  public ZNRecordDelta()
+  {
+    this(new ZNRecord(""), MergeOperation.ADD);
+  }
+
+  /** @return the wrapped record */
+  public ZNRecord getRecord()
+  {
+    return _record;
+  }
+
+  /** @return the operation associated with the wrapped record */
+  public MergeOperation getMergeOperation()
+  {
+    return _mergeOperation;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java b/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java
new file mode 100644
index 0000000..2a2bb7b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java
@@ -0,0 +1,24 @@
+package org.apache.helix;
+
+import org.I0Itec.zkclient.DataUpdater;
+
+/**
+ * DataUpdater that merges a fixed record into the current value.
+ */
+public class ZNRecordUpdater implements DataUpdater<ZNRecord>
+{
+  final ZNRecord _record;
+
+  /**
+   * @param record record merged into the current value on every update
+   */
+  public ZNRecordUpdater(ZNRecord record)
+  {
+    this._record = record;
+  }
+
+  /**
+   * @param current the current value, possibly null
+   * @return {@code current} after merging the fixed record into it, or the
+   *         fixed record itself when {@code current} is null
+   */
+  @Override
+  public ZNRecord update(ZNRecord current)
+  {
+    if (current == null)
+    {
+      return _record;
+    }
+    current.merge(_record);
+    return current;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java
new file mode 100644
index 0000000..ba2c847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+import org.apache.helix.HelixException;
+
+
+/**
+ * Aggregator that keeps a running sum: each strictly newer sample is added to
+ * the current value.
+ */
+public class AccumulateAggregator extends Aggregator {
+
+	public AccumulateAggregator()
+	{
+		_numArgs = 0;
+	}
+
+	/**
+	 * Fold the new value/time pair into the current pair, in place.
+	 * The first element of each tuple is parsed as a double. When the new time
+	 * is greater than the current time the new value is added to the current
+	 * value and the time advances; otherwise the current pair is kept. Empty
+	 * current tuples count as value 0 at time -1.
+	 *
+	 * @throws HelixException if any of the four tuples is null
+	 */
+	@Override
+	public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
+			Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
+
+		if (currValTup == null || newValTup == null || currTimeTup == null ||
+				newTimeTup == null) {
+			throw new HelixException("Tuples cannot be null");
+		}
+
+		// defaults used when the old tuples are empty, i.e. no value/time exist yet
+		double runningTotal = 0;
+		double lastTime = -1;
+		if (currValTup.size() > 0 && currTimeTup.size() > 0) {
+			runningTotal = Double.parseDouble(currValTup.iterator().next());
+			lastTime = Double.parseDouble(currTimeTup.iterator().next());
+		}
+		double sample = Double.parseDouble(newValTup.iterator().next());
+		double sampleTime = Double.parseDouble(newTimeTup.iterator().next());
+
+		// only a strictly newer sample is accumulated; anything else leaves the pair untouched
+		if (sampleTime > lastTime) {
+			runningTotal = runningTotal + sample;
+			lastTime = sampleTime;
+		}
+
+		currValTup.clear();
+		currValTup.add(Double.toString(runningTotal));
+		currTimeTup.clear();
+		currTimeTup.add(Double.toString(lastTime));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java
new file mode 100644
index 0000000..6d905a0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+/**
+ * Base class for strategies that fold a new value/time pair into the current
+ * one.
+ */
+public abstract class Aggregator {
+
+	// value reported by getRequiredNumArgs(); assigned by subclasses
+	int _numArgs;
+
+	public Aggregator()
+	{
+	}
+
+	/**
+	 * Take curr and new values.  Update curr.
+	 *
+	 * @param currVal current value tuple, updated by the implementation
+	 * @param newVal new value tuple
+	 * @param currTime current time tuple, updated by the implementation
+	 * @param newTime new time tuple
+	 * @param args extra arguments for the implementation
+	 */
+	public abstract void merge(Tuple<String> currVal, Tuple<String> newVal,
+			Tuple<String> currTime, Tuple<String> newTime, String... args);
+
+	/**
+	 * @return the number of arguments this aggregator requires
+	 */
+	public int getRequiredNumArgs()
+	{
+		return _numArgs;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Alert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Alert.java b/helix-core/src/main/java/org/apache/helix/alerts/Alert.java
new file mode 100644
index 0000000..59dd548
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Alert.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+/**
+ * Value holder for one alert: its name, expression, comparator and the
+ * constant tuple it is compared against.
+ */
+public class Alert {
+
+	String _name;
+	String _expression;
+	String _comparator;
+	Tuple<String> _constant;
+
+	/**
+	 * @param name alert name
+	 * @param expression alert expression
+	 * @param comparator comparator name
+	 * @param constant constant tuple
+	 */
+	public Alert(String name, String expression, String comparator, Tuple<String> constant)
+	{
+		this._name = name;
+		this._expression = expression;
+		this._comparator = comparator;
+		this._constant = constant;
+	}
+
+	/** @return the alert name */
+	public String getName()
+	{
+		return _name;
+	}
+
+	/** @return the alert expression */
+	public String getExpression()
+	{
+		return _expression;
+	}
+
+	/** @return the comparator name */
+	public String getComparator()
+	{
+		return _comparator;
+	}
+
+	/** @return the constant tuple */
+	public Tuple<String> getConstant()
+	{
+		return _constant;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java
new file mode 100644
index 0000000..de0536c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+/**
+ * Base class for comparators that evaluate a pair of tuples.
+ */
+public abstract class AlertComparator {
+
+	public AlertComparator()
+	{
+	}
+
+	/**
+	 * Evaluate the comparison between two tuples.
+	 *
+	 * @param leftTup left-hand tuple
+	 * @param rightTup right-hand tuple
+	 * @return the outcome of the comparison as defined by the implementation
+	 */
+	public abstract boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java
new file mode 100644
index 0000000..5008567
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java
@@ -0,0 +1,147 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Static helpers that pull the EXP(...), CMP(...), CON(...) and ACTION(...)
+ * components out of an alert string and validate them.
+ */
+public class AlertParser {
+	private static final Logger logger = Logger.getLogger(AlertParser.class);
+
+	public static final String EXPRESSION_NAME = "EXP";
+	public static final String COMPARATOR_NAME = "CMP";
+	public static final String CONSTANT_NAME = "CON";
+	public static final String ACTION_NAME = "ACTION";
+
+	static Map<String, AlertComparator> comparatorMap = new HashMap<String, AlertComparator>();
+
+	static
+	{
+		addComparatorEntry("GREATER", new GreaterAlertComparator());
+	}
+
+	// Registers comp under label unless the label is already taken.
+	private static void addComparatorEntry(String label, AlertComparator comp)
+	{
+		if (!comparatorMap.containsKey(label))
+		{
+			comparatorMap.put(label, comp);
+		}
+		logger.info("Adding comparator: "+comp);
+	}
+
+	/**
+	 * Look up a comparator by name. White space in the name is ignored and the
+	 * lookup is case-insensitive, matching what validateAlert accepts.
+	 *
+	 * @param compName comparator name, e.g. "GREATER"
+	 * @return the registered comparator
+	 * @throws HelixException if no comparator is registered under that name
+	 */
+	public static AlertComparator getComparator(String compName)
+	{
+		compName = compName.replaceAll("\\s+", ""); //remove white space
+		// Locale.ROOT keeps the upper-casing independent of the default locale
+		AlertComparator comparator = comparatorMap.get(compName.toUpperCase(Locale.ROOT));
+		if (comparator == null) {
+			throw new HelixException("Comparator type <"+compName+"> unknown");
+		}
+		return comparator;
+	}
+
+	/**
+	 * Extract the text between the parentheses that follow the first occurrence
+	 * of {@code component} in {@code alert}. Nested parentheses are kept as part
+	 * of the returned text.
+	 *
+	 * @param component component label, e.g. EXPRESSION_NAME
+	 * @param alert alert string to search
+	 * @return the text enclosed by the component's parentheses
+	 * @throws HelixException if the component is missing or its parentheses do
+	 *         not close
+	 */
+	public static String getComponent(String component, String alert) throws HelixException
+	{
+		//find EXP and keep going until paren are closed
+		int labelPos = alert.indexOf(component);
+		if (labelPos < 0) {
+			throw new HelixException(alert+" does not contain component "+component);
+		}
+		//advance length of string and one for open paren
+		int expStartPos = labelPos + component.length() + 1;
+		int expEndPos = expStartPos;
+		int openParenCount = 1;
+		// stop at the end of the string so that an unclosed paren reaches the
+		// check below instead of running past the last character
+		while (openParenCount > 0 && expEndPos < alert.length()) {
+			char current = alert.charAt(expEndPos);
+			if (current == '(') {
+				openParenCount++;
+			}
+			else if (current == ')') {
+				openParenCount--;
+			}
+			expEndPos++;
+		}
+		if (openParenCount != 0) {
+			throw new HelixException(alert+" does not contain valid "+component+" component, " +
+					"parentheses do not close");
+		}
+		//return what is in between paren
+		return alert.substring(expStartPos, expEndPos-1);
+	}
+
+	/**
+	 * Validate an alert string: white space is removed, the EXP, CMP and CON
+	 * components must be present, the expression must pass
+	 * ExpressionParser.validateExpression, the comparator must be registered
+	 * (case-insensitively) and an ACTION component, when present, must name an
+	 * ActionOnError value.
+	 *
+	 * @param alert alert string to validate
+	 * @return always false, including when validation succeeds
+	 * @throws HelixException if a required component is missing or the
+	 *         comparator or action is invalid
+	 */
+	public static boolean validateAlert(String alert) throws HelixException
+	{
+		//TODO: decide if toUpperCase is going to cause problems with stuff like db name
+		alert = alert.replaceAll("\\s+", ""); //remove white space
+		String exp = getComponent(EXPRESSION_NAME, alert);
+		String cmp = getComponent(COMPARATOR_NAME, alert);
+		String val = getComponent(CONSTANT_NAME, alert);
+		logger.debug("exp: "+exp);
+		logger.debug("cmp: "+cmp);
+		logger.debug("val: "+val);
+
+		//separately validate each portion
+		ExpressionParser.validateExpression(exp);
+
+		//validate comparator
+		if (!comparatorMap.containsKey(cmp.toUpperCase(Locale.ROOT))) {
+			throw new HelixException("Unknown comparator type "+cmp);
+		}
+
+		// the action is optional: any failure to extract it is read as "no action"
+		String actionValue = null;
+		try
+		{
+			actionValue = AlertParser.getComponent(AlertParser.ACTION_NAME, alert);
+		}
+		catch(Exception e)
+		{
+			logger.info("No action specified in " + alert);
+		}
+
+		if(actionValue != null)
+		{
+			validateActionValue(actionValue);
+		}
+		//ValParser.  Probably don't need this.  Just make sure it's a valid tuple.  But would also be good
+		//to validate that the tuple is same length as exp's output...maybe leave that as future todo
+		//not sure we can really do much here though...anything can be in a tuple.
+
+		//TODO: try to compare tuple width of CON against tuple width of agg type!  Not a good idea, what if
+		//is not at full width yet, like with window
+
+		//if all of this passes, then we can safely record the alert in zk.  still need to implement zk location
+
+		// NOTE(review): false is returned even though validation passed; this looks
+		// unintended, but it is left as is until the callers have been checked.
+		return false;
+	}
+
+	/**
+	 * Check that {@code actionValue} names an ActionOnError constant.
+	 *
+	 * @param actionValue action name to check
+	 * @throws HelixException listing the valid actions when the name is unknown
+	 */
+	public static void validateActionValue(String actionValue)
+	{
+		try
+		{
+			ActionOnError.valueOf(actionValue);
+		}
+		catch(Exception e)
+		{
+			StringBuilder validActions = new StringBuilder();
+			for (ActionOnError action : ActionOnError.values())
+			{
+				validActions.append(action).append(" ");
+			}
+			throw new HelixException("Unknown cmd type " + actionValue + ", valid types : " + validActions);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java
new file mode 100644
index 0000000..fba4dbf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java
@@ -0,0 +1,355 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.log4j.Logger;
+
+
+public class AlertProcessor
+{
+  private static Logger logger = Logger.getLogger(AlertProcessor.class);
+
+  private static final String bindingDelim = ",";
+  public static final String noWildcardAlertKey = "*";
+
+  StatsHolder _statsHolder;
+
+  // AlertsHolder _alertsHolder;
+
+  /*
+   * public AlertProcessor(StatHealthReportProvider statProvider) {
+   * 
+   * }
+   */
+
+  /**
+   * Creates a processor that keeps a reference to the given stats holder.
+   *
+   * @param sh the holder stored in {@code _statsHolder}
+   */
+  public AlertProcessor(StatsHolder sh)
+  {
+    this._statsHolder = sh;
+  }
+
+  /**
+   * Creates one empty tuple list for every base stat named in the alert's
+   * expression.
+   *
+   * @param alert the alert whose expression is handed to
+   *          {@code ExpressionParser.getBaseStats}
+   * @return a new map from base stat name to a new, empty list
+   */
+  public static Map<String, List<Tuple<String>>> initAlertStatTuples(Alert alert)
+  {
+    String[] baseStats = ExpressionParser.getBaseStats(alert.getExpression());
+    Map<String, List<Tuple<String>>> tuplesByStat = new HashMap<String, List<Tuple<String>>>();
+    for (int idx = 0; idx < baseStats.length; idx++)
+    {
+      tuplesByStat.put(baseStats[idx], new ArrayList<Tuple<String>>());
+    }
+    return tuplesByStat;
+  }
+
+  /*
+   * //this function is all messed up!!! public static void
+   * populateAlertStatTuples(Map<String,List<Tuple<String>>> tupleLists,
+   * List<Stat> persistentStats) { Set<String> alertStatNames =
+   * tupleLists.keySet(); for (Stat persistentStat : persistentStats) { //ignore
+   * stats with wildcards, they don't have values...they are just there to catch
+   * new actual stats if
+   * (ExpressionParser.statContainsWildcards(persistentStat.getName())) {
+   * continue; } Iterator<String> alertStatIter = alertStatNames.iterator();
+   * while (alertStatIter.hasNext()) { String currAlertStat =
+   * alertStatIter.next(); if
+   * (ExpressionParser.isAlertStatExactMatch(currAlertStat,
+   * persistentStat.getName()) ||
+   * ExpressionParser.isAlertStatWildcardMatch(currAlertStat,
+   * persistentStat.getName())) {
+   * tupleLists.get(currAlertStat).add(persistentStat.getValue()); } } } }
+   */
+
+  /**
+   * Joins wildcard bindings into a single alert key, separating them with
+   * {@code bindingDelim}.
+   *
+   * @param bindings the bindings to join, in order
+   * @return the joined key, or {@code null} when {@code bindings} is empty
+   */
+  public static String formAlertKey(ArrayList<String> bindings)
+  {
+    if (bindings.isEmpty())
+    {
+      return null;
+    }
+    Iterator<String> remaining = bindings.iterator();
+    StringBuilder key = new StringBuilder();
+    // first binding goes in bare, every later one is preceded by the delimiter
+    key.append(remaining.next());
+    while (remaining.hasNext())
+    {
+      key.append(bindingDelim).append(remaining.next());
+    }
+    return key.toString();
+  }
+
+  // XXX: major change here. return ArrayList of Stats instead of ArrayList of
+  // Tuple<String>'s
+  /**
+   * Groups the values of the persisted stats into rows, one row per alert key.
+   * <p>
+   * Each row has {@code alertStats.length} slots; slot {@code i} receives the
+   * value of a persisted stat that matches {@code alertStats[i]} either exactly
+   * or through wildcards. Exact matches are filed under
+   * {@code noWildcardAlertKey}; wildcard matches are filed under the key built
+   * by {@link #formAlertKey(ArrayList)} from the wildcard bindings. Persisted
+   * stats whose own name contains wildcards are skipped. Rows that still have
+   * an unfilled ({@code null}) slot after all stats were examined are dropped.
+   *
+   * @param alertStats the base stat names of an alert expression
+   * @param persistentStats the stats to match against
+   * @return map from alert key to its completely filled row
+   */
+  public static Map<String, ArrayList<Tuple<String>>> populateAlertStatTuples(
+      String[] alertStats, List<Stat> persistentStats)
+  {
+    Map<String, ArrayList<Tuple<String>>> rowsByKey = new HashMap<String, ArrayList<Tuple<String>>>();
+
+    for (Stat stat : persistentStats)
+    {
+      // wildcard stats carry no values of their own
+      if (ExpressionParser.statContainsWildcards(stat.getName()))
+      {
+        continue;
+      }
+      int column = 0;
+      for (String alertStat : alertStats)
+      {
+        ArrayList<String> bindings = new ArrayList<String>();
+        // the wildcard matcher is only consulted (and only fills the bindings)
+        // when the exact match fails
+        boolean matched = ExpressionParser.isAlertStatExactMatch(alertStat, stat.getName())
+            || ExpressionParser.isAlertStatWildcardMatch(alertStat, stat.getName(), bindings);
+        if (matched)
+        {
+          String rowKey = bindings.isEmpty() ? noWildcardAlertKey : formAlertKey(bindings);
+          ArrayList<Tuple<String>> row = rowsByKey.get(rowKey);
+          if (row == null)
+          {
+            // first hit for this key: start with a row of empty slots
+            row = new ArrayList<Tuple<String>>(alertStats.length);
+            for (int slot = 0; slot < alertStats.length; slot++)
+            {
+              row.add(null);
+            }
+            rowsByKey.put(rowKey, row);
+          }
+          row.set(column, stat.getValue());
+        }
+        column++;
+      }
+    }
+
+    // discard incomplete rows
+    // TODO: decide if this is best thing to do with incomplete rows
+    Iterator<Map.Entry<String, ArrayList<Tuple<String>>>> rowIter = rowsByKey.entrySet().iterator();
+    while (rowIter.hasNext())
+    {
+      if (rowIter.next().getValue().contains(null))
+      {
+        rowIter.remove();
+      }
+    }
+
+    return rowsByKey;
+  }
+
+  /**
+   * Transposes rows of tuples into columns and returns one iterator per column.
+   * <p>
+   * Rows are visited in the iteration order of {@code tupleMap}; the i-th
+   * element of every row is appended to the i-th column.
+   *
+   * @param tupleMap map from key to a row of tuples
+   * @return one iterator per column, in column order
+   */
+  public static List<Iterator<Tuple<String>>> convertTupleRowsToTupleColumns(
+      Map<String, ArrayList<Tuple<String>>> tupleMap)
+  {
+    ArrayList<ArrayList<Tuple<String>>> columns = new ArrayList<ArrayList<Tuple<String>>>();
+    for (ArrayList<Tuple<String>> row : tupleMap.values())
+    {
+      int columnIdx = 0;
+      for (Tuple<String> cell : row)
+      {
+        if (columns.size() <= columnIdx)
+        {
+          // first row wide enough to reach this column
+          columns.add(new ArrayList<Tuple<String>>());
+        }
+        columns.get(columnIdx).add(cell);
+        columnIdx++;
+      }
+    }
+
+    ArrayList<Iterator<Tuple<String>>> columnIters = new ArrayList<Iterator<Tuple<String>>>();
+    for (ArrayList<Tuple<String>> column : columns)
+    {
+      columnIters.add(column.iterator());
+    }
+    return columnIters;
+  }
+
+  /**
+   * Feeds the tuple iterators through the named operators, in order, and
+   * returns the single tuple set that must remain at the end.
+   *
+   * @param tupleIters the input tuple sets
+   * @param operators operator names resolved through
+   *          {@code ExpressionParser.getOperator}; {@code null} means the
+   *          input is passed through unchanged
+   * @return the only tuple set left after the pipeline
+   * @throws HelixException if an operator cannot be resolved, an operator
+   *           returns no output, or the pipeline does not end with exactly one
+   *           tuple set
+   */
+  public static Iterator<Tuple<String>> executeOperatorPipeline(
+      List<Iterator<Tuple<String>>> tupleIters, String[] operators)
+  {
+    List<Iterator<Tuple<String>>> nextIters = tupleIters;
+    if (operators != null)
+    {
+      for (String opName : operators)
+      {
+        Operator op = ExpressionParser.getOperator(opName);
+        if (op == null)
+        {
+          // defensive: fail with the operator name rather than a bare NPE
+          throw new HelixException("no operator found for " + opName);
+        }
+        nextIters = op.execute(nextIters);
+        if (nextIters == null)
+        {
+          // an unimplemented operator may hand back null instead of a list
+          throw new HelixException("operator " + opName + " produced no output");
+        }
+      }
+    }
+
+    int produced = (nextIters == null) ? 0 : nextIters.size();
+    if (produced != 1)
+    {
+      throw new HelixException("operator pipeline produced " + produced
+          + " tuple sets instead of exactly 1");
+    }
+
+    return nextIters.get(0);
+  }
+
+  /*
+   * TODO: consider returning actual values, rather than bools. Could just
+   * return the triggered alerts
+   */
+  /**
+   * Evaluates every tuple against the constant with the named comparator.
+   *
+   * @param tuples the tuples to evaluate; the iterator is consumed
+   * @param comparatorName name resolved through {@code AlertParser.getComparator}
+   * @param constant the right-hand side passed to each evaluation
+   * @return one {@link AlertValueAndStatus} per tuple, in iteration order,
+   *         pairing the tuple with the comparator's result
+   */
+  public static ArrayList<AlertValueAndStatus> executeComparator(
+      Iterator<Tuple<String>> tuples, String comparatorName,
+      Tuple<String> constant)
+  {
+    ArrayList<AlertValueAndStatus> outcomes = new ArrayList<AlertValueAndStatus>();
+    AlertComparator comparator = AlertParser.getComparator(comparatorName);
+
+    for (Iterator<Tuple<String>> cursor = tuples; cursor.hasNext();)
+    {
+      Tuple<String> candidate = cursor.next();
+      outcomes.add(new AlertValueAndStatus(candidate, comparator.evaluate(candidate, constant)));
+    }
+    return outcomes;
+  }
+
+  /*
+   * public static void executeAlert(Alert alert, List<Stat> stats) { //init
+   * tuple lists and populate them Map<String,List<Tuple<String>>>
+   * alertStatTupleSets = initAlertStatTuples(alert);
+   * populateAlertStatTuples(alertStatTupleSets, stats); //TODO: not sure I am
+   * being careful enough with sticking stats that match each other in this
+   * list! //convert to operator friendly format List<Iterator<Tuple<String>>>
+   * tupleIters = convertTupleSetsToTupleIterators(alertStatTupleSets); //get
+   * the operators String[] operators =
+   * ExpressionParser.getOperators(alert.getExpression()); //do operator
+   * pipeline Iterator<Tuple<String>> opResultTuples =
+   * executeOperatorPipeline(tupleIters, operators); //execute comparator for
+   * tuple list ArrayList<Boolean> evalResults =
+   * executeComparator(opResultTuples, alert.getComparator(),
+   * alert.getConstant());
+   * 
+   * //TODO: convey this back to execute all
+   * 
+   * }
+   */
+
+  /**
+   * Pairs alert bindings with evaluation results.
+   * <p>
+   * When both collections have the same size, the i-th binding (in the set's
+   * iteration order) is mapped to the i-th result. Otherwise every result is
+   * stored under {@code noWildcardAlertKey}, so only the last one survives.
+   *
+   * @param alertStatBindings the alert keys
+   * @param evalResults the results to attach to the keys
+   * @return a new map from binding to result
+   */
+  public static HashMap<String, AlertValueAndStatus> generateResultMap(
+      Set<String> alertStatBindings, ArrayList<AlertValueAndStatus> evalResults)
+  {
+    HashMap<String, AlertValueAndStatus> resultMap = new HashMap<String, AlertValueAndStatus>();
+    if (alertStatBindings.size() == evalResults.size())
+    {
+      // sizes agree: walk both in lock-step
+      Iterator<String> bindingIter = alertStatBindings.iterator();
+      for (AlertValueAndStatus result : evalResults)
+      {
+        resultMap.put(bindingIter.next(), result);
+      }
+    }
+    else
+    {
+      // can't match up alert bindings to results
+      for (AlertValueAndStatus result : evalResults)
+      {
+        resultMap.put(noWildcardAlertKey, result);
+      }
+    }
+    return resultMap;
+  }
+
+  /**
+   * Evaluates one alert against the persisted stats.
+   * <p>
+   * Steps: collect matching stat values into rows, transpose them into
+   * columns, run the expression's operator pipeline, apply the alert's
+   * comparator and constant, then map the results back to the alert bindings.
+   *
+   * @param alert the alert to evaluate
+   * @param persistedStats the stats to evaluate it against
+   * @return map from alert binding to its value and fired flag, or
+   *         {@code null} when no persisted stat produced a complete row
+   */
+  public static HashMap<String, AlertValueAndStatus> executeAlert(Alert alert,
+      List<Stat> persistedStats)
+  {
+    Map<String, ArrayList<Tuple<String>>> rowsByBinding = populateAlertStatTuples(
+        ExpressionParser.getBaseStats(alert.getExpression()), persistedStats);
+    if (rowsByBinding.isEmpty())
+    {
+      return null;
+    }
+
+    // rows -> columns, then through the operators named in the expression
+    Iterator<Tuple<String>> pipelineOutput = executeOperatorPipeline(
+        convertTupleRowsToTupleColumns(rowsByBinding),
+        ExpressionParser.getOperators(alert.getExpression()));
+
+    ArrayList<AlertValueAndStatus> evaluated = executeComparator(
+        pipelineOutput, alert.getComparator(), alert.getConstant());
+
+    // XXX: there is a non-critical bug here. if we have an aggregating
+    // operator, but that operator only takes one input,
+    // we bind to original wildcard binding, instead of to "*"
+    return generateResultMap(rowsByBinding.keySet(), evaluated);
+  }
+
+  /**
+   * Runs {@link #executeAlert(Alert, List)} for every alert.
+   *
+   * @param alerts the alerts to evaluate
+   * @param stats the stats each alert is evaluated against
+   * @return map from alert name to that alert's result; the result is
+   *         {@code null} for an alert that {@code executeAlert} returned
+   *         {@code null} for
+   */
+  public static Map<String, Map<String, AlertValueAndStatus>> executeAllAlerts(
+      List<Alert> alerts, List<Stat> stats)
+  {
+    Map<String, Map<String, AlertValueAndStatus>> resultsByAlert = new HashMap<String, Map<String, AlertValueAndStatus>>();
+
+    Iterator<Alert> alertIter = alerts.iterator();
+    while (alertIter.hasNext())
+    {
+      Alert current = alertIter.next();
+      // TODO: decide if sticking null results in here is ok
+      Map<String, AlertValueAndStatus> outcome = executeAlert(current, stats);
+      resultsByAlert.put(current.getName(), outcome);
+    }
+
+    return resultsByAlert;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java
new file mode 100644
index 0000000..5dbfee8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+/**
+ * Pairs the value an alert was evaluated on with whether the alert fired.
+ */
+public class AlertValueAndStatus {
+	/** Field name under which the value is stored. */
+	public final static String VALUE_NAME = "value";
+	/** Field name under which the fired flag is stored. */
+	public final static String FIRED_NAME = "fired";
+
+	private final Tuple<String> _value;
+	private final boolean _fired;
+
+	/**
+	 * @param value the evaluated value
+	 * @param fired whether the alert fired for that value
+	 */
+	public AlertValueAndStatus(Tuple<String> value, boolean fired)
+	{
+		_value = value;
+		_fired = fired;
+	}
+
+	/** @return the value given at construction */
+	public Tuple<String> getValue() {
+		return _value;
+	}
+
+	/** @return the fired flag given at construction */
+	public boolean isFired() {
+		return _fired;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java
new file mode 100644
index 0000000..bca9b6a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java
@@ -0,0 +1,286 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.apache.helix.model.AlertStatus;
+import org.apache.helix.model.Alerts;
+import org.apache.log4j.Logger;
+
+
+public class AlertsHolder {
+
+	private static final Logger logger = Logger
+		    .getLogger(AlertsHolder.class.getName());
+
+	HelixDataAccessor _accessor;
+	HealthDataCache _cache;
+	Map<String, Map<String,String>> _alertsMap; //not sure if map or set yet
+	Map<String, Map<String,String>> _alertStatusMap;
+	//Alerts _alerts;
+	HashSet<String> alerts;
+	StatsHolder _statsHolder;
+
+  private final HelixManager _manager;
+
+  private Builder _keyBuilder;
+
+	/**
+	 * Creates a holder with its own {@link StatsHolder} built from the same
+	 * manager and cache.
+	 *
+	 * @param manager supplies the data accessor and the cluster name
+	 * @param cache the health data cache the alerts are read from
+	 */
+	public AlertsHolder(HelixManager manager, HealthDataCache cache)
+	{
+		this(manager, cache, new StatsHolder(manager, cache));
+	}
+
+	/**
+	 * Creates a holder and loads the alerts map from the cache.
+	 *
+	 * @param manager supplies the data accessor and the cluster name
+	 * @param cache the health data cache the alerts are read from
+	 * @param statHolder the stats holder that alert expressions are added to
+	 */
+	public AlertsHolder(HelixManager manager, HealthDataCache cache, StatsHolder statHolder)
+	{
+		this._manager = manager;
+		this._accessor = manager.getHelixDataAccessor();
+		this._cache = cache;
+		this._statsHolder = statHolder;
+		this._keyBuilder = new PropertyKey.Builder(manager.getClusterName());
+		updateCache(cache);
+	}
+
+	/**
+	 * Refreshes the cache through the accessor, then reloads the alerts map
+	 * from it via {@link #updateCache(HealthDataCache)}.
+	 */
+	public void refreshAlerts()
+	{
+		_cache.refresh(_accessor);
+		// updateCache also covers the case where the cache holds no alerts record
+		updateCache(_cache);
+	}
+
+	public void refreshAlertStatus()
+	{
+		AlertStatus alertStatusRecord = _cache.getAlertStatus();
+		if (alertStatusRecord != null) {
+		_alertStatusMap = alertStatusRecord.getMapFields();
+		}
+		else {
+			_alertStatusMap = new HashMap<String, Map<String,String>>();
+		}
+	}
+
+	/**
+	 * Writes the in-memory alerts map to the alerts property, creating the
+	 * property when the accessor does not return one.
+	 */
+	public void persistAlerts()
+	{
+		//XXX: Am I using _accessor too directly here?
+		Alerts alertsRecord = _accessor.getProperty(_keyBuilder.alerts());
+		if (alertsRecord == null) {
+			//TODO: fix naming of this record, if it matters
+			alertsRecord = new Alerts(Alerts.nodeName);
+		}
+		alertsRecord.getRecord().setMapFields(_alertsMap);
+		boolean stored = _accessor.setProperty(_keyBuilder.alerts(), alertsRecord);
+		logger.debug("persistAlerts retVal: "+stored);
+	}
+
+	/**
+	 * Writes the in-memory alert status map to the alert status property,
+	 * creating the property when the accessor does not return one.
+	 */
+	public void persistAlertStatus()
+	{
+		//XXX: Am I using _accessor too directly here?
+		AlertStatus alertStatus = _accessor.getProperty(_keyBuilder.alertStatus());
+		if (alertStatus == null) {
+			alertStatus = new AlertStatus(AlertStatus.nodeName); //TODO: fix naming of this record, if it matters
+		}
+		alertStatus.getRecord().setMapFields(_alertStatusMap);
+		boolean retVal = _accessor.setProperty(_keyBuilder.alertStatus(), alertStatus);
+		// label the message with this method so it is not mistaken for persistAlerts
+		logger.debug("persistAlertStatus retVal: "+retVal);
+	}
+
+	// Placeholder for reading alerts from cm state. The body is empty and
+	// nothing in this class calls it.
+	private void readExistingAlerts()
+	{
+
+	}
+
+	/**
+	 * Validates an alert definition, registers its expression as a stat and
+	 * persists both the stats and the alerts.
+	 * <p>
+	 * All whitespace is stripped from the definition first; the stripped text
+	 * is also the key the alert is stored under. The action component is
+	 * optional: a failure to extract it is only logged.
+	 *
+	 * @param alert the alert definition
+	 * @throws HelixException declared for validation and parsing failures
+	 */
+	public void addAlert(String alert) throws HelixException
+	{
+		String normalized = alert.replaceAll("\\s+", ""); //remove white space
+		AlertParser.validateAlert(normalized);
+		refreshAlerts();
+
+		//the three mandatory components
+		Map<String, String> fields = new HashMap<String,String>();
+		String[] mandatory = {
+				AlertParser.EXPRESSION_NAME,
+				AlertParser.COMPARATOR_NAME,
+				AlertParser.CONSTANT_NAME
+		};
+		for (String component : mandatory) {
+			fields.put(component, AlertParser.getComponent(component, normalized));
+		}
+
+		//the optional action
+		try
+		{
+			String action = AlertParser.getComponent(AlertParser.ACTION_NAME, normalized);
+			fields.put(AlertParser.ACTION_NAME, action);
+		}
+		catch(Exception e)
+		{
+			logger.info("No action specified in " + normalized);
+		}
+
+		//store the expression as stat
+		_statsHolder.addStat(fields.get(AlertParser.EXPRESSION_NAME));
+		_statsHolder.persistStats();
+
+		//naming the alert with the full name
+		_alertsMap.put(normalized, fields);
+		persistAlerts();
+	}
+
+	/**
+	 * Replaces the in-memory alert statuses with the given set and saves them.
+	 * <p>
+	 * Alerts mapped to {@code null} are skipped. Nothing is persisted when the
+	 * new status map is empty and the currently stored alert status has no
+	 * map fields either.
+	 *
+	 * @param statusSet map from alert to its per-binding value and status
+	 * @throws HelixException declared by the signature
+	 */
+	public void addAlertStatusSet(Map<String, Map<String, AlertValueAndStatus>> statusSet) throws HelixException
+	{
+		if (_alertStatusMap == null) {
+			_alertStatusMap = new HashMap<String, Map<String,String>>();
+		}
+		else {
+			_alertStatusMap.clear(); //all alerts overwrite old alerts
+		}
+		for (Map.Entry<String, Map<String, AlertValueAndStatus>> entry : statusSet.entrySet()) {
+			Map<String,AlertValueAndStatus> statuses = entry.getValue();
+			if (statuses != null) {
+				addAlertStatus(entry.getKey(), statuses);
+			}
+		}
+
+		AlertStatus stored = _accessor.getProperty(_keyBuilder.alertStatus());
+		int storedCount = (stored == null) ? 0 : stored.getMapFields().size();
+		//no need to persist alerts if there are none to persist and none are currently persisted
+		boolean nothingToDo = _alertStatusMap.size() <= 0 && storedCount <= 0;
+		if (!nothingToDo) {
+			persistAlertStatus(); //save statuses in zk
+		}
+	}
+
+	/**
+	 * Copies the statuses of one alert into the in-memory status map.
+	 * <p>
+	 * An entry whose name equals {@code ExpressionParser.wildcardChar} is stored
+	 * under the parent key itself; any other entry is stored under
+	 * {@code parentAlertKey + " : (" + name + ")"}.
+	 *
+	 * @param parentAlertKey the alert the statuses belong to
+	 * @param alertStatus map from binding name to value and status
+	 * @throws HelixException declared by the signature
+	 */
+	private void addAlertStatus(String parentAlertKey, Map<String,AlertValueAndStatus> alertStatus) throws HelixException
+	{
+		for (Map.Entry<String,AlertValueAndStatus> entry : alertStatus.entrySet()) {
+			String binding = entry.getKey();
+			String statusKey = binding.equals(ExpressionParser.wildcardChar)
+					? parentAlertKey
+					: parentAlertKey+" : ("+binding+")";
+
+			AlertValueAndStatus status = entry.getValue();
+			Map<String,String> statusFields = new HashMap<String,String>();
+			statusFields.put(AlertValueAndStatus.VALUE_NAME, status.getValue().toString());
+			statusFields.put(AlertValueAndStatus.FIRED_NAME, String.valueOf(status.isFired()));
+			_alertStatusMap.put(statusKey, statusFields);
+		}
+	}
+
+	/**
+	 * Looks up the stored value and fired flag of an alert status entry.
+	 * <p>
+	 * The stored value string is returned as the single element of a new tuple.
+	 *
+	 * @param alertName key of the entry in the alert status map
+	 * @return the value and fired flag recorded for that key
+	 * @throws HelixException if the status map has not been loaded yet or has
+	 *           no entry for {@code alertName}
+	 */
+	public AlertValueAndStatus getAlertValueAndStatus(String alertName)
+	{
+		// the status map stays null until refreshAlertStatus/addAlertStatusSet runs
+		Map<String,String> alertFields =
+				(_alertStatusMap == null) ? null : _alertStatusMap.get(alertName);
+		if (alertFields == null) {
+			throw new HelixException("No alert status found for alert " + alertName);
+		}
+		Tuple<String> valTup = new Tuple<String>();
+		valTup.add(alertFields.get(AlertValueAndStatus.VALUE_NAME));
+		// parseBoolean yields false for a missing flag, like Boolean.valueOf did
+		boolean fired = Boolean.parseBoolean(alertFields.get(AlertValueAndStatus.FIRED_NAME));
+		return new AlertValueAndStatus(valTup, fired);
+	}
+
+	/**
+	 * Validates an alert definition and splits it into its components.
+	 * <p>
+	 * All whitespace is stripped from the definition first. The expression,
+	 * comparator and constant are always extracted; the action is optional and
+	 * a failure to extract it is only logged.
+	 *
+	 * @param alert the alert definition
+	 * @param statsName receives the alert's expression (appended)
+	 * @param alertFields receives the extracted components
+	 * @throws HelixException declared for validation and parsing failures
+	 */
+	public static void parseAlert(String alert, StringBuilder statsName,
+			Map<String,String> alertFields) throws HelixException
+	{
+		String normalized = alert.replaceAll("\\s+", ""); //remove white space
+		AlertParser.validateAlert(normalized);
+
+		String[] mandatory = {
+				AlertParser.EXPRESSION_NAME,
+				AlertParser.COMPARATOR_NAME,
+				AlertParser.CONSTANT_NAME
+		};
+		for (String component : mandatory) {
+			alertFields.put(component, AlertParser.getComponent(component, normalized));
+		}
+
+		try
+		{
+			String action = AlertParser.getComponent(AlertParser.ACTION_NAME, normalized);
+			alertFields.put(AlertParser.ACTION_NAME, action);
+		}
+		catch(Exception e)
+		{
+			logger.info("No action specified in " + normalized);
+		}
+		statsName.append(alertFields.get(AlertParser.EXPRESSION_NAME));
+	}
+
+
+	/*
+	public void evaluateAllAlerts()
+	{
+		for (String alert : _alertsMap.keySet()) {
+			Map<String,String> alertFields = _alertsMap.get(alert);
+			String exp = alertFields.get(AlertParser.EXPRESSION_NAME);
+			String comp = alertFields.get(AlertParser.COMPARATOR_NAME);
+			String con = alertFields.get(AlertParser.CONSTANT_NAME);
+			//TODO: test the fields for null and fail if needed
+
+			AlertProcessor.execute(exp,  comp, con, sh);
+		}
+	}
+	*/
+
+	/**
+	 * Builds an {@link Alert} for every entry of the in-memory alerts map.
+	 *
+	 * @return a new list with one alert per entry, named after the entry's key
+	 */
+	public List<Alert> getAlertList()
+	{
+		List<Alert> result = new LinkedList<Alert>();
+		for (Map.Entry<String, Map<String,String>> entry : _alertsMap.entrySet()) {
+			Map<String,String> fields = entry.getValue();
+			//TODO: test the fields for null and fail if needed
+			String expression = fields.get(AlertParser.EXPRESSION_NAME);
+			String comparator = fields.get(AlertParser.COMPARATOR_NAME);
+			Tuple<String> constant = Tuple.fromString(fields.get(AlertParser.CONSTANT_NAME));
+
+			result.add(new Alert(entry.getKey(), expression, comparator, constant));
+		}
+		return result;
+	}
+
+  public void updateCache(HealthDataCache cache)
+  {
+    _cache = cache;
+    Alerts alertsRecord = _cache.getAlerts();
+    if (alertsRecord != null) 
+    {
+      _alertsMap = alertsRecord.getMapFields();
+    }
+    else 
+    {
+      _alertsMap = new HashMap<String, Map<String,String>>();
+    }
+  }
+  
+  /**
+   * @return the in-memory alerts map itself, not a copy
+   */
+  public Map<String, Map<String,String>> getAlertsMap()
+  {
+    return this._alertsMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java
new file mode 100644
index 0000000..e46415c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+import org.apache.helix.HelixException;
+
+
+/**
+ * Aggregator that blends a new value into the current one with a decay
+ * weight: {@code merged = (1 - weight) * current + weight * new}.
+ */
+public class DecayAggregator extends Aggregator {
+
+	// overwritten by every merge() call with the weight parsed from args[0]
+	double _decayWeight;
+
+	/**
+	 * @param weight initial decay weight
+	 */
+	// NOTE(review): unlike the no-arg constructor this one does not set
+	// _numArgs, although merge() needs the weight argument either way - confirm
+	public DecayAggregator(double weight)
+	{
+		_decayWeight = weight;
+	}
+
+	/**
+	 * Creates an aggregator with {@code _numArgs} set to 1.
+	 */
+	public DecayAggregator()
+	{
+		_numArgs = 1;
+	}
+
+	/**
+	 * Merges a new value/time pair into the current one, in place.
+	 * <p>
+	 * When the new time is later than the current time, the current value
+	 * becomes the weighted blend and the current time becomes the new time;
+	 * otherwise both stay as they are. Empty current tuples are treated as
+	 * value 0 at time -1.
+	 *
+	 * @param currValTup current value; replaced with the merged value
+	 * @param newValTup new value; its first element is read
+	 * @param currTimeTup current time; replaced with the merged time
+	 * @param newTimeTup new time; its first element is read
+	 * @param args {@code args[0]} is the decay weight, parsed as a double
+	 * @throws HelixException if the weight argument is missing, a tuple is
+	 *           null, or a new tuple is empty
+	 */
+	@Override
+	public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
+			Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
+
+		// fail with a clear message instead of an ArrayIndexOutOfBoundsException
+		if (args == null || args.length < 1) {
+			throw new HelixException("Decay weight argument is required");
+		}
+		_decayWeight = Double.parseDouble(args[0]);
+
+		double currVal = 0;
+		double currTime = -1;
+		double newVal;
+		double newTime;
+		double mergedVal;
+		double mergedTime;
+
+		if (currValTup == null || newValTup == null || currTimeTup == null ||
+				newTimeTup == null) {
+			throw new HelixException("Tuples cannot be null");
+		}
+		// the first element of both new tuples is read below
+		if (newValTup.size() == 0 || newTimeTup.size() == 0) {
+			throw new HelixException("New value and time tuples cannot be empty");
+		}
+
+		//old tuples may be empty, indicating no value/time exist
+		if (currValTup.size() > 0 && currTimeTup.size() > 0) {
+			currVal = Double.parseDouble(currValTup.iterator().next());
+			currTime = Double.parseDouble(currTimeTup.iterator().next());
+		}
+		newVal = Double.parseDouble(newValTup.iterator().next());
+		newTime = Double.parseDouble(newTimeTup.iterator().next());
+
+		if (newTime > currTime) { //if old doesn't exist, we end up here
+			mergedVal = (1-_decayWeight)*currVal+_decayWeight*newVal; //if old doesn't exist, it has value "0"
+			mergedTime = newTime;
+		}
+		else {
+			mergedVal = currVal;
+			mergedTime = currTime;
+		}
+
+		currValTup.clear();
+		currValTup.add(Double.toString(mergedVal));
+		currTimeTup.clear();
+		currTimeTup.add(Double.toString(mergedTime));
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java
new file mode 100644
index 0000000..d7fa0b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Operator configured for exactly two input tuple lists and one output tuple
+ * list. {@link #execute(List)} is still a stub.
+ */
+public class DivideOperator extends Operator {
+
+	/**
+	 * Sets the input and output tuple-list counts for this operator.
+	 */
+	public DivideOperator() {
+		this.numOutputTupleLists = 1;
+		this.inputOutputTupleListsCountsEqual = false;
+		this.minInputTupleLists = 2;
+		this.maxInputTupleLists = 2;
+	}
+
+	/**
+	 * Not implemented yet.
+	 *
+	 * @param input the input tuple lists (ignored)
+	 * @return always {@code null}
+	 */
+	@Override
+	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java
new file mode 100644
index 0000000..82429f3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Operator that accepts one or more input tuple lists and hands them back
+ * unchanged.
+ */
+public class ExpandOperator extends Operator {
+
+	/**
+	 * Configures the operator for 1 to {@code Integer.MAX_VALUE} inputs with
+	 * equal input and output counts.
+	 */
+	public ExpandOperator() {
+		this.inputOutputTupleListsCountsEqual = true;
+		this.minInputTupleLists = 1;
+		this.maxInputTupleLists = Integer.MAX_VALUE;
+	}
+
+	/**
+	 * @param input the input tuple lists
+	 * @return the same list instance that was passed in
+	 */
+	@Override
+	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+		//TODO: confirm this is a no-op operator
+		return input;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java
new file mode 100644
index 0000000..e722195
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+/**
+ * Names of the expression operators, each carrying a flag that
+ * {@link #isBaseOp()} reports.
+ */
+public enum ExpressionOperatorType {
+	//each
+	EACH(true),
+
+	//standard math
+	SUM(false),
+	MULTIPLY(false),
+	SUBTRACT(false),
+	DIVIDE(false),
+
+	//aggregation types
+	ACCUMULATE(true),
+	DECAY(false),
+	WINDOW(false);
+
+	boolean isBase;
+
+	/**
+	 * @param base the flag stored for the constant
+	 */
+	ExpressionOperatorType(boolean base)
+	{
+		isBase = base;
+	}
+
+	/**
+	 * @return the flag this constant was declared with
+	 */
+	boolean isBaseOp()
+	{
+		return this.isBase;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
new file mode 100644
index 0000000..10e5a0c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
@@ -0,0 +1,579 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+
+
+public class ExpressionParser
+{
+  // Never reassigned, so declared final.
+  private static final Logger logger = Logger.getLogger(ExpressionParser.class);
+
+  // Separates the aggregator/stat part of an expression from its operators,
+  // and the operators from each other.
+  final static String opDelim = "|";
+  // opDelim escaped for String.split, which interprets its argument as a regex.
+  final static String opDelimForSplit = "\\|";
+  // Separates aggregator arguments, and stat names inside the stat parentheses.
+  final static String argDelim = ",";
+  // Separates the fields of a stat name.
+  final public static String statFieldDelim = ".";
+  // Wildcard marker inside a stat name.
+  final static String wildcardChar = "*";
+
+  // Registries of the known operators and aggregators, keyed by label. They are
+  // filled by the static initializer below; every label registered there is
+  // upper-case.
+  static Map<String, Operator> operatorMap = new HashMap<String, Operator>();
+  static Map<String, Aggregator> aggregatorMap = new HashMap<String, Aggregator>();
+
+  static
+  {
+    addOperatorEntry("EXPAND", new ExpandOperator());
+    addOperatorEntry("DIVIDE", new DivideOperator());
+    addOperatorEntry("SUM", new SumOperator());
+    addOperatorEntry("SUMEACH", new SumEachOperator());
+
+    addAggregatorEntry("ACCUMULATE", new AccumulateAggregator());
+    addAggregatorEntry("DECAY", new DecayAggregator());
+    addAggregatorEntry("WINDOW", new WindowAggregator());
+  }
+
+  // static Pattern pattern = Pattern.compile("(\\{.+?\\})");
+
+  /**
+   * Registers an operator under the given label unless that label is already
+   * taken. The log line is written in either case.
+   *
+   * @param label operator name; stored upper-cased
+   * @param op operator implementation
+   */
+  private static void addOperatorEntry(String label, Operator op)
+  {
+    // getOperators() looks operators up with op.toUpperCase(), and
+    // addAggregatorEntry() stores upper-cased keys; normalise here as well so a
+    // lower-case label could never produce an unreachable entry.
+    String key = label.toUpperCase();
+    if (!operatorMap.containsKey(key))
+    {
+      operatorMap.put(key, op);
+    }
+    logger.info("Adding operator: " + op);
+  }
+
+  private static void addAggregatorEntry(String label, Aggregator agg)
+  {
+    if (!aggregatorMap.containsKey(label.toUpperCase()))
+    {
+      aggregatorMap.put(label.toUpperCase(), agg);
+    }
+    logger.info("Adding aggregator: " + agg);
+  }
+
+  /*
+   * private static void addEntry(String label, ExpressionOperatorType type) {
+   * if (!operatorMap.containsKey(label)) { operatorMap.put(label, type); }
+   * logger.info("Adding operator type: "+type); }
+   */
+
+  /**
+   * @param expression expression text to inspect
+   * @return true if the expression contains at least one opening parenthesis
+   */
+  public static boolean isExpressionNested(String expression)
+  {
+    final int firstOpenParen = expression.indexOf("(");
+    return firstOpenParen >= 0;
+  }
+
+  /*
+   * public static Operator getOperatorType(String expression) throws Exception
+   * { String op = expression.substring(0,expression.indexOf("(")); if
+   * (!operatorMap.containsKey(op)) { throw new
+   * Exception(op+" is not a valid op type"); } return operatorMap.get(op); }
+   */
+
+  /**
+   * Returns the text between the first opening parenthesis and the last
+   * closing parenthesis of the expression.
+   *
+   * @param expression expression containing a parenthesised part
+   * @return the text strictly between the first "(" and the last ")"
+   */
+  public static String getInnerExpression(String expression)
+  {
+    final int innerStart = expression.indexOf("(") + 1;
+    final int innerEnd = expression.lastIndexOf(")");
+    return expression.substring(innerStart, innerEnd);
+  }
+
+  /*
+   * public static String[] getBaseStats(ExpressionOperatorType type, String
+   * expression) throws Exception { String[] items = null; if
+   * (isExpressionNested(expression)) { ExpressionOperatorType nextType =
+   * getOperatorType(expression); String innerExp =
+   * getInnerExpression(expression); items = getBaseStats(nextType, innerExp); }
+   * else { //base class, no nesting items = expression.split(","); }
+   * 
+   * if (type != null && type.isBaseOp()) { //surround items with type. for (int
+   * i=0; i<items.length; i++) { items[i] = type + "(" + items[i] + ")"; //!!!!
+   * NEED type to behave like string here
+   * logger.debug("Forming item "+items[i]); } } return items; }
+   * 
+   * public static String[] getBaseStats(String expression) throws Exception {
+   * expression = expression.replaceAll("\\s+", ""); return getBaseStats(null,
+   * expression); }
+   */
+
+  /*
+   * Validate 2 sets of parenthesis exist, all before first opDelim
+   * 
+   * extract agg type and validate it exists. validate number of args passed in
+   */
+  public static void validateAggregatorFormat(String expression)
+      throws HelixException
+  {
+    logger.debug("validating aggregator for expression: " + expression);
+    // have 0 or more args, 1 or more stats...e.g. ()(x) or (2)(x,y)
+    Pattern pattern = Pattern.compile("\\(.*?\\)");
+    Matcher matcher = pattern.matcher(expression);
+    String aggComponent = null;
+    String statComponent = null;
+    int lastMatchEnd = -1;
+    if (matcher.find())
+    {
+      aggComponent = matcher.group();
+      aggComponent = aggComponent.substring(1, aggComponent.length() - 1);
+      if (aggComponent.contains(")") || aggComponent.contains("("))
+      {
+        throw new HelixException(expression
+            + " has invalid aggregate component");
+      }
+    }
+    else
+    {
+      throw new HelixException(expression + " has invalid aggregate component");
+    }
+    if (matcher.find())
+    {
+      statComponent = matcher.group();
+      statComponent = statComponent.substring(1, statComponent.length() - 1);
+      // statComponent must have at least 1 arg between paren
+      if (statComponent.contains(")") || statComponent.contains("(")
+          || statComponent.length() == 0)
+      {
+        throw new HelixException(expression + " has invalid stat component");
+      }
+      lastMatchEnd = matcher.end();
+    }
+    else
+    {
+      throw new HelixException(expression + " has invalid stat component");
+    }
+    if (matcher.find())
+    {
+      throw new HelixException(expression
+          + " has too many parenthesis components");
+    }
+
+    if (expression.length() >= lastMatchEnd + 1)
+    { // lastMatchEnd is pos 1 past the pattern. check if there are paren there
+      if (expression.substring(lastMatchEnd).contains("(")
+          || expression.substring(lastMatchEnd).contains(")"))
+      {
+        throw new HelixException(expression + " has extra parenthesis");
+      }
+    }
+
+    // check wildcard locations. each part can have at most 1 wildcard, and must
+    // be at end
+    // String expStatNamePart = expression.substring(expression.)
+    StringTokenizer fieldTok = new StringTokenizer(statComponent,
+        statFieldDelim);
+    while (fieldTok.hasMoreTokens())
+    {
+      String currTok = fieldTok.nextToken();
+      if (currTok.contains(wildcardChar))
+      {
+        if (currTok.indexOf(wildcardChar) != currTok.length() - 1
+            || currTok.lastIndexOf(wildcardChar) != currTok.length() - 1)
+        {
+          throw new HelixException(currTok
+              + " is illegal stat name.  Single wildcard must appear at end.");
+        }
+      }
+    }
+  }
+
+  /**
+   * @param stat stat name or stat expression
+   * @return true if the wildcard character occurs anywhere in {@code stat}
+   */
+  public static boolean statContainsWildcards(String stat)
+  {
+    final int firstWildcard = stat.indexOf(wildcardChar);
+    return firstWildcard != -1;
+  }
+
+  /*
+   * Return true if stat name matches exactly...incomingStat has no agg type
+   * currentStat can have any
+   * 
+   * Function can match for 2 cases extractStatFromAgg=false. Match
+   * accumulate()(dbFoo.partition10.latency) with
+   * accumulate()(dbFoo.partition10.latency)...trival extractStatFromAgg=true.
+   * Match accumulate()(dbFoo.partition10.latency) with
+   * dbFoo.partition10.latency
+   */
+  public static boolean isExactMatch(String currentStat, String incomingStat,
+      boolean extractStatFromAgg)
+  {
+    String currentStatName = currentStat;
+    if (extractStatFromAgg)
+    {
+      currentStatName = getSingleAggregatorStat(currentStat);
+    }
+    return (incomingStat.equals(currentStatName));
+  }
+
+  /**
+   * Returns true if incomingStat matches currentStat, where currentStat's stat
+   * name contains one or more "*" wildcards. A currentStat without any
+   * wildcard never matches here (use isExactMatch for that).
+   * <p>
+   * Each "*" stands for any run of characters, including none and including
+   * the field separator ".", so a*.c* matches a5.c7 and also a5.b6.c7. All
+   * other characters of the stat name are matched literally.
+   * <p>
+   * statCompareOnly=false: the text before the first ")" of both arguments
+   * (aggregator name and arguments) must also be equal, e.g.
+   * accumulate()(dbFoo.partition*.latency) with
+   * accumulate()(dbFoo.partition10.latency).
+   * <p>
+   * statCompareOnly=true: only the stat names are compared, e.g.
+   * accumulate()(dbFoo.partition*.latency) with dbFoo.partition10.latency.
+   *
+   * @param currentStat stat expression that may contain wildcards
+   * @param incomingStat stat (with or without aggregator) to test
+   * @param statCompareOnly if false, the aggregator parts must match as well
+   * @param bindings if non-null and the stats match, the full incoming stat
+   *          name is appended to this list
+   * @return true if incomingStat is matched by the wildcard stat
+   * @throws HelixException if either argument contains more than one stat
+   */
+  public static boolean isWildcardMatch(String currentStat,
+      String incomingStat, boolean statCompareOnly, ArrayList<String> bindings)
+  {
+    if (!statCompareOnly)
+    { // need to check for match on agg type and stat
+      String currentStatAggType = (currentStat.split("\\)"))[0];
+      String incomingStatAggType = (incomingStat.split("\\)"))[0];
+      if (!currentStatAggType.equals(incomingStatAggType))
+      {
+        return false;
+      }
+    }
+    // now just get the stats
+    String currentStatName = getSingleAggregatorStat(currentStat);
+    String incomingStatName = getSingleAggregatorStat(incomingStat);
+
+    if (!currentStatName.contains(wildcardChar))
+    { // no wildcards in stat name
+      return false;
+    }
+
+    // Translate the wildcard stat into a regex. Every literal stretch is passed
+    // through Pattern.quote so that regex metacharacters in a stat name (for
+    // example "(", "[" or "+") are matched as plain text instead of breaking
+    // the pattern or silently changing its meaning.
+    StringBuilder regex = new StringBuilder();
+    int literalStart = 0;
+    int wildcardLoc = currentStatName.indexOf(wildcardChar);
+    while (wildcardLoc >= 0)
+    {
+      regex.append(Pattern.quote(currentStatName.substring(literalStart,
+          wildcardLoc)));
+      regex.append(".*");
+      literalStart = wildcardLoc + wildcardChar.length();
+      wildcardLoc = currentStatName.indexOf(wildcardChar, literalStart);
+    }
+    regex.append(Pattern.quote(currentStatName.substring(literalStart)));
+
+    boolean result = Pattern.matches(regex.toString(), incomingStatName);
+    if (result && bindings != null)
+    {
+      bindings.add(incomingStatName);
+    }
+    return result;
+  }
+
+  /**
+   * Checks whether an incoming stat (no agg type defined) exactly matches a
+   * persisted stat (with agg type defined).
+   *
+   * @param currentStat persisted stat, including its aggregator
+   * @param incomingStat bare incoming stat name
+   * @return true if the stat inside currentStat equals incomingStat
+   */
+  public static boolean isIncomingStatExactMatch(String currentStat,
+      String incomingStat)
+  {
+    // the persisted stat carries an aggregator that has to be stripped first
+    final boolean stripAggregator = true;
+    return isExactMatch(currentStat, incomingStat, stripAggregator);
+  }
+
+  /**
+   * Checks whether an incoming stat (no agg type defined) wildcard matches a
+   * persisted stat (with agg type defined). The persisted stat may have
+   * wildcards.
+   *
+   * @param currentStat persisted stat, including its aggregator
+   * @param incomingStat bare incoming stat name
+   * @return true if incomingStat is matched by the wildcarded persisted stat
+   */
+  public static boolean isIncomingStatWildcardMatch(String currentStat,
+      String incomingStat)
+  {
+    // compare stat names only, and collect no bindings
+    final boolean statNamesOnly = true;
+    final ArrayList<String> noBindings = null;
+    return isWildcardMatch(currentStat, incomingStat, statNamesOnly, noBindings);
+  }
+
+  /**
+   * Checks whether a persisted stat matches a stat defined in an alert. Both
+   * strings are compared as given, aggregator included.
+   *
+   * @param alertStat stat as written in the alert
+   * @param currentStat persisted stat
+   * @return true if the two strings are equal
+   */
+  public static boolean isAlertStatExactMatch(String alertStat,
+      String currentStat)
+  {
+    // both sides already carry their aggregator, so nothing is stripped
+    final boolean stripAggregator = false;
+    return isExactMatch(alertStat, currentStat, stripAggregator);
+  }
+
+  /**
+   * Checks whether a maintained stat wildcard matches a stat defined in an
+   * alert. The alert may have wildcards.
+   *
+   * @param alertStat stat as written in the alert, possibly with wildcards
+   * @param currentStat maintained stat
+   * @param wildcardBindings list handed through to isWildcardMatch as its
+   *          bindings argument; may be null
+   * @return true if the aggregator parts agree and the stat names match
+   */
+  public static boolean isAlertStatWildcardMatch(String alertStat,
+      String currentStat, ArrayList<String> wildcardBindings)
+  {
+    // the aggregator part must agree too, so this is not a stat-only compare
+    final boolean statNamesOnly = false;
+    return isWildcardMatch(alertStat, currentStat, statNamesOnly,
+        wildcardBindings);
+  }
+
+  /**
+   * Looks up a registered aggregator by name, ignoring case.
+   *
+   * @param aggStr aggregator name
+   * @return the registered aggregator
+   * @throws HelixException if no aggregator is registered under that name
+   */
+  public static Aggregator getAggregator(String aggStr) throws HelixException
+  {
+    final String key = aggStr.toUpperCase();
+    final Aggregator found = aggregatorMap.get(key);
+    if (found != null)
+    {
+      return found;
+    }
+    throw new HelixException("Unknown aggregator type " + key);
+  }
+
+  /**
+   * Extracts the aggregator name, i.e. everything before the first "(".
+   *
+   * @param expression aggregator expression
+   * @return the aggregator name exactly as written in the expression
+   * @throws HelixException if the expression has no "(" or the name is not a
+   *           registered aggregator (checked ignoring case)
+   */
+  public static String getAggregatorStr(String expression)
+      throws HelixException
+  {
+    final int firstOpenParen = expression.indexOf("(");
+    if (firstOpenParen < 0)
+    {
+      throw new HelixException(expression
+          + " does not contain a valid aggregator.  No parentheses found");
+    }
+    final String aggName = expression.substring(0, firstOpenParen);
+    final boolean known = aggregatorMap.containsKey(aggName.toUpperCase());
+    if (!known)
+    {
+      throw new HelixException("aggregator <" + aggName + "> is unknown type");
+    }
+    return aggName;
+  }
+
+  /**
+   * Extracts the aggregator arguments (the comma-separated contents of the
+   * first parenthesis pair) and verifies that their number is the one the
+   * aggregator requires.
+   * <p>
+   * Note: for an empty argument list the returned array is the result of
+   * splitting the empty string, i.e. it holds a single empty string rather
+   * than being empty. The count check treats that case as zero arguments.
+   *
+   * @param expression aggregator expression
+   * @return the arguments as split on ","
+   * @throws HelixException if the aggregator is unknown or the number of
+   *           arguments does not match its requirement
+   */
+  public static String[] getAggregatorArgs(String expression)
+      throws HelixException
+  {
+    String aggregator = getAggregatorStr(expression);
+    String argsStr = getAggregatorArgsStr(expression);
+    String[] args = argsStr.split(argDelim);
+    logger.debug("args size: " + args.length);
+    // "".split(",") yields one empty element, so count an empty list as 0
+    int numArgs = (argsStr.length() == 0) ? 0 : args.length;
+    // verify correct number of args
+    int requiredNumArgs = aggregatorMap.get(aggregator.toUpperCase())
+        .getRequiredNumArgs();
+    if (numArgs != requiredNumArgs)
+    {
+      // report the effective count (numArgs), not args.length, which would
+      // claim 1 argument for an empty list
+      throw new HelixException(expression + " contains " + numArgs
+          + " arguments, but requires " + requiredNumArgs);
+    }
+    return args;
+  }
+
+  /*
+   * public static String[] getAggregatorArgsList(String expression) { String
+   * argsStr = getAggregatorArgsStr(expression); String[] args =
+   * argsStr.split(argDelim); return args; }
+   */
+
+  /**
+   * Returns the raw text of the first parenthesis pair, i.e. everything
+   * between the first "(" and the first ")".
+   *
+   * @param expression aggregator expression
+   * @return the unsplit argument text, possibly empty
+   */
+  public static String getAggregatorArgsStr(String expression)
+  {
+    final int argsStart = expression.indexOf("(") + 1;
+    final int argsEnd = expression.indexOf(")");
+    return expression.substring(argsStart, argsEnd);
+  }
+
+  /**
+   * Returns the stat names of an expression. If the expression contains both
+   * "(" and ")", the stats are the comma-separated contents between the last
+   * "(" and the last ")"; otherwise the whole expression is split on ",".
+   *
+   * @param expression aggregator expression or bare stat list
+   * @return the stat names
+   * @throws HelixException if splitting yields no stats at all
+   */
+  public static String[] getAggregatorStats(String expression)
+      throws HelixException
+  {
+    final boolean hasParens = expression.contains("(")
+        && expression.contains(")");
+    final String justStats;
+    if (hasParens)
+    {
+      final int statsStart = expression.lastIndexOf("(") + 1;
+      final int statsEnd = expression.lastIndexOf(")");
+      justStats = expression.substring(statsStart, statsEnd);
+    }
+    else
+    {
+      justStats = expression;
+    }
+
+    final String[] statList = justStats.split(argDelim);
+    if (statList.length >= 1)
+    {
+      return statList;
+    }
+    throw new HelixException(expression
+        + " does not contain any aggregator stats");
+  }
+
+  /**
+   * Returns the stat of an expression that is expected to name exactly one.
+   *
+   * @param expression aggregator expression or bare stat name
+   * @return the first (and only) stat
+   * @throws HelixException if the expression contains more than one stat
+   */
+  public static String getSingleAggregatorStat(String expression)
+      throws HelixException
+  {
+    final String[] stats = getAggregatorStats(expression);
+    final boolean single = stats.length <= 1;
+    if (!single)
+    {
+      throw new HelixException(expression + " contains more than 1 stat");
+    }
+    return stats[0];
+  }
+
+  /**
+   * Builds a concrete stat expression from a wildcard one by keeping
+   * everything of wildcardStat up to and including its last "(" and replacing
+   * the rest with fixedStat followed by ")".
+   *
+   * @param wildcardStat stat expression whose last parenthesis pair is replaced
+   * @param fixedStat stat name to place inside the last parenthesis pair
+   * @return the substituted stat expression
+   */
+  public static String getWildcardStatSubstitution(String wildcardStat,
+      String fixedStat)
+  {
+    // only the last "(" matters; the position of the closing parenthesis was
+    // computed but never used, so it is no longer looked up
+    int lastOpenParenLoc = wildcardStat.lastIndexOf("(");
+    StringBuilder builder = new StringBuilder();
+    builder.append(wildcardStat.substring(0, lastOpenParenLoc + 1));
+    builder.append(fixedStat);
+    builder.append(")");
+    logger.debug("wildcardStat: " + wildcardStat);
+    logger.debug("fixedStat: " + fixedStat);
+    logger.debug("subbedStat: " + builder.toString());
+    return builder.toString();
+  }
+
+  // XXX: each op type should have number of inputs, number of outputs. do
+  // validation.
+  // (dbFoo.partition*.latency, dbFoo.partition*.count)|EACH|ACCUMULATE|DIVIDE
+  /**
+   * Expands an aggregator expression over several stats into one expression
+   * per stat, each of the form aggName(aggArgs)(stat). All whitespace is
+   * removed from the expression first, and the expression is validated.
+   *
+   * @param expression aggregator expression, e.g. window(5)(a, b)
+   * @return one single-stat expression per stat, in the original order
+   * @throws HelixException if the expression is malformed, names an unknown
+   *           aggregator, or has the wrong number of aggregator arguments
+   */
+  public static String[] getBaseStats(String expression) throws HelixException
+  {
+    final String compact = expression.replaceAll("\\s+", "");
+    validateAggregatorFormat(compact);
+
+    final String aggName = getAggregatorStr(compact);
+    // called only for its argument-count validation; the result is not needed
+    getAggregatorArgs(compact);
+    final String[] aggStats = getAggregatorStats(compact);
+
+    // the part shared by every generated stat: name, raw args, opening paren
+    final String aggArgList = getAggregatorArgsStr(compact);
+    final String prefix = aggName + "(" + aggArgList + ")" + "(";
+
+    final String[] baseStats = new String[aggStats.length];
+    int idx = 0;
+    for (String aggStat : aggStats)
+    {
+      baseStats[idx++] = prefix + aggStat + ")";
+    }
+    return baseStats;
+  }
+
+  /**
+   * Extracts and validates the operator chain of an expression, i.e. the
+   * "|"-separated names following the first "|".
+   * <p>
+   * Validation walks the chain starting with as many tuple lists as the
+   * expression has stats: each operator must be registered and must accept
+   * the current number of tuple lists, and the chain must end with exactly
+   * one tuple list.
+   *
+   * @param expression full alert expression
+   * @return the operator names as written, or null if the expression has no
+   *         operator section
+   * @throws HelixException if an operator is unknown, cannot process the
+   *           number of tuple lists it receives, or the chain does not end in
+   *           a single tuple list
+   */
+  public static String[] getOperators(String expression) throws HelixException
+  {
+    final int numAggStats = getAggregatorStats(expression).length;
+    final int opDelimLoc = expression.indexOf(opDelim);
+    if (opDelimLoc < 0)
+    {
+      return null;
+    }
+    final String opsStr = expression.substring(opDelimLoc + 1);
+    logger.debug("ops str: " + opsStr);
+    final String[] ops = opsStr.split(opDelimForSplit);
+
+    // validate this string of ops
+    // verify each op exists
+    // take num input tuples sets and verify ops will output exactly 1 tuple
+    // sets
+    int currNumTuples = numAggStats;
+    for (int i = 0; i < ops.length; i++)
+    {
+      final String op = ops[i];
+      logger.debug("op: " + op);
+      final Operator currOpType = operatorMap.get(op.toUpperCase());
+      if (currOpType == null)
+      {
+        throw new HelixException("<" + op + "> is not a valid operator type");
+      }
+      final boolean accepted = currNumTuples >= currOpType.minInputTupleLists
+          && currNumTuples <= currOpType.maxInputTupleLists;
+      if (!accepted)
+      {
+        throw new HelixException("<" + op + "> cannot process " + currNumTuples
+            + " input tuples");
+      }
+      // an op that keeps input and output counts equal leaves the number
+      // unchanged; any other op resets it to its fixed output size
+      if (!currOpType.inputOutputTupleListsCountsEqual)
+      {
+        currNumTuples = currOpType.numOutputTupleLists;
+      }
+    }
+    if (currNumTuples == 1)
+    {
+      return ops;
+    }
+    throw new HelixException(expression
+        + " does not terminate in a single tuple set");
+  }
+
+  /**
+   * Validates the operator section of an expression by running getOperators
+   * and discarding its result.
+   *
+   * @param expression full alert expression
+   * @throws HelixException if getOperators rejects the expression
+   */
+  public static void validateOperators(String expression) throws HelixException
+  {
+    // called for its validation side effect only; the returned array is unused
+    getOperators(expression);
+  }
+
+  /**
+   * Looks up a registered operator by name, ignoring case.
+   *
+   * @param opName operator name, e.g. "SUM" or "sum"
+   * @return the registered operator
+   * @throws HelixException if no operator is registered under that name
+   */
+  public static Operator getOperator(String opName) throws HelixException
+  {
+    // Operators are registered under upper-case labels and getOperators()
+    // validates names with toUpperCase(); normalise here too so that a name
+    // accepted by validation can also be resolved.
+    Operator op = operatorMap.get(opName.toUpperCase());
+    if (op == null)
+    {
+      throw new HelixException(opName + " is unknown op type");
+    }
+    return op;
+  }
+
+  /**
+   * Validates a complete expression: first the aggregator/stat part, then the
+   * operator part.
+   *
+   * @param expression full alert expression
+   * @throws HelixException if either validation step rejects the expression
+   */
+  public static void validateExpression(String expression)
+      throws HelixException
+  {
+    // 1. extract stats part and validate
+    validateAggregatorFormat(expression);
+    // 2. extract ops part and validate the ops exist and the inputs/outputs are
+    // correct
+    validateOperators(expression);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
new file mode 100644
index 0000000..72be10b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+/**
+ * Alert comparator that fires when some value on the left is strictly greater
+ * than some value on the right. Tuple elements are parsed as doubles.
+ */
+public class GreaterAlertComparator extends AlertComparator {
+
+	/**
+	 * Returns true if any element in the left tuple exceeds any element in the
+	 * right tuple.
+	 *
+	 * @param leftTup tuple of numeric strings
+	 * @param rightTup tuple of numeric strings
+	 * @return true as soon as one left/right pair satisfies left &gt; right,
+	 *         false if no pair does
+	 */
+	@Override
+	public boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup) {
+		for (Iterator<String> lefts = leftTup.iterator(); lefts.hasNext();) {
+			final double candidate = Double.parseDouble(lefts.next());
+			// the right tuple is walked afresh for every left value
+			for (Iterator<String> rights = rightTup.iterator(); rights.hasNext();) {
+				final double bound = Double.parseDouble(rights.next());
+				if (candidate > bound) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
new file mode 100644
index 0000000..0cd9623
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Operator that combines any number of tuple lists (at least one) into a
+ * single tuple list by multiplying them row by row.
+ */
+public class MultiplyOperator extends Operator {
+
+	public MultiplyOperator() {
+		minInputTupleLists = 1;
+		maxInputTupleLists = Integer.MAX_VALUE;
+		inputOutputTupleListsCountsEqual = false;
+		numOutputTupleLists = 1;
+	}
+
+	/**
+	 * Wraps a single list of tuples in the list-of-iterators shape that
+	 * {@link #execute(List)} returns.
+	 *
+	 * @param input tuples to expose
+	 * @return a one-element list holding an iterator over {@code input}
+	 */
+	public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) 
+	{
+		// fully parameterized instead of raw List/ArrayList, so the compiler
+		// checks the element type and no unchecked conversion happens on return
+		List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
+		out.add(input.iterator());
+		return out;
+	}
+
+	/**
+	 * Multiplies the input tuple lists row by row. A row is formed by taking
+	 * the next tuple from every input iterator, in list order, and folding
+	 * them together with multiplyTuples, starting from null.
+	 * <p>
+	 * Processing stops as soon as any iterator is exhausted. A row that is
+	 * only partly assembled at that point is dropped, although the iterators
+	 * read earlier in that row have already advanced.
+	 *
+	 * @param input iterators over the tuple lists to multiply; may be null or
+	 *          empty, which yields an empty result
+	 * @return a one-element list holding an iterator over the product rows
+	 */
+	@Override
+	public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+		ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
+		if (input == null || input.size() == 0) {
+			return singleSetToIter(output);
+		}
+		while (true) { //loop through set of iters, return when 1 runs out (not completing the row in progress)
+			Tuple<String> rowProduct = null;
+			for (Iterator<Tuple<String>> it : input) {
+				if (!it.hasNext()) { //when any iterator runs out, we are done
+					return singleSetToIter(output);
+				}
+				// NOTE(review): multiplyTuples is not declared in this class;
+				// presumably inherited from Operator and tolerant of a null
+				// left operand for the first factor -- confirm.
+				rowProduct = multiplyTuples(rowProduct, it.next());
+			}
+			output.add(rowProduct);
+		}
+	}
+
+}