You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[6/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
new file mode 100644
index 0000000..b17bc87
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -0,0 +1,364 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Generic Record Format to store data at a Node This can be used to store
+ * simpleFields mapFields listFields
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ZNRecord
+{
+ static Logger _logger = Logger.getLogger(ZNRecord.class);
+ private final String id;
+
+ @JsonIgnore(true)
+ public static final String LIST_FIELD_BOUND = "listField.bound";
+
+ @JsonIgnore(true)
+ public static final int SIZE_LIMIT = 1000 * 1024; // leave a margin out of 1M
+
+ // We don't want the _deltaList to be serialized and deserialized
+ private List<ZNRecordDelta> _deltaList = new ArrayList<ZNRecordDelta>();
+
+ private Map<String, String> simpleFields;
+ private Map<String, Map<String, String>> mapFields;
+ private Map<String, List<String>> listFields;
+
+ // the version field of zookeeper Stat
+ private int _version;
+
+ private long _creationTime;
+
+ private long _modifiedTime;
+
+ @JsonCreator
+ public ZNRecord(@JsonProperty("id") String id)
+ {
+ this.id = id;
+ simpleFields = new TreeMap<String, String>();
+ mapFields = new TreeMap<String, Map<String, String>>();
+ listFields = new TreeMap<String, List<String>>();
+ }
+
+ public ZNRecord(ZNRecord record)
+ {
+ this(record, record.getId());
+ }
+
+ public ZNRecord(ZNRecord record, String id)
+ {
+ this(id);
+ simpleFields.putAll(record.getSimpleFields());
+ mapFields.putAll(record.getMapFields());
+ listFields.putAll(record.getListFields());
+ _version = record.getVersion();
+ _creationTime = record.getCreationTime();
+ _modifiedTime = record.getModifiedTime();
+ }
+
+ public ZNRecord(ZNRecord record, int version)
+ {
+ this(record);
+ _version = version;
+ }
+
+ @JsonIgnore(true)
+ public void setDeltaList(List<ZNRecordDelta> deltaList)
+ {
+ _deltaList = deltaList;
+ }
+
+ @JsonIgnore(true)
+ public List<ZNRecordDelta> getDeltaList()
+ {
+ return _deltaList;
+ }
+
+ @JsonProperty
+ public Map<String, String> getSimpleFields()
+ {
+ return simpleFields;
+ }
+
+ @JsonProperty
+ public void setSimpleFields(Map<String, String> simpleFields)
+ {
+ this.simpleFields = simpleFields;
+ }
+
+ @JsonProperty
+ public Map<String, Map<String, String>> getMapFields()
+ {
+ return mapFields;
+ }
+
+ @JsonProperty
+ public void setMapFields(Map<String, Map<String, String>> mapFields)
+ {
+ this.mapFields = mapFields;
+ }
+
+ @JsonProperty
+ public Map<String, List<String>> getListFields()
+ {
+ return listFields;
+ }
+
+ @JsonProperty
+ public void setListFields(Map<String, List<String>> listFields)
+ {
+ this.listFields = listFields;
+ }
+
+ @JsonProperty
+ public void setSimpleField(String k, String v)
+ {
+ simpleFields.put(k, v);
+ }
+
+ @JsonProperty
+ public String getId()
+ {
+ return id;
+ }
+
+ public void setMapField(String k, Map<String, String> v)
+ {
+ mapFields.put(k, v);
+ }
+
+ public void setListField(String k, List<String> v)
+ {
+ listFields.put(k, v);
+ }
+
+ public String getSimpleField(String k)
+ {
+ return simpleFields.get(k);
+ }
+
+ public Map<String, String> getMapField(String k)
+ {
+ return mapFields.get(k);
+ }
+
+ public List<String> getListField(String k)
+ {
+ return listFields.get(k);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(id + ", ");
+ if (simpleFields != null)
+ {
+ sb.append(simpleFields);
+ }
+ if (mapFields != null)
+ {
+ sb.append(mapFields);
+ }
+ if (listFields != null)
+ {
+ sb.append(listFields);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * merge functionality is used to merge multiple znrecord into a single one.
+ * This will make use of the id of each ZNRecord and append it to every key
+ * thus making key unique. This is needed to optimize on the watches.
+ *
+ * @param record
+ */
+ public void merge(ZNRecord record)
+ {
+ if (record == null)
+ {
+ return;
+ }
+
+ if (record.getDeltaList().size() > 0)
+ {
+ _logger.info("Merging with delta list, recordId = " + id + " other:"
+ + record.getId());
+ merge(record.getDeltaList());
+ return;
+ }
+ simpleFields.putAll(record.simpleFields);
+ for (String key : record.mapFields.keySet())
+ {
+ Map<String, String> map = mapFields.get(key);
+ if (map != null)
+ {
+ map.putAll(record.mapFields.get(key));
+ } else
+ {
+ mapFields.put(key, record.mapFields.get(key));
+ }
+ }
+ for (String key : record.listFields.keySet())
+ {
+ List<String> list = listFields.get(key);
+ if (list != null)
+ {
+ list.addAll(record.listFields.get(key));
+ } else
+ {
+ listFields.put(key, record.listFields.get(key));
+ }
+ }
+ }
+
+ void merge(ZNRecordDelta delta)
+ {
+ if (delta.getMergeOperation() == MergeOperation.ADD)
+ {
+ merge(delta.getRecord());
+ } else if (delta.getMergeOperation() == MergeOperation.SUBTRACT)
+ {
+ subtract(delta.getRecord());
+ }
+ }
+
+ void merge(List<ZNRecordDelta> deltaList)
+ {
+ for (ZNRecordDelta delta : deltaList)
+ {
+ merge(delta);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof ZNRecord))
+ {
+ return false;
+ }
+ ZNRecord that = (ZNRecord) obj;
+ if (this.getSimpleFields().size() != that.getSimpleFields().size())
+ {
+ return false;
+ }
+ if (this.getMapFields().size() != that.getMapFields().size())
+ {
+ return false;
+ }
+ if (this.getListFields().size() != that.getListFields().size())
+ {
+ return false;
+ }
+ if (!this.getSimpleFields().equals(that.getSimpleFields()))
+ {
+ return false;
+ }
+ if (!this.getMapFields().equals(that.getMapFields()))
+ {
+ return false;
+ }
+ if (!this.getListFields().equals(that.getListFields()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Note: does not support subtract in each list in list fields or map in
+ * mapFields
+ */
+ public void subtract(ZNRecord value)
+ {
+ for (String key : value.getSimpleFields().keySet())
+ {
+ if (simpleFields.containsKey(key))
+ {
+ simpleFields.remove(key);
+ }
+ }
+
+ for (String key : value.getListFields().keySet())
+ {
+ if (listFields.containsKey(key))
+ {
+ listFields.remove(key);
+ }
+ }
+
+ for (String key : value.getMapFields().keySet())
+ {
+ if (mapFields.containsKey(key))
+ {
+ mapFields.remove(key);
+ }
+ }
+ }
+
+ @JsonIgnore(true)
+ public int getVersion()
+ {
+ return _version;
+ }
+
+ @JsonIgnore(true)
+ public void setVersion(int version)
+ {
+ _version = version;
+ }
+
+ @JsonIgnore(true)
+ public long getCreationTime()
+ {
+ return _creationTime;
+ }
+
+ @JsonIgnore(true)
+ public void setCreationTime(long creationTime)
+ {
+ _creationTime = creationTime;
+ }
+
+ @JsonIgnore(true)
+ public long getModifiedTime()
+ {
+ return _modifiedTime;
+ }
+
+ @JsonIgnore(true)
+ public void setModifiedTime(long modifiedTime)
+ {
+ _modifiedTime = modifiedTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java b/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java
new file mode 100644
index 0000000..4958f90
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordAssembler.java
@@ -0,0 +1,30 @@
+package org.apache.helix;
+
+import java.util.List;
+
+public class ZNRecordAssembler
+{
+ public ZNRecord assemble(List<ZNRecord> records)
+ {
+ ZNRecord assembledRecord = null;
+ if (records != null && records.size() > 0)
+ {
+ for (ZNRecord record : records)
+ {
+ if (record == null)
+ {
+ continue;
+ }
+
+ if (assembledRecord == null)
+ {
+ assembledRecord = new ZNRecord(record.getId());
+ }
+
+ assembledRecord.merge(record);
+ }
+ }
+ return assembledRecord;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java b/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java
new file mode 100644
index 0000000..52704df
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordBucketizer.java
@@ -0,0 +1,116 @@
+package org.apache.helix;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class ZNRecordBucketizer
+{
+ private static Logger LOG = Logger.getLogger(ZNRecordBucketizer.class);
+ final int _bucketSize;
+
+ public ZNRecordBucketizer(int bucketSize)
+ {
+ if (bucketSize <= 0)
+ {
+ LOG.debug("bucketSize <= 0 (was " + bucketSize
+ + "). Set to 0 to use non-bucketized HelixProperty.");
+ bucketSize = 0;
+ }
+
+ _bucketSize = bucketSize;
+ }
+
+ /**
+ * Calculate bucketName in form of "resourceName_p{startPartition}-p{endPartition}
+ *
+ * @param partitionName
+ * @return
+ */
+ public String getBucketName(String key)
+ {
+ if (_bucketSize == 0)
+ {
+ // no bucketize
+ return null;
+ }
+
+ int idx = key.lastIndexOf('_');
+ if (idx < 0)
+ {
+ throw new IllegalArgumentException("Could NOT find partition# in " + key
+ + ". partitionName should be in format of resourceName_partition#");
+ }
+
+ try
+ {
+ int partitionNb = Integer.parseInt(key.substring(idx + 1));
+ int bucketNb = partitionNb / _bucketSize;
+ int startPartition = bucketNb * _bucketSize;
+ int endPartition = bucketNb * _bucketSize + (_bucketSize - 1);
+ return key.substring(0, idx) + "_p" + startPartition + "-p" + endPartition;
+ }
+ catch (NumberFormatException e)
+ {
+ throw new IllegalArgumentException("Could NOT parse partition# ("
+ + key.substring(idx + 1) + ") in " + key);
+ }
+ }
+
+ public Map<String, ZNRecord> bucketize(ZNRecord record)
+ {
+ Map<String, ZNRecord> map = new HashMap<String, ZNRecord>();
+ if (_bucketSize == 0)
+ {
+ map.put(record.getId(), record);
+ return map;
+ }
+
+ // bucketize list field
+ for (String partitionName : record.getListFields().keySet())
+ {
+ String bucketName = getBucketName(partitionName);
+ if (bucketName != null)
+ {
+ if (!map.containsKey(bucketName))
+ {
+ map.put(bucketName, new ZNRecord(bucketName));
+ }
+ ZNRecord bucketizedRecord = map.get(bucketName);
+ bucketizedRecord.setListField(partitionName, record.getListField(partitionName));
+ }
+ else
+ {
+ LOG.error("Can't bucketize " + partitionName + " in list field");
+ }
+ }
+
+ // bucketize map field
+ for (String partitionName : record.getMapFields().keySet())
+ {
+ String bucketName = getBucketName(partitionName);
+ if (bucketName != null)
+ {
+ if (!map.containsKey(bucketName))
+ {
+ map.put(bucketName, new ZNRecord(bucketName));
+ }
+ ZNRecord bucketizedRecord = map.get(bucketName);
+ bucketizedRecord.setMapField(partitionName, record.getMapField(partitionName));
+ }
+ else
+ {
+ LOG.error("Can't bucketize " + partitionName + " in map field");
+ }
+ }
+
+ // copy all simple fields
+ for (ZNRecord bucketizedRecord : map.values())
+ {
+ bucketizedRecord.setSimpleFields(record.getSimpleFields());
+ }
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
new file mode 100644
index 0000000..f2a4636
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordDelta.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+public class ZNRecordDelta
+{
+ public enum MergeOperation {ADD, SUBTRACT};
+ public ZNRecord _record;
+ public MergeOperation _mergeOperation;
+
+ public ZNRecordDelta(ZNRecord record, MergeOperation _mergeOperation)
+ {
+ _record = new ZNRecord(record);
+ this._mergeOperation = _mergeOperation;
+ }
+
+ public ZNRecordDelta(ZNRecord record)
+ {
+ this(record, MergeOperation.ADD);
+ }
+
+ public ZNRecordDelta()
+ {
+ this(new ZNRecord(""), MergeOperation.ADD);
+ }
+
+ public ZNRecord getRecord()
+ {
+ return _record;
+ }
+
+ public MergeOperation getMergeOperation()
+ {
+ return _mergeOperation;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java b/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java
new file mode 100644
index 0000000..2a2bb7b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecordUpdater.java
@@ -0,0 +1,24 @@
+package org.apache.helix;
+
+import org.I0Itec.zkclient.DataUpdater;
+
+public class ZNRecordUpdater implements DataUpdater<ZNRecord>
+{
+ final ZNRecord _record;
+
+ public ZNRecordUpdater(ZNRecord record)
+ {
+ _record = record;
+ }
+
+ @Override
+ public ZNRecord update(ZNRecord current)
+ {
+ if (current != null)
+ {
+ current.merge(_record);
+ return current;
+ }
+ return _record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java
new file mode 100644
index 0000000..ba2c847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AccumulateAggregator.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+import org.apache.helix.HelixException;
+
+
+public class AccumulateAggregator extends Aggregator {
+
+
+ public AccumulateAggregator()
+ {
+ _numArgs = 0;
+ }
+
+ @Override
+ public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
+ Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
+
+ double currVal = 0;
+ double currTime = -1;
+ double newVal;
+ double newTime;
+ double mergedVal;
+ double mergedTime;
+
+ if (currValTup == null || newValTup == null || currTimeTup == null ||
+ newTimeTup == null) {
+ throw new HelixException("Tuples cannot be null");
+ }
+
+ //old tuples may be empty, indicating no value/time exist
+ if (currValTup.size() > 0 && currTimeTup.size() > 0) {
+ currVal = Double.parseDouble(currValTup.iterator().next());
+ currTime = Double.parseDouble(currTimeTup.iterator().next());
+ }
+ newVal = Double.parseDouble(newValTup.iterator().next());
+ newTime = Double.parseDouble(newTimeTup.iterator().next());
+
+ if (newTime > currTime) { //if old doesn't exist, we end up here
+ mergedVal = currVal+newVal; //if old doesn't exist, it has value "0"
+ mergedTime = newTime;
+ }
+ else {
+ mergedVal = currVal;
+ mergedTime = currTime;
+ }
+
+ currValTup.clear();
+ currValTup.add(Double.toString(mergedVal));
+ currTimeTup.clear();
+ currTimeTup.add(Double.toString(mergedTime));
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java
new file mode 100644
index 0000000..6d905a0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Aggregator.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+public abstract class Aggregator {
+
+ int _numArgs;
+
+ public Aggregator()
+ {
+
+ }
+
+ /*
+ * Take curr and new values. Update curr.
+ */
+ public abstract void merge(Tuple<String> currVal, Tuple<String> newVal,
+ Tuple<String> currTime, Tuple<String> newTime, String... args);
+
+ public int getRequiredNumArgs()
+ {
+ return _numArgs;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/Alert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Alert.java b/helix-core/src/main/java/org/apache/helix/alerts/Alert.java
new file mode 100644
index 0000000..59dd548
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/Alert.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+public class Alert {
+
+ String _name;
+
+ String _expression;
+ String _comparator;
+ Tuple<String> _constant;
+
+ public Alert(String name, String expression, String comparator, Tuple<String> constant)
+ {
+ _name=name;
+ _expression=expression;
+ _comparator=comparator;
+ _constant=constant;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String getExpression()
+ {
+ return _expression;
+ }
+
+ public String getComparator()
+ {
+ return _comparator;
+ }
+
+ public Tuple<String> getConstant()
+ {
+ return _constant;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java
new file mode 100644
index 0000000..de0536c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertComparator.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+public abstract class AlertComparator {
+
+ public AlertComparator()
+ {
+
+ }
+
+ public abstract boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java
new file mode 100644
index 0000000..5008567
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertParser.java
@@ -0,0 +1,147 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+import org.apache.log4j.Logger;
+
+
+public class AlertParser {
+ private static Logger logger = Logger.getLogger(AlertParser.class);
+
+ public static final String EXPRESSION_NAME = "EXP";
+ public static final String COMPARATOR_NAME = "CMP";
+ public static final String CONSTANT_NAME = "CON";
+ public static final String ACTION_NAME = "ACTION";
+
+ static Map<String, AlertComparator> comparatorMap = new HashMap<String, AlertComparator>();
+
+ static
+ {
+
+ addComparatorEntry("GREATER", new GreaterAlertComparator());
+ }
+
+ private static void addComparatorEntry(String label, AlertComparator comp)
+ {
+ if (!comparatorMap.containsKey(label))
+ {
+ comparatorMap.put(label, comp);
+ }
+ logger.info("Adding comparator: "+comp);
+ }
+
+ public static AlertComparator getComparator(String compName)
+ {
+ compName = compName.replaceAll("\\s+", ""); //remove white space
+ if (!comparatorMap.containsKey(compName)) {
+ throw new HelixException("Comparator type <"+compName+"> unknown");
+ }
+ return comparatorMap.get(compName);
+ }
+
+ public static String getComponent(String component, String alert) throws HelixException
+ {
+ //find EXP and keep going until paren are closed
+ int expStartPos = alert.indexOf(component);
+ if (expStartPos < 0) {
+ throw new HelixException(alert+" does not contain component "+component);
+ }
+ expStartPos += (component.length()+1); //advance length of string and one for open paren
+ int expEndPos = expStartPos;
+ int openParenCount = 1;
+ while (openParenCount > 0) {
+ if (alert.charAt(expEndPos) == '(') {
+ openParenCount++;
+ }
+ else if (alert.charAt(expEndPos) == ')') {
+ openParenCount--;
+ }
+ expEndPos++;
+ }
+ if (openParenCount != 0) {
+ throw new HelixException(alert+" does not contain valid "+component+" component, " +
+ "parentheses do not close");
+ }
+ //return what is in between paren
+ return alert.substring(expStartPos, expEndPos-1);
+ }
+
+ public static boolean validateAlert(String alert) throws HelixException
+ {
+ //TODO: decide if toUpperCase is going to cause problems with stuff like db name
+ alert = alert.replaceAll("\\s+", ""); //remove white space
+ String exp = getComponent(EXPRESSION_NAME, alert);
+ String cmp = getComponent(COMPARATOR_NAME, alert);
+ String val = getComponent(CONSTANT_NAME, alert);
+ logger.debug("exp: "+exp);
+ logger.debug("cmp: "+cmp);
+ logger.debug("val: "+val);
+
+ //separately validate each portion
+ ExpressionParser.validateExpression(exp);
+
+ //validate comparator
+ if (!comparatorMap.containsKey(cmp.toUpperCase())) {
+ throw new HelixException("Unknown comparator type "+cmp);
+ }
+ String actionValue = null;
+ try
+ {
+ actionValue = AlertParser.getComponent(AlertParser.ACTION_NAME, alert);
+ }
+ catch(Exception e)
+ {
+ logger.info("No action specified in " + alert);
+ }
+
+ if(actionValue != null)
+ {
+ validateActionValue(actionValue);
+ }
+ //ValParser. Probably don't need this. Just make sure it's a valid tuple. But would also be good
+ //to validate that the tuple is same length as exp's output...maybe leave that as future todo
+ //not sure we can really do much here though...anything can be in a tuple.
+
+ //TODO: try to compare tuple width of CON against tuple width of agg type! Not a good idea, what if
+ //is not at full width yet, like with window
+
+ //if all of this passes, then we can safely record the alert in zk. still need to implement zk location
+
+ return false;
+ }
+
+ public static void validateActionValue(String actionValue)
+ {
+ try
+ {
+ ActionOnError actionVal = ActionOnError.valueOf(actionValue);
+ }
+ catch(Exception e)
+ {
+ String validActions = "";
+ for (ActionOnError action : ActionOnError.values())
+ {
+ validActions = validActions + action + " ";
+ }
+ throw new HelixException("Unknown cmd type " + actionValue + ", valid types : " + validActions);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java
new file mode 100644
index 0000000..fba4dbf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertProcessor.java
@@ -0,0 +1,355 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.log4j.Logger;
+
+
+public class AlertProcessor
+{
+ private static Logger logger = Logger.getLogger(AlertProcessor.class);
+
+ private static final String bindingDelim = ",";
+ public static final String noWildcardAlertKey = "*";
+
+ StatsHolder _statsHolder;
+
+ // AlertsHolder _alertsHolder;
+
+ /*
+ * public AlertProcessor(StatHealthReportProvider statProvider) {
+ *
+ * }
+ */
+
+ public AlertProcessor(StatsHolder sh)
+ {
+ _statsHolder = sh;
+ }
+
+ public static Map<String, List<Tuple<String>>> initAlertStatTuples(Alert alert)
+ {
+ // get the stats out of the alert
+ String[] alertStats = ExpressionParser.getBaseStats(alert.getExpression());
+ // init a tuple list for each alert stat
+ Map<String, List<Tuple<String>>> alertStatTuples = new HashMap<String, List<Tuple<String>>>();
+ for (String currAlertStat : alertStats)
+ {
+ List<Tuple<String>> currList = new ArrayList<Tuple<String>>();
+ alertStatTuples.put(currAlertStat, currList);
+ }
+ return alertStatTuples;
+ }
+
+ /*
+ * //this function is all messed up!!! public static void
+ * populateAlertStatTuples(Map<String,List<Tuple<String>>> tupleLists,
+ * List<Stat> persistentStats) { Set<String> alertStatNames =
+ * tupleLists.keySet(); for (Stat persistentStat : persistentStats) { //ignore
+ * stats with wildcards, they don't have values...they are just there to catch
+ * new actual stats if
+ * (ExpressionParser.statContainsWildcards(persistentStat.getName())) {
+ * continue; } Iterator<String> alertStatIter = alertStatNames.iterator();
+ * while (alertStatIter.hasNext()) { String currAlertStat =
+ * alertStatIter.next(); if
+ * (ExpressionParser.isAlertStatExactMatch(currAlertStat,
+ * persistentStat.getName()) ||
+ * ExpressionParser.isAlertStatWildcardMatch(currAlertStat,
+ * persistentStat.getName())) {
+ * tupleLists.get(currAlertStat).add(persistentStat.getValue()); } } } }
+ */
+
+ public static String formAlertKey(ArrayList<String> bindings)
+ {
+ if (bindings.size() == 0)
+ {
+ return null;
+ }
+ StringBuilder alertKey = new StringBuilder();
+ boolean emptyKey = true;
+ for (String binding : bindings)
+ {
+ if (!emptyKey)
+ {
+ alertKey.append(bindingDelim);
+ }
+ alertKey.append(binding);
+ emptyKey = false;
+ }
+ return alertKey.toString();
+ }
+
+ // XXX: major change here. return ArrayList of Stats instead of ArrayList of
+ // Tuple<String>'s
+ public static Map<String, ArrayList<Tuple<String>>> populateAlertStatTuples(
+ String[] alertStats, List<Stat> persistentStats)
+ {
+ Map<String, ArrayList<Tuple<String>>> tupleSets = new HashMap<String, ArrayList<Tuple<String>>>();
+
+ // check each persistentStat, alertStat pair
+ for (Stat persistentStat : persistentStats)
+ {
+ // ignore stats with wildcards, they don't have values...they are just
+ // there to catch new actual stats
+ if (ExpressionParser.statContainsWildcards(persistentStat.getName()))
+ {
+ continue;
+ }
+ for (int i = 0; i < alertStats.length; i++)
+ {
+ String alertStat = alertStats[i];
+ ArrayList<String> wildcardBindings = new ArrayList<String>();
+ // if match, then proceed. If the match is wildcard, additionally fill
+ // in the wildcard bindings
+ if (ExpressionParser.isAlertStatExactMatch(alertStat,
+ persistentStat.getName())
+ || ExpressionParser.isAlertStatWildcardMatch(alertStat,
+ persistentStat.getName(), wildcardBindings))
+ {
+ String alertKey;
+ if (wildcardBindings.size() == 0)
+ {
+ alertKey = noWildcardAlertKey;
+ }
+ else
+ {
+ alertKey = formAlertKey(wildcardBindings);
+ }
+ if (!tupleSets.containsKey(alertKey))
+ { // don't have an entry for alertKey yet, create one
+ ArrayList<Tuple<String>> tuples = new ArrayList<Tuple<String>>(
+ alertStats.length);
+ for (int j = 0; j < alertStats.length; j++)
+ { // init all entries to null
+ tuples.add(j, null);
+ }
+ tupleSets.put(alertKey, tuples); // add to map
+ }
+ tupleSets.get(alertKey).set(i, persistentStat.getValue());
+ }
+ }
+ }
+
+ // post-processing step to discard any rows with null vals...
+ // TODO: decide if this is best thing to do with incomplete rows
+ List<String> selectedKeysToRemove = new ArrayList<String>();
+ for (String setKey : tupleSets.keySet())
+ {
+ ArrayList<Tuple<String>> tupleSet = tupleSets.get(setKey);
+ for (Tuple<String> tup : tupleSet)
+ {
+ if (tup == null)
+ {
+ selectedKeysToRemove.add(setKey);
+ break; // move on to next setKey
+ }
+ }
+ }
+ for(String keyToRemove : selectedKeysToRemove)
+ {
+ tupleSets.remove(keyToRemove);
+ }
+
+ // convert above to a series of iterators
+
+ return tupleSets;
+ }
+
+ public static List<Iterator<Tuple<String>>> convertTupleRowsToTupleColumns(
+ Map<String, ArrayList<Tuple<String>>> tupleMap)
+ {
+ // input is a map of key -> list of tuples. each tuple list is same length
+ // output should be a list of iterators. each column in input becomes
+ // iterator in output
+
+ ArrayList<ArrayList<Tuple<String>>> columns = new ArrayList<ArrayList<Tuple<String>>>();
+ ArrayList<Iterator<Tuple<String>>> columnIters = new ArrayList<Iterator<Tuple<String>>>();
+ for (String currStat : tupleMap.keySet())
+ {
+ List<Tuple<String>> currSet = tupleMap.get(currStat);
+ for (int i = 0; i < currSet.size(); i++)
+ {
+ if (columns.size() < (i + 1))
+ {
+ ArrayList<Tuple<String>> col = new ArrayList<Tuple<String>>();
+ columns.add(col);
+ }
+ columns.get(i).add(currSet.get(i));
+ }
+ }
+ for (ArrayList<Tuple<String>> al : columns)
+ {
+ columnIters.add(al.iterator());
+ }
+ return columnIters;
+
+ }
+
+ public static Iterator<Tuple<String>> executeOperatorPipeline(
+ List<Iterator<Tuple<String>>> tupleIters, String[] operators)
+ {
+ List<Iterator<Tuple<String>>> nextIters = tupleIters;
+ if (operators != null)
+ {
+ for (String opName : operators)
+ {
+ Operator op = ExpressionParser.getOperator(opName);
+ nextIters = op.execute(nextIters);
+ }
+ }
+
+ if (nextIters.size() != 1)
+ {
+ throw new HelixException("operator pipeline produced " + nextIters.size()
+ + " tuple sets instead of exactly 1");
+ }
+
+ return nextIters.get(0);
+ }
+
+ /*
+ * TODO: consider returning actual values, rather than bools. Could just
+ * return the triggered alerts
+ */
+ public static ArrayList<AlertValueAndStatus> executeComparator(
+ Iterator<Tuple<String>> tuples, String comparatorName,
+ Tuple<String> constant)
+ {
+ ArrayList<AlertValueAndStatus> results = new ArrayList<AlertValueAndStatus>();
+ AlertComparator cmp = AlertParser.getComparator(comparatorName);
+
+ while (tuples.hasNext())
+ {
+ Tuple<String> currTup = tuples.next();
+ boolean fired = cmp.evaluate(currTup, constant);
+ results.add(new AlertValueAndStatus(currTup, fired));
+ // results.add(cmp.evaluate(currTup, constant));
+ }
+ return results;
+
+ }
+
+ /*
+ * public static void executeAlert(Alert alert, List<Stat> stats) { //init
+ * tuple lists and populate them Map<String,List<Tuple<String>>>
+ * alertStatTupleSets = initAlertStatTuples(alert);
+ * populateAlertStatTuples(alertStatTupleSets, stats); //TODO: not sure I am
+ * being careful enough with sticking stats that match each other in this
+ * list! //convert to operator friendly format List<Iterator<Tuple<String>>>
+ * tupleIters = convertTupleSetsToTupleIterators(alertStatTupleSets); //get
+ * the operators String[] operators =
+ * ExpressionParser.getOperators(alert.getExpression()); //do operator
+ * pipeline Iterator<Tuple<String>> opResultTuples =
+ * executeOperatorPipeline(tupleIters, operators); //execute comparator for
+ * tuple list ArrayList<Boolean> evalResults =
+ * executeComparator(opResultTuples, alert.getComparator(),
+ * alert.getConstant());
+ *
+ * //TODO: convey this back to execute all
+ *
+ * }
+ */
+
+ public static HashMap<String, AlertValueAndStatus> generateResultMap(
+ Set<String> alertStatBindings, ArrayList<AlertValueAndStatus> evalResults)
+ {
+ HashMap<String, AlertValueAndStatus> resultMap = new HashMap<String, AlertValueAndStatus>();
+ Iterator<String> bindingIter = alertStatBindings.iterator();
+ Iterator<AlertValueAndStatus> resultIter = evalResults.iterator();
+ if (alertStatBindings.size() != evalResults.size())
+ {
+ // can't match up alerts bindings to results
+ while (resultIter.hasNext())
+ {
+ resultMap.put(noWildcardAlertKey, resultIter.next());
+ }
+ }
+ else
+ {
+ // they do match up
+ while (resultIter.hasNext())
+ {
+ resultMap.put(bindingIter.next(), resultIter.next());
+ }
+ }
+ return resultMap;
+ }
+
+ public static HashMap<String, AlertValueAndStatus> executeAlert(Alert alert,
+ List<Stat> persistedStats)
+ {
+ // init tuple lists and populate them
+ // Map<String,List<Tuple<String>>> alertStatTupleSets =
+ // initAlertStatTuples(alert);
+
+ String[] alertStats = ExpressionParser.getBaseStats(alert.getExpression());
+
+ Map<String, ArrayList<Tuple<String>>> alertsToTupleRows = populateAlertStatTuples(
+ alertStats, persistedStats);
+
+ if (alertsToTupleRows.size() == 0)
+ {
+ return null;
+ }
+ // convert to operator friendly format
+ List<Iterator<Tuple<String>>> tupleIters = convertTupleRowsToTupleColumns(alertsToTupleRows);
+ // get the operators
+ String[] operators = ExpressionParser.getOperators(alert.getExpression());
+ // do operator pipeline
+ Iterator<Tuple<String>> opResultTuples = executeOperatorPipeline(
+ tupleIters, operators);
+ // execute comparator for tuple list
+ ArrayList<AlertValueAndStatus> evalResults = executeComparator(
+ opResultTuples, alert.getComparator(), alert.getConstant());
+
+ // stitch alert bindings back together with final result
+ // XXX: there is a non-critical bug here. if we have an aggregating
+ // operator, but that operator only takes one input,
+ // we bind to original wildcard binding, instead of to "*"
+
+ HashMap<String, AlertValueAndStatus> alertBindingsToResult = generateResultMap(
+ alertsToTupleRows.keySet(), evalResults);
+
+ return alertBindingsToResult;
+
+ }
+
+ public static Map<String, Map<String, AlertValueAndStatus>> executeAllAlerts(
+ List<Alert> alerts, List<Stat> stats)
+ {
+ Map<String, Map<String, AlertValueAndStatus>> alertsResults = new HashMap<String, Map<String, AlertValueAndStatus>>();
+
+ for (Alert alert : alerts)
+ {
+ HashMap<String, AlertValueAndStatus> result = executeAlert(alert, stats);
+ // TODO: decide if sticking null results in here is ok
+ alertsResults.put(alert.getName(), result);
+ }
+
+ return alertsResults;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java
new file mode 100644
index 0000000..5dbfee8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertValueAndStatus.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+public class AlertValueAndStatus {
+ public final static String VALUE_NAME = "value";
+ public final static String FIRED_NAME = "fired";
+
+ private Tuple<String> value;
+ private boolean fired;
+
+ public AlertValueAndStatus(Tuple<String> value, boolean fired)
+ {
+ this.value = value;
+ this.fired = fired;
+ }
+
+ public Tuple<String> getValue() {
+ return value;
+ }
+
+ public boolean isFired() {
+ return fired;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java
new file mode 100644
index 0000000..bca9b6a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/AlertsHolder.java
@@ -0,0 +1,286 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.stages.HealthDataCache;
+import org.apache.helix.model.AlertStatus;
+import org.apache.helix.model.Alerts;
+import org.apache.log4j.Logger;
+
+
+public class AlertsHolder {
+
+ private static final Logger logger = Logger
+ .getLogger(AlertsHolder.class.getName());
+
+ HelixDataAccessor _accessor;
+ HealthDataCache _cache;
+ Map<String, Map<String,String>> _alertsMap; //not sure if map or set yet
+ Map<String, Map<String,String>> _alertStatusMap;
+ //Alerts _alerts;
+ HashSet<String> alerts;
+ StatsHolder _statsHolder;
+
+ private final HelixManager _manager;
+
+ private Builder _keyBuilder;
+
+ public AlertsHolder(HelixManager manager, HealthDataCache cache)
+ {
+ this(manager, cache, new StatsHolder(manager, cache));
+ }
+
+ public AlertsHolder(HelixManager manager, HealthDataCache cache, StatsHolder statHolder)
+ {
+ _manager = manager;
+ _accessor = manager.getHelixDataAccessor();
+ _cache = cache;
+ _statsHolder = statHolder;
+ _keyBuilder = new PropertyKey.Builder(_manager.getClusterName());
+ updateCache(_cache);
+ }
+
+ public void refreshAlerts()
+ {
+ _cache.refresh(_accessor);
+ updateCache(_cache);
+
+
+ /*
+ _alertsMap = _cache.getAlerts();
+ //TODO: confirm this a good place to init the _statMap when null
+ if (_alertsMap == null) {
+ _alertsMap = new HashMap<String, Map<String,String>>();
+ }\*/
+ }
+
+ public void refreshAlertStatus()
+ {
+ AlertStatus alertStatusRecord = _cache.getAlertStatus();
+ if (alertStatusRecord != null) {
+ _alertStatusMap = alertStatusRecord.getMapFields();
+ }
+ else {
+ _alertStatusMap = new HashMap<String, Map<String,String>>();
+ }
+ }
+
+ public void persistAlerts()
+ {
+ //XXX: Am I using _accessor too directly here?
+
+ Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+ if (alerts == null) {
+ alerts = new Alerts(Alerts.nodeName); //TODO: fix naming of this record, if it matters
+ }
+ alerts.getRecord().setMapFields(_alertsMap);
+ boolean retVal = _accessor.setProperty(_keyBuilder.alerts(), alerts);
+ logger.debug("persistAlerts retVal: "+retVal);
+ }
+
+ public void persistAlertStatus()
+ {
+ //XXX: Am I using _accessor too directly here?
+ AlertStatus alertStatus = _accessor.getProperty(_keyBuilder.alertStatus());
+ if (alertStatus == null) {
+ alertStatus = new AlertStatus(AlertStatus.nodeName); //TODO: fix naming of this record, if it matters
+ }
+ alertStatus.getRecord().setMapFields(_alertStatusMap);
+ boolean retVal = _accessor.setProperty(_keyBuilder.alertStatus(), alertStatus);
+ logger.debug("persistAlerts retVal: "+retVal);
+ }
+
+ //read alerts from cm state
+ private void readExistingAlerts()
+ {
+
+ }
+
+ public void addAlert(String alert) throws HelixException
+ {
+ alert = alert.replaceAll("\\s+", ""); //remove white space
+ AlertParser.validateAlert(alert);
+ refreshAlerts();
+ //stick the 3 alert fields in map
+ Map<String, String> alertFields = new HashMap<String,String>();
+ alertFields.put(AlertParser.EXPRESSION_NAME,
+ AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert));
+ alertFields.put(AlertParser.COMPARATOR_NAME,
+ AlertParser.getComponent(AlertParser.COMPARATOR_NAME, alert));
+ alertFields.put(AlertParser.CONSTANT_NAME,
+ AlertParser.getComponent(AlertParser.CONSTANT_NAME, alert));
+ try
+ {
+ alertFields.put(AlertParser.ACTION_NAME,
+ AlertParser.getComponent(AlertParser.ACTION_NAME, alert));
+ }
+ catch(Exception e)
+ {
+ logger.info("No action specified in " + alert);
+ }
+ //store the expression as stat
+ _statsHolder.addStat(alertFields.get(AlertParser.EXPRESSION_NAME));
+ _statsHolder.persistStats();
+
+ //naming the alert with the full name
+ _alertsMap.put(alert, alertFields);
+ persistAlerts();
+ }
+
+ /*
+ * Add a set of alert statuses to ZK
+ */
+ public void addAlertStatusSet(Map<String, Map<String, AlertValueAndStatus>> statusSet) throws HelixException
+ {
+ if (_alertStatusMap == null) {
+ _alertStatusMap = new HashMap<String, Map<String,String>>();
+ }
+ _alertStatusMap.clear(); //clear map. all alerts overwrite old alerts
+ for (String alert : statusSet.keySet()) {
+ Map<String,AlertValueAndStatus> currStatus = statusSet.get(alert);
+ if (currStatus != null) {
+ addAlertStatus(alert, currStatus);
+ }
+ }
+
+ AlertStatus alertStatus = _accessor.getProperty(_keyBuilder.alertStatus());
+ int alertStatusSize = 0;
+ if (alertStatus != null) {
+ alertStatusSize = alertStatus.getMapFields().size();
+ }
+ //no need to persist alerts if there are none to persist and none are currently persisted
+ if (_alertStatusMap.size() > 0 || alertStatusSize > 0) {
+ persistAlertStatus(); //save statuses in zk
+ }
+ }
+
+ private void addAlertStatus(String parentAlertKey, Map<String,AlertValueAndStatus> alertStatus) throws HelixException
+ {
+ //_alertStatusMap = new HashMap<String,Map<String,String>>();
+ for (String alertName : alertStatus.keySet()) {
+ String mapAlertKey;
+ mapAlertKey = parentAlertKey;
+ if (!alertName.equals(ExpressionParser.wildcardChar)) {
+ mapAlertKey = mapAlertKey+" : ("+alertName+")";
+ }
+ AlertValueAndStatus vs = alertStatus.get(alertName);
+ Map<String,String> alertFields = new HashMap<String,String>();
+ alertFields.put(AlertValueAndStatus.VALUE_NAME, vs.getValue().toString());
+ alertFields.put(AlertValueAndStatus.FIRED_NAME, String.valueOf(vs.isFired()));
+ _alertStatusMap.put(mapAlertKey, alertFields);
+ }
+ }
+
+ public AlertValueAndStatus getAlertValueAndStatus(String alertName)
+ {
+ Map<String,String> alertFields = _alertStatusMap.get(alertName);
+ String val = alertFields.get(AlertValueAndStatus.VALUE_NAME);
+ Tuple<String> valTup = new Tuple<String>();
+ valTup.add(val);
+ boolean fired = Boolean.valueOf(alertFields.get(AlertValueAndStatus.FIRED_NAME));
+ AlertValueAndStatus vs = new AlertValueAndStatus(valTup, fired);
+ return vs;
+ }
+
+ public static void parseAlert(String alert, StringBuilder statsName,
+ Map<String,String> alertFields) throws HelixException
+ {
+ alert = alert.replaceAll("\\s+", ""); //remove white space
+ AlertParser.validateAlert(alert);
+ //alertFields = new HashMap<String,String>();
+ alertFields.put(AlertParser.EXPRESSION_NAME,
+ AlertParser.getComponent(AlertParser.EXPRESSION_NAME, alert));
+ alertFields.put(AlertParser.COMPARATOR_NAME,
+ AlertParser.getComponent(AlertParser.COMPARATOR_NAME, alert));
+ alertFields.put(AlertParser.CONSTANT_NAME,
+ AlertParser.getComponent(AlertParser.CONSTANT_NAME, alert));
+ try
+ {
+ alertFields.put(AlertParser.ACTION_NAME,
+ AlertParser.getComponent(AlertParser.ACTION_NAME, alert));
+ }
+ catch(Exception e)
+ {
+ logger.info("No action specified in " + alert);
+ }
+ statsName.append(alertFields.get(AlertParser.EXPRESSION_NAME));
+ }
+
+
+ /*
+ public void evaluateAllAlerts()
+ {
+ for (String alert : _alertsMap.keySet()) {
+ Map<String,String> alertFields = _alertsMap.get(alert);
+ String exp = alertFields.get(AlertParser.EXPRESSION_NAME);
+ String comp = alertFields.get(AlertParser.COMPARATOR_NAME);
+ String con = alertFields.get(AlertParser.CONSTANT_NAME);
+ //TODO: test the fields for null and fail if needed
+
+ AlertProcessor.execute(exp, comp, con, sh);
+ }
+ }
+ */
+
+ public List<Alert> getAlertList()
+ {
+ List<Alert> alerts = new LinkedList<Alert>();
+ for (String alert : _alertsMap.keySet()) {
+ Map<String,String> alertFields = _alertsMap.get(alert);
+ String exp = alertFields.get(AlertParser.EXPRESSION_NAME);
+ String comp = alertFields.get(AlertParser.COMPARATOR_NAME);
+ Tuple<String> con = Tuple.fromString(alertFields.get(AlertParser.CONSTANT_NAME));
+ //TODO: test the fields for null and fail if needed
+
+ Alert a = new Alert(alert, exp, comp, con);
+ alerts.add(a);
+ }
+ return alerts;
+ }
+
+ public void updateCache(HealthDataCache cache)
+ {
+ _cache = cache;
+ Alerts alertsRecord = _cache.getAlerts();
+ if (alertsRecord != null)
+ {
+ _alertsMap = alertsRecord.getMapFields();
+ }
+ else
+ {
+ _alertsMap = new HashMap<String, Map<String,String>>();
+ }
+ }
+
+ public Map<String, Map<String,String>> getAlertsMap()
+ {
+ return _alertsMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java
new file mode 100644
index 0000000..e46415c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/DecayAggregator.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+import org.apache.helix.HelixException;
+
+
+public class DecayAggregator extends Aggregator {
+
+ double _decayWeight;
+
+ public DecayAggregator(double weight)
+ {
+ _decayWeight = weight;
+ }
+
+ public DecayAggregator()
+ {
+ _numArgs = 1;
+ }
+
+ @Override
+ public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
+ Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
+
+ _decayWeight = Double.parseDouble(args[0]);
+
+ double currVal = 0;
+ double currTime = -1;
+ double newVal;
+ double newTime;
+ double mergedVal;
+ double mergedTime;
+
+ if (currValTup == null || newValTup == null || currTimeTup == null ||
+ newTimeTup == null) {
+ throw new HelixException("Tuples cannot be null");
+ }
+
+ //old tuples may be empty, indicating no value/time exist
+ if (currValTup.size() > 0 && currTimeTup.size() > 0) {
+ currVal = Double.parseDouble(currValTup.iterator().next());
+ currTime = Double.parseDouble(currTimeTup.iterator().next());
+ }
+ newVal = Double.parseDouble(newValTup.iterator().next());
+ newTime = Double.parseDouble(newTimeTup.iterator().next());
+
+ if (newTime > currTime) { //if old doesn't exist, we end up here
+ mergedVal = (1-_decayWeight)*currVal+_decayWeight*newVal; //if old doesn't exist, it has value "0"
+ mergedTime = newTime;
+ }
+ else {
+ mergedVal = currVal;
+ mergedTime = currTime;
+ }
+
+ currValTup.clear();
+ currValTup.add(Double.toString(mergedVal));
+ currTimeTup.clear();
+ currTimeTup.add(Double.toString(mergedTime));
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java
new file mode 100644
index 0000000..d7fa0b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/DivideOperator.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class DivideOperator extends Operator {
+
+ public DivideOperator() {
+ minInputTupleLists = 2;
+ maxInputTupleLists = 2;
+ inputOutputTupleListsCountsEqual = false;
+ numOutputTupleLists = 1;
+ }
+
+ @Override
+ public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java
new file mode 100644
index 0000000..82429f3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/ExpandOperator.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class ExpandOperator extends Operator {
+
+ public ExpandOperator() {
+ minInputTupleLists = 1;
+ maxInputTupleLists = Integer.MAX_VALUE;
+ inputOutputTupleListsCountsEqual = true;
+ }
+
+ @Override
+ public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+ //TODO: confirm this is a no-op operator
+ return input;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java
new file mode 100644
index 0000000..e722195
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionOperatorType.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+public enum ExpressionOperatorType {
+ //each
+ EACH(true),
+ //standard math
+ SUM(false),
+ MULTIPLY(false),
+ SUBTRACT(false),
+ DIVIDE(false),
+ //aggregation types
+ ACCUMULATE(true),
+ DECAY(false),
+ WINDOW(false);
+
+ boolean isBase;
+
+ private ExpressionOperatorType(boolean isBase)
+ {
+ this.isBase = isBase;
+ }
+
+ boolean isBaseOp()
+ {
+ return isBase;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
new file mode 100644
index 0000000..10e5a0c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/ExpressionParser.java
@@ -0,0 +1,579 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+
+
+public class ExpressionParser
+{
+ private static Logger logger = Logger.getLogger(ExpressionParser.class);
+
+ final static String opDelim = "|";
+ final static String opDelimForSplit = "\\|";
+ final static String argDelim = ",";
+ final public static String statFieldDelim = ".";
+ final static String wildcardChar = "*";
+
+ // static Map<String, ExpressionOperatorType> operatorMap = new
+ // HashMap<String, ExpressionOperatorType>();
+
+ static Map<String, Operator> operatorMap = new HashMap<String, Operator>();
+ static Map<String, Aggregator> aggregatorMap = new HashMap<String, Aggregator>();
+
+ static
+ {
+
+ addOperatorEntry("EXPAND", new ExpandOperator());
+ addOperatorEntry("DIVIDE", new DivideOperator());
+ addOperatorEntry("SUM", new SumOperator());
+ addOperatorEntry("SUMEACH", new SumEachOperator());
+
+ addAggregatorEntry("ACCUMULATE", new AccumulateAggregator());
+ addAggregatorEntry("DECAY", new DecayAggregator());
+ addAggregatorEntry("WINDOW", new WindowAggregator());
+ /*
+ * addEntry("EACH", ExpressionOperatorType.EACH); addEntry("SUM",
+ * ExpressionOperatorType.SUM); addEntry("DIVIDE",
+ * ExpressionOperatorType.DIVIDE); addEntry("ACCUMULATE",
+ * ExpressionOperatorType.ACCUMULATE);
+ */
+ }
+
+ // static Pattern pattern = Pattern.compile("(\\{.+?\\})");
+
+ private static void addOperatorEntry(String label, Operator op)
+ {
+ if (!operatorMap.containsKey(label))
+ {
+ operatorMap.put(label, op);
+ }
+ logger.info("Adding operator: " + op);
+ }
+
+ private static void addAggregatorEntry(String label, Aggregator agg)
+ {
+ if (!aggregatorMap.containsKey(label.toUpperCase()))
+ {
+ aggregatorMap.put(label.toUpperCase(), agg);
+ }
+ logger.info("Adding aggregator: " + agg);
+ }
+
+ /*
+ * private static void addEntry(String label, ExpressionOperatorType type) {
+ * if (!operatorMap.containsKey(label)) { operatorMap.put(label, type); }
+ * logger.info("Adding operator type: "+type); }
+ */
+
+ public static boolean isExpressionNested(String expression)
+ {
+ return expression.contains("(");
+ }
+
+ /*
+ * public static Operator getOperatorType(String expression) throws Exception
+ * { String op = expression.substring(0,expression.indexOf("(")); if
+ * (!operatorMap.containsKey(op)) { throw new
+ * Exception(op+" is not a valid op type"); } return operatorMap.get(op); }
+ */
+
+ public static String getInnerExpression(String expression)
+ {
+ return expression.substring(expression.indexOf("(") + 1,
+ expression.lastIndexOf(")"));
+ }
+
+ /*
+ * public static String[] getBaseStats(ExpressionOperatorType type, String
+ * expression) throws Exception { String[] items = null; if
+ * (isExpressionNested(expression)) { ExpressionOperatorType nextType =
+ * getOperatorType(expression); String innerExp =
+ * getInnerExpression(expression); items = getBaseStats(nextType, innerExp); }
+ * else { //base class, no nesting items = expression.split(","); }
+ *
+ * if (type != null && type.isBaseOp()) { //surround items with type. for (int
+ * i=0; i<items.length; i++) { items[i] = type + "(" + items[i] + ")"; //!!!!
+ * NEED type to behave like string here
+ * logger.debug("Forming item "+items[i]); } } return items; }
+ *
+ * public static String[] getBaseStats(String expression) throws Exception {
+ * expression = expression.replaceAll("\\s+", ""); return getBaseStats(null,
+ * expression); }
+ */
+
+ /*
+ * Validate 2 sets of parenthesis exist, all before first opDelim
+ *
+ * extract agg type and validate it exists. validate number of args passed in
+ */
+ public static void validateAggregatorFormat(String expression)
+ throws HelixException
+ {
+ logger.debug("validating aggregator for expression: " + expression);
+ // have 0 or more args, 1 or more stats...e.g. ()(x) or (2)(x,y)
+ Pattern pattern = Pattern.compile("\\(.*?\\)");
+ Matcher matcher = pattern.matcher(expression);
+ String aggComponent = null;
+ String statComponent = null;
+ int lastMatchEnd = -1;
+ if (matcher.find())
+ {
+ aggComponent = matcher.group();
+ aggComponent = aggComponent.substring(1, aggComponent.length() - 1);
+ if (aggComponent.contains(")") || aggComponent.contains("("))
+ {
+ throw new HelixException(expression
+ + " has invalid aggregate component");
+ }
+ }
+ else
+ {
+ throw new HelixException(expression + " has invalid aggregate component");
+ }
+ if (matcher.find())
+ {
+ statComponent = matcher.group();
+ statComponent = statComponent.substring(1, statComponent.length() - 1);
+ // statComponent must have at least 1 arg between paren
+ if (statComponent.contains(")") || statComponent.contains("(")
+ || statComponent.length() == 0)
+ {
+ throw new HelixException(expression + " has invalid stat component");
+ }
+ lastMatchEnd = matcher.end();
+ }
+ else
+ {
+ throw new HelixException(expression + " has invalid stat component");
+ }
+ if (matcher.find())
+ {
+ throw new HelixException(expression
+ + " has too many parenthesis components");
+ }
+
+ if (expression.length() >= lastMatchEnd + 1)
+ { // lastMatchEnd is pos 1 past the pattern. check if there are paren there
+ if (expression.substring(lastMatchEnd).contains("(")
+ || expression.substring(lastMatchEnd).contains(")"))
+ {
+ throw new HelixException(expression + " has extra parenthesis");
+ }
+ }
+
+ // check wildcard locations. each part can have at most 1 wildcard, and must
+ // be at end
+ // String expStatNamePart = expression.substring(expression.)
+ StringTokenizer fieldTok = new StringTokenizer(statComponent,
+ statFieldDelim);
+ while (fieldTok.hasMoreTokens())
+ {
+ String currTok = fieldTok.nextToken();
+ if (currTok.contains(wildcardChar))
+ {
+ if (currTok.indexOf(wildcardChar) != currTok.length() - 1
+ || currTok.lastIndexOf(wildcardChar) != currTok.length() - 1)
+ {
+ throw new HelixException(currTok
+ + " is illegal stat name. Single wildcard must appear at end.");
+ }
+ }
+ }
+ }
+
+ public static boolean statContainsWildcards(String stat)
+ {
+ return stat.contains(wildcardChar);
+ }
+
+ /*
+ * Return true if stat name matches exactly...incomingStat has no agg type
+ * currentStat can have any
+ *
+ * Function can match for 2 cases extractStatFromAgg=false. Match
+ * accumulate()(dbFoo.partition10.latency) with
+ * accumulate()(dbFoo.partition10.latency)...trival extractStatFromAgg=true.
+ * Match accumulate()(dbFoo.partition10.latency) with
+ * dbFoo.partition10.latency
+ */
+ public static boolean isExactMatch(String currentStat, String incomingStat,
+ boolean extractStatFromAgg)
+ {
+ String currentStatName = currentStat;
+ if (extractStatFromAgg)
+ {
+ currentStatName = getSingleAggregatorStat(currentStat);
+ }
+ return (incomingStat.equals(currentStatName));
+ }
+
+ /*
+ * Return true if incomingStat matches wildcardStat except currentStat has 1+
+ * fields with "*" a*.c* matches a5.c7 a*.c* does not match a5.b6.c7
+ *
+ * Function can match for 2 cases extractStatFromAgg=false. Match
+ * accumulate()(dbFoo.partition*.latency) with
+ * accumulate()(dbFoo.partition10.latency) extractStatFromAgg=true. Match
+ * accumulate()(dbFoo.partition*.latency) with dbFoo.partition10.latency
+ */
+ public static boolean isWildcardMatch(String currentStat,
+ String incomingStat, boolean statCompareOnly, ArrayList<String> bindings)
+ {
+ if (!statCompareOnly)
+ { // need to check for match on agg type and stat
+ String currentStatAggType = (currentStat.split("\\)"))[0];
+ String incomingStatAggType = (incomingStat.split("\\)"))[0];
+ if (!currentStatAggType.equals(incomingStatAggType))
+ {
+ return false;
+ }
+ }
+ // now just get the stats
+ String currentStatName = getSingleAggregatorStat(currentStat);
+ String incomingStatName = getSingleAggregatorStat(incomingStat);
+
+ if (!currentStatName.contains(wildcardChar))
+ { // no wildcards in stat name
+ return false;
+ }
+
+ String currentStatNamePattern = currentStatName.replace(".", "\\.");
+ currentStatNamePattern = currentStatNamePattern.replace("*", ".*");
+ boolean result = Pattern.matches(currentStatNamePattern, incomingStatName);
+ if(result && bindings != null)
+ {
+ bindings.add(incomingStatName);
+ }
+ return result;
+ /*
+ StringTokenizer currentStatTok = new StringTokenizer(currentStatName,
+ statFieldDelim);
+ StringTokenizer incomingStatTok = new StringTokenizer(incomingStatName,
+ statFieldDelim);
+ if (currentStatTok.countTokens() != incomingStatTok.countTokens())
+ { // stat names different numbers of fields
+ return false;
+ }
+ // for each token, if not wildcarded, must be an exact match
+ while (currentStatTok.hasMoreTokens())
+ {
+ String currTok = currentStatTok.nextToken();
+ String incomingTok = incomingStatTok.nextToken();
+ logger.debug("curTok: " + currTok);
+ logger.debug("incomingTok: " + incomingTok);
+ if (!currTok.contains(wildcardChar))
+ { // no wildcard, but have exact match
+ if (!currTok.equals(incomingTok))
+ { // not exact match
+ return false;
+ }
+ }
+ else
+ { // currTok has a wildcard
+ if (currTok.indexOf(wildcardChar) != currTok.length() - 1
+ || currTok.lastIndexOf(wildcardChar) != currTok.length() - 1)
+ {
+ throw new HelixException(currTok
+ + " is illegal stat name. Single wildcard must appear at end.");
+ }
+ // for wildcard matching, need to escape parentheses on currTok, so
+ // regex works
+ // currTok = currTok.replace("(", "\\(");
+ // currTok = currTok.replace(")", "\\)");
+ // incomingTok = incomingTok.replace("(", "\\(");
+ // incomingTok = incomingTok.replace(")", "\\)");
+ String currTokPreWildcard = currTok.substring(0, currTok.length() - 1);
+ // TODO: if current token has a "(" in it, pattern compiling throws
+ // error
+ // Pattern pattern = Pattern.compile(currTokPreWildcard+".+"); //form
+ // pattern...wildcard part can be anything
+ // Matcher matcher = pattern.matcher(incomingTok); //see if incomingTok
+ // matches
+ if (incomingTok.indexOf(currTokPreWildcard) != 0)
+ {
+ // if (!matcher.find()) { //no match on one tok, return false
+ return false;
+ }
+ // get the binding
+
+ if (bindings != null)
+ {
+ // TODO: debug me!
+ String wildcardBinding = incomingTok.substring(incomingTok
+ .indexOf(currTokPreWildcard) + currTokPreWildcard.length());
+ bindings.add(wildcardBinding);
+ }
+ }
+ }
+ // all fields match or wildcard match...return true!
+ return true;*/
+ }
+
+ /*
+ * For checking if an incoming stat (no agg type defined) matches a persisted
+ * stat (with agg type defined)
+ */
+ public static boolean isIncomingStatExactMatch(String currentStat,
+ String incomingStat)
+ {
+ return isExactMatch(currentStat, incomingStat, true);
+ }
+
+ /*
+ * For checking if an incoming stat (no agg type defined) wildcard matches a
+ * persisted stat (with agg type defined) The persisted stat may have
+ * wildcards
+ */
+ public static boolean isIncomingStatWildcardMatch(String currentStat,
+ String incomingStat)
+ {
+ return isWildcardMatch(currentStat, incomingStat, true, null);
+ }
+
+ /*
+ * For checking if a persisted stat matches a stat defined in an alert
+ */
+ public static boolean isAlertStatExactMatch(String alertStat,
+ String currentStat)
+ {
+ return isExactMatch(alertStat, currentStat, false);
+ }
+
+ /*
+ * For checking if a maintained stat wildcard matches a stat defined in an
+ * alert. The alert may have wildcards
+ */
+ public static boolean isAlertStatWildcardMatch(String alertStat,
+ String currentStat, ArrayList<String> wildcardBindings)
+ {
+ return isWildcardMatch(alertStat, currentStat, false, wildcardBindings);
+ }
+
+ public static Aggregator getAggregator(String aggStr) throws HelixException
+ {
+ aggStr = aggStr.toUpperCase();
+ Aggregator agg = aggregatorMap.get(aggStr);
+ if (agg == null)
+ {
+ throw new HelixException("Unknown aggregator type " + aggStr);
+ }
+ return agg;
+ }
+
+ public static String getAggregatorStr(String expression)
+ throws HelixException
+ {
+ if (!expression.contains("("))
+ {
+ throw new HelixException(expression
+ + " does not contain a valid aggregator. No parentheses found");
+ }
+ String aggName = expression.substring(0, expression.indexOf("("));
+ if (!aggregatorMap.containsKey(aggName.toUpperCase()))
+ {
+ throw new HelixException("aggregator <" + aggName + "> is unknown type");
+ }
+ return aggName;
+ }
+
+ public static String[] getAggregatorArgs(String expression)
+ throws HelixException
+ {
+ String aggregator = getAggregatorStr(expression);
+ String argsStr = getAggregatorArgsStr(expression);
+ String[] args = argsStr.split(argDelim);
+ logger.debug("args size: " + args.length);
+ int numArgs = (argsStr.length() == 0) ? 0 : args.length;
+ // String[] argList = (expression.substring(expression.indexOf("(")+1,
+ // expression.indexOf(")"))).split(argDelim);
+ // verify correct number of args
+ int requiredNumArgs = aggregatorMap.get(aggregator.toUpperCase())
+ .getRequiredNumArgs();
+ if (numArgs != requiredNumArgs)
+ {
+ throw new HelixException(expression + " contains " + args.length
+ + " arguments, but requires " + requiredNumArgs);
+ }
+ return args;
+ }
+
+ /*
+ * public static String[] getAggregatorArgsList(String expression) { String
+ * argsStr = getAggregatorArgsStr(expression); String[] args =
+ * argsStr.split(argDelim); return args; }
+ */
+
+ public static String getAggregatorArgsStr(String expression)
+ {
+ return expression.substring(expression.indexOf("(") + 1,
+ expression.indexOf(")"));
+ }
+
+ public static String[] getAggregatorStats(String expression)
+ throws HelixException
+ {
+ String justStats = expression;
+ if (expression.contains("(") && expression.contains(")"))
+ {
+ justStats = (expression.substring(expression.lastIndexOf("(") + 1,
+ expression.lastIndexOf(")")));
+ }
+ String[] statList = justStats.split(argDelim);
+ if (statList.length < 1)
+ {
+ throw new HelixException(expression
+ + " does not contain any aggregator stats");
+ }
+ return statList;
+ }
+
+ public static String getSingleAggregatorStat(String expression)
+ throws HelixException
+ {
+ String[] stats = getAggregatorStats(expression);
+ if (stats.length > 1)
+ {
+ throw new HelixException(expression + " contains more than 1 stat");
+ }
+ return stats[0];
+ }
+
+ public static String getWildcardStatSubstitution(String wildcardStat,
+ String fixedStat)
+ {
+ int lastOpenParenLoc = wildcardStat.lastIndexOf("(");
+ int lastCloseParenLoc = wildcardStat.lastIndexOf(")");
+ StringBuilder builder = new StringBuilder();
+ builder.append(wildcardStat.substring(0, lastOpenParenLoc + 1));
+ builder.append(fixedStat);
+ builder.append(")");
+ logger.debug("wildcardStat: " + wildcardStat);
+ logger.debug("fixedStat: " + fixedStat);
+ logger.debug("subbedStat: " + builder.toString());
+ return builder.toString();
+ }
+
+ // XXX: each op type should have number of inputs, number of outputs. do
+ // validation.
+ // (dbFoo.partition*.latency, dbFoo.partition*.count)|EACH|ACCUMULATE|DIVIDE
+ public static String[] getBaseStats(String expression) throws HelixException
+ {
+ expression = expression.replaceAll("\\s+", "");
+ validateAggregatorFormat(expression);
+
+ String aggName = getAggregatorStr(expression);
+ String[] aggArgs = getAggregatorArgs(expression);
+ String[] aggStats = getAggregatorStats(expression);
+
+ // form aggArgs
+ String aggArgList = getAggregatorArgsStr(expression);
+
+ String[] baseStats = new String[aggStats.length];
+ for (int i = 0; i < aggStats.length; i++)
+ {
+ StringBuilder stat = new StringBuilder();
+ stat.append(aggName);
+ stat.append("(");
+ stat.append(aggArgList);
+ stat.append(")");
+ stat.append("(");
+ stat.append(aggStats[i]);
+ stat.append(")");
+ baseStats[i] = stat.toString();
+ }
+ return baseStats;
+ }
+
+ public static String[] getOperators(String expression) throws HelixException
+ {
+ String[] ops = null;
+ int numAggStats = (getAggregatorStats(expression)).length;
+ int opDelimLoc = expression.indexOf(opDelim);
+ if (opDelimLoc < 0)
+ {
+ return null;
+ }
+ logger.debug("ops str: " + expression.substring(opDelimLoc + 1));
+ ops = expression.substring(opDelimLoc + 1).split(opDelimForSplit);
+
+ // validate this string of ops
+ // verify each op exists
+ // take num input tuples sets and verify ops will output exactly 1 tuple
+ // sets
+ int currNumTuples = numAggStats;
+ for (String op : ops)
+ {
+ logger.debug("op: " + op);
+ if (!operatorMap.containsKey(op.toUpperCase()))
+ {
+ throw new HelixException("<" + op + "> is not a valid operator type");
+ }
+ Operator currOpType = operatorMap.get(op.toUpperCase());
+ if (currNumTuples < currOpType.minInputTupleLists
+ || currNumTuples > currOpType.maxInputTupleLists)
+ {
+ throw new HelixException("<" + op + "> cannot process " + currNumTuples
+ + " input tuples");
+ }
+ // reset num tuples to this op's output size
+ if (!currOpType.inputOutputTupleListsCountsEqual)
+ { // if equal, this number does not change
+ currNumTuples = currOpType.numOutputTupleLists;
+ }
+ }
+ if (currNumTuples != 1)
+ {
+ throw new HelixException(expression
+ + " does not terminate in a single tuple set");
+ }
+ return ops;
+ }
+
+ public static void validateOperators(String expression) throws HelixException
+ {
+ getOperators(expression);
+ }
+
+ public static Operator getOperator(String opName) throws HelixException
+ {
+ if (!operatorMap.containsKey(opName))
+ {
+ throw new HelixException(opName + " is unknown op type");
+ }
+ return operatorMap.get(opName);
+ }
+
+ public static void validateExpression(String expression)
+ throws HelixException
+ {
+ // 1. extract stats part and validate
+ validateAggregatorFormat(expression);
+ // 2. extract ops part and validate the ops exist and the inputs/outputs are
+ // correct
+ validateOperators(expression);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
new file mode 100644
index 0000000..72be10b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.Iterator;
+
+public class GreaterAlertComparator extends AlertComparator {
+
+ @Override
+ /*
+ * Returns true if any element left tuple exceeds any element in right tuple
+ */
+ public boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup) {
+ Iterator<String> leftIter = leftTup.iterator();
+ while (leftIter.hasNext()) {
+ double leftVal = Double.parseDouble(leftIter.next());
+ Iterator<String> rightIter = rightTup.iterator();
+ while (rightIter.hasNext()) {
+ double rightVal = Double.parseDouble(rightIter.next());
+ if (leftVal > rightVal) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
new file mode 100644
index 0000000..0cd9623
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.alerts;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class MultiplyOperator extends Operator {
+
+ public MultiplyOperator() {
+ minInputTupleLists = 1;
+ maxInputTupleLists = Integer.MAX_VALUE;
+ inputOutputTupleListsCountsEqual = false;
+ numOutputTupleLists = 1;
+ }
+
+
+ public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input)
+ {
+ List out = new ArrayList();
+ out.add(input.iterator());
+ return out;
+ }
+
+
+
+ @Override
+ public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
+ ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
+ if (input == null || input.size() == 0) {
+ return singleSetToIter(output);
+ }
+ while (true) { //loop through set of iters, return when 1 runs out (not completing the row in progress)
+ Tuple<String> rowProduct = null;
+ for (Iterator<Tuple<String>> it : input) {
+ if (!it.hasNext()) { //when any iterator runs out, we are done
+ return singleSetToIter(output);
+ }
+ rowProduct = multiplyTuples(rowProduct, it.next());
+ }
+ output.add(rowProduct);
+ }
+ }
+
+}