You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:34:00 UTC

svn commit: r1077597 [2/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ conf/ ivy/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/ipc/metrics/ src/core/org/apache/hadoop/log/ src/core/org/apache/hadoop/metrics/ src/core/org...

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+/**
+ * A simple generic consumer interface
+ */
+interface Consumer<T> {
+  void consume(T object) throws InterruptedException;
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+/**
+ * Helper class to build MBeanInfo from metrics records
+ */
+class MBeanInfoBuilder implements MetricsVisitor {
+
+  private final String name, description;
+  private List<MBeanAttributeInfo> attrs;
+  private Iterable<MetricsRecordImpl> recs;
+  private int curRecNo;
+
+  MBeanInfoBuilder(String name, String desc) {
+    this.name = name;
+    description = desc;
+    attrs = new ArrayList<MBeanAttributeInfo>();
+  }
+
+  MBeanInfoBuilder reset(Iterable<MetricsRecordImpl> recs) {
+    this.recs = recs;
+    attrs.clear();
+    return this;
+  }
+
+  MBeanAttributeInfo newAttrInfo(String name, String desc, String type) {
+    return new MBeanAttributeInfo(getAttrName(name), type, desc,
+                                  true, false, false); // read-only, non-is
+  }
+
+  MBeanAttributeInfo newAttrInfo(Metric m, String type) {
+    return newAttrInfo(m.name(), m.description(), type);
+  }
+
+  public void gauge(MetricGauge<Integer> metric, int value) {
+    attrs.add(newAttrInfo(metric, "java.lang.Integer"));
+  }
+
+  public void gauge(MetricGauge<Long> metric, long value) {
+    attrs.add(newAttrInfo(metric, "java.lang.Long"));
+  }
+
+  public void gauge(MetricGauge<Float> metric, float value) {
+    attrs.add(newAttrInfo(metric, "java.lang.Float"));
+  }
+
+  public void gauge(MetricGauge<Double> metric, double value) {
+    attrs.add(newAttrInfo(metric, "java.lang.Double"));
+  }
+
+  public void counter(MetricCounter<Integer> metric, int value) {
+    attrs.add(newAttrInfo(metric, "java.lang.Integer"));
+  }
+
+  public void counter(MetricCounter<Long> metric, long value) {
+    attrs.add(newAttrInfo(metric, "java.lang.Long"));
+  }
+
+  String getAttrName(String name) {
+    return curRecNo > 0 ? name +"."+ curRecNo : name;
+  }
+
+  MBeanInfo get() {
+    curRecNo = 0;
+    for (MetricsRecordImpl rec : recs) {
+      for (MetricsTag t : rec.tags()) {
+        attrs.add(newAttrInfo("tag."+ t.name(), t.description(),
+                  "java.lang.String"));
+      }
+      for (Metric m : rec.metrics()) {
+        m.visit(this);
+      }
+      ++curRecNo;
+    }
+    MBeanAttributeInfo[] attrsArray = new MBeanAttributeInfo[attrs.size()];
+    return new MBeanInfo(name, description, attrs.toArray(attrsArray),
+                         null, null, null); // no ops/ctors/notifications
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricCounterInt extends MetricCounter<Integer> {
+
+  final int value;
+
+  MetricCounterInt(String name, String description, int value) {
+    super(name, description);
+    this.value = value;
+  }
+
+  public Integer value() {
+    return value;
+  }
+
+  public void visit(MetricsVisitor visitor) {
+    visitor.counter(this, value);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricCounterLong extends MetricCounter<Long> {
+
+  final long value;
+
+  MetricCounterLong(String name, String description, long value) {
+    super(name, description);
+    this.value = value;
+  }
+
+  public Long value() {
+    return value;
+  }
+
+  public void visit(MetricsVisitor visitor) {
+    visitor.counter(this, value);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeDouble extends MetricGauge<Double> {
+
+  final double value;
+
+  MetricGaugeDouble(String name, String description, double value) {
+    super(name, description);
+    this.value = value;
+  }
+
+  public Double value() {
+    return value;
+  }
+
+  public void visit(MetricsVisitor visitor) {
+    visitor.gauge(this, value);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeFloat extends MetricGauge<Float> {
+
+  final float value;
+
+  MetricGaugeFloat(String name, String description, float value) {
+    super(name, description);
+    this.value = value;
+  }
+
+  public Float value() {
+    return value;
+  }
+
+  public void visit(MetricsVisitor visitor) {
+    visitor.gauge(this, value);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeInt extends MetricGauge<Integer> {
+
+  final int value;
+
+  MetricGaugeInt(String name, String description, int value) {
+    super(name, description);
+    this.value = value;
+  }
+
+  public Integer value() {
+    return value;
+  }
+
+  public void visit(MetricsVisitor visitor) {
+    visitor.gauge(this, value);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeLong extends MetricGauge<Long> {
+
+  final long value;
+
+  MetricGaugeLong(String name, String description, long value) {
+    super(name, description);
+    this.value = value;
+  }
+
+  public Long value() {
+    return value;
+  }
+
+  public void visit(MetricsVisitor visitor) {
+    visitor.gauge(this, value);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+
+/**
+ * An immutable element for the sink queues.
+ */
+class MetricsBuffer implements Iterable<MetricsBuffer.Entry> {
+
+  private final Iterable<Entry> mutable;
+
+  MetricsBuffer(Iterable<MetricsBuffer.Entry> mutable) {
+    this.mutable = mutable;
+  }
+
+  public Iterator<Entry> iterator() {
+    return mutable.iterator();
+  }
+
+  static class Entry {
+    private final String sourceName;
+    private final Iterable<MetricsRecordImpl> records;
+
+    Entry(String name, Iterable<MetricsRecordImpl> records) {
+      sourceName = name;
+      this.records = records;
+    }
+
+    String name() {
+      return sourceName;
+    }
+
+    Iterable<MetricsRecordImpl> records() {
+      return records;
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+
+/**
+ * Builder for the immutable metrics buffers
+ */
+class MetricsBufferBuilder extends ArrayList<MetricsBuffer.Entry> {
+  private static final long serialVersionUID = 1L;
+
+  boolean add(String name, Iterable<MetricsRecordImpl> records) {
+    return add(new MetricsBuffer.Entry(name, records));
+  }
+
+  MetricsBuffer get() {
+    return new MetricsBuffer(this);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsFilter;
+
+class MetricsBuilderImpl extends ArrayList<MetricsRecordBuilderImpl>
+                         implements MetricsBuilder {
+  private static final long serialVersionUID = 1L;
+  private MetricsFilter recordFilter, metricFilter;
+
+  @Override
+  public MetricsRecordBuilderImpl addRecord(String name) {
+    boolean acceptable = recordFilter == null || recordFilter.accepts(name);
+    MetricsRecordBuilderImpl rb =
+        new MetricsRecordBuilderImpl(name, recordFilter, metricFilter,
+                                     acceptable);
+    if (acceptable) {
+      add(rb);
+    }
+    return rb;
+  }
+
+
+  public List<MetricsRecordImpl> getRecords() {
+    List<MetricsRecordImpl> records =
+        new ArrayList<MetricsRecordImpl>(size());
+    for (MetricsRecordBuilderImpl rb : this) {
+      MetricsRecordImpl mr = rb.getRecord();
+      if (mr != null) {
+        records.add(mr);
+      }
+    }
+    return records;
+  }
+
+  MetricsBuilderImpl setRecordFilter(MetricsFilter rf) {
+    recordFilter = rf;
+    return this;
+  }
+
+  MetricsBuilderImpl setMetricFilter(MetricsFilter mf) {
+    metricFilter = mf;
+    return this;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsPlugin;
+
+import org.apache.hadoop.util.StringUtils;
+
+class MetricsConfig extends SubsetConfiguration {
+
+  static final Log LOG = LogFactory.getLog(MetricsConfig.class);
+
+  static final String DEFAULT_FILE_NAME = "hadoop-metrics2.properties";
+  static final String PREFIX_DEFAULT = "*.";
+
+  static final String PERIOD_KEY = "period";
+  static final int PERIOD_DEFAULT = 10; // seconds
+
+  static final String QUEUE_CAPACITY_KEY = "queue.capacity";
+  static final int QUEUE_CAPACITY_DEFAULT = 1;
+
+  static final String RETRY_DELAY_KEY = "retry.delay";
+  static final int RETRY_DELAY_DEFAULT = 10;  // seconds
+  static final String RETRY_BACKOFF_KEY = "retry.backoff";
+  static final int RETRY_BACKOFF_DEFAULT = 2; // back off factor
+  static final String RETRY_COUNT_KEY = "retry.count";
+  static final int RETRY_COUNT_DEFAULT = 3;
+
+  static final String JMX_CACHE_TTL_KEY = "jmx.cache.ttl";
+  static final int JMX_CACHE_TTL_DEFAULT = 10000; // millis
+
+  static final String CONTEXT_KEY = "context";
+  static final String NAME_KEY = "name";
+  static final String DESC_KEY = "description";
+  static final String SOURCE_KEY = "source";
+  static final String SINK_KEY = "sink";
+  static final String METRIC_FILTER_KEY = "metric.filter";
+  static final String RECORD_FILTER_KEY = "record.filter";
+  static final String SOURCE_FILTER_KEY = "source.filter";
+
+  static final Pattern INSTANCE_REGEX = Pattern.compile("([^.*]+)\\..+");
+
+  MetricsConfig(Configuration c, String prefix) {
+    super(c, prefix.toLowerCase(Locale.US), ".");
+  }
+
+  static MetricsConfig create(String prefix) {
+    return loadFirst(prefix, "hadoop-metrics2-"+ prefix.toLowerCase(Locale.US)
+                     +".properties", DEFAULT_FILE_NAME);
+  }
+
+  static MetricsConfig create(String prefix, String... fileNames) {
+    return loadFirst(prefix, fileNames);
+  }
+
+  /**
+   * Load configuration from a list of files until the first successful load
+   * @param conf  the configuration object
+   * @param files the list of filenames to try
+   * @return  the configuration object
+   */
+  static MetricsConfig loadFirst(String prefix, String... fileNames) {
+    for (String fname : fileNames) {
+      try {
+        PropertiesConfiguration cf = new PropertiesConfiguration(fname);
+        LOG.info("loaded properties from "+ fname);
+        return new MetricsConfig(cf, prefix);
+      }
+      catch (ConfigurationException e) {
+        if (e.getMessage().startsWith("Cannot locate configuration")) {
+          continue;
+        }
+        throw new MetricsConfigException(e);
+      }
+    }
+    throw new MetricsConfigException("Cannot locate configuration: tried "+
+        StringUtils.join(", ", fileNames));
+  }
+
+  @Override
+  public MetricsConfig subset(String prefix) {
+    return new MetricsConfig(this, prefix);
+  }
+
+  /**
+   * Return sub configs for instance specified in the config.
+   * Assuming format specified as follows:<pre>
+   * [type].[instance].[option] = [value]</pre>
+   * Note, '*' is a special default instance, which is excluded in the result.
+   * @param type  of the instance
+   * @return  a map with [instance] as key and config object as value
+   */
+  Map<String, MetricsConfig> getInstanceConfigs(String type) {
+    HashMap<String, MetricsConfig> map = new HashMap<String, MetricsConfig>();
+    MetricsConfig sub = subset(type);
+
+    for (String key : sub.keys()) {
+      Matcher matcher = INSTANCE_REGEX.matcher(key);
+      if (matcher.matches()) {
+        String instance = matcher.group(1);
+        if (!map.containsKey(instance)) {
+          map.put(instance, sub.subset(instance));
+        }
+      }
+    }
+    return map;
+  }
+
+  Iterable<String> keys() {
+    return new Iterable<String>() {
+      @SuppressWarnings("unchecked")
+      public Iterator<String> iterator() {
+        return (Iterator<String>) getKeys();
+      }
+    };
+  }
+
+  /**
+   * Will poke parents for defaults
+   * @param key to lookup
+   * @return  the value or null
+   */
+  @Override
+  public Object getProperty(String key) {
+    Object value = super.getProperty(key);
+    if (value == null) {
+      LOG.debug("poking parent "+ getParent().getClass().getSimpleName() +
+                " for "+ key);
+      return getParent().getProperty(key.startsWith(PREFIX_DEFAULT) ? key
+                                     : PREFIX_DEFAULT + key);
+    }
+    return value;
+  }
+
+  <T extends MetricsPlugin> T getPlugin(String name) {
+    String classKey = name.isEmpty() ? "class" : name +".class";
+    String pluginClassName = getString(classKey);
+    if (pluginClassName == null || pluginClassName.isEmpty()) {
+      return null;
+    }
+    try {
+      Class<?> pluginClass = Class.forName(pluginClassName);
+      @SuppressWarnings("unchecked")
+      T plugin = (T) pluginClass.newInstance();
+      plugin.init(name.isEmpty() ? this : subset(name));
+      return plugin;
+    }
+    catch (Exception e) {
+      throw new MetricsConfigException("Error creating plugin: "+
+                                       pluginClassName, e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toString(this);
+  }
+
+  String toString(Configuration c) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(buffer);
+    PropertiesConfiguration tmp = new PropertiesConfiguration();
+    tmp.copy(c);
+    try { tmp.save(ps); }
+    catch (Exception e) {
+      throw new MetricsConfigException(e);
+    }
+    return buffer.toString();
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricsException;
+
+/**
+ *  The metrics configuration runtime exception
+ */
+public class MetricsConfigException extends MetricsException {
+  private static final long serialVersionUID = 1L;
+
+  MetricsConfigException(String message) {
+    super(message);
+  }
+
+  MetricsConfigException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  MetricsConfigException(Throwable cause) {
+    super(cause);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.lib.MetricsRegistry.*;
+
+class MetricsRecordBuilderImpl extends MetricsRecordBuilder {
+  private final long timestamp;
+  private final String name;
+  private final List<Metric> metrics;
+  private final List<MetricsTag> tags;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final boolean acceptable;
+
+  MetricsRecordBuilderImpl(String name, MetricsFilter rf, MetricsFilter mf,
+                           boolean acceptable) {
+    timestamp = System.currentTimeMillis();
+    this.name = name;
+    metrics = new ArrayList<Metric>();
+    tags = new ArrayList<MetricsTag>();
+    recordFilter = rf;
+    metricFilter = mf;
+    this.acceptable = acceptable;
+  }
+
+  @Override
+  public MetricsRecordBuilder tag(String name, String description,
+                                  String value) {
+    if (acceptable) {
+      tags.add(new MetricsTag(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(String name, String description,
+                                         int value) {
+    if (acceptable && (metricFilter == null || metricFilter.accepts(name))) {
+      metrics.add(new MetricCounterInt(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(String name, String description,
+                                         long value) {
+    if (acceptable && (metricFilter == null || metricFilter.accepts(name))) {
+      metrics.add(new MetricCounterLong(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       int value) {
+    if (acceptable && (metricFilter == null || metricFilter.accepts(name))) {
+      metrics.add(new MetricGaugeInt(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       long value) {
+    if (acceptable && (metricFilter == null || metricFilter.accepts(name))) {
+      metrics.add(new MetricGaugeLong(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       float value) {
+    if (acceptable && (metricFilter == null || metricFilter.accepts(name))) {
+      metrics.add(new MetricGaugeFloat(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       double value) {
+    if (acceptable && (metricFilter == null || metricFilter.accepts(name))) {
+      metrics.add(new MetricGaugeDouble(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder add(MetricsTag tag) {
+    tags.add(tag);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder add(Metric metric) {
+    metrics.add(metric);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder setContext(String value) {
+    return tag(CONTEXT_KEY, CONTEXT_DESC, value);
+  }
+
+  public MetricsRecordImpl getRecord() {
+    if (acceptable && (recordFilter == null || recordFilter.accepts(tags))) {
+      return new MetricsRecordImpl(name, timestamp, tags(), metrics());
+    }
+    return null;
+  }
+
+  List<MetricsTag> tags() {
+    return Collections.unmodifiableList(tags);
+  }
+
+  List<Metric> metrics() {
+    return Collections.unmodifiableList(metrics);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.util.TryIterator;
+
+class MetricsRecordFiltered implements MetricsRecord {
+
+  private final MetricsRecord delegate;
+  private final MetricsFilter filter;
+
+  MetricsRecordFiltered(MetricsRecord delegate, MetricsFilter filter) {
+    this.delegate = delegate;
+    this.filter = filter;
+  }
+
+  public long timestamp() {
+    return delegate.timestamp();
+  }
+
+  public String name() {
+    return delegate.name();
+  }
+
+  public String context() {
+    return delegate.context();
+  }
+
+  public Iterable<MetricsTag> tags() {
+    return delegate.tags();
+  }
+
+  public Iterable<Metric> metrics() {
+    return new Iterable<Metric>() {
+      final Iterator<Metric> it = delegate.metrics().iterator();
+      public Iterator<Metric> iterator() {
+        return new TryIterator<Metric>() {
+          public Metric tryNext() {
+            if (it.hasNext()) do {
+              Metric next = it.next();
+              if (filter.accepts(next.name())) {
+                return next;
+              }
+            } while (it.hasNext());
+            return done();
+          }
+        };
+      }
+    };
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.util.Contracts;
+
+public class MetricsRecordImpl implements MetricsRecord {
+
+  protected static final String CONTEXT_KEY = "context";
+  protected static final String DEFAULT_CONTEXT = "default";
+
+  private final long timestamp;
+  private final String name;
+  private final Iterable<MetricsTag> tags;
+  private final Iterable<Metric> metrics;
+
+  /**
+   * Construct a metrics record
+   * @param name  of the record
+   * @param timestamp of the record
+   * @param tags  of the record
+   * @param metrics of the record
+   */
+  public MetricsRecordImpl(String name, long timestamp,
+                           Iterable<MetricsTag> tags,
+                           Iterable<Metric> metrics) {
+    this.timestamp = Contracts.checkArg(timestamp, timestamp > 0, "timestamp");
+    this.name = Contracts.checkNotNull(name, "name");
+    this.tags = Contracts.checkNotNull(tags, "tags");
+    this.metrics = Contracts.checkNotNull(metrics, "metrics");
+  }
+
+  public long timestamp() {
+    return timestamp;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public String context() {
+    // usually the first tag
+    for (MetricsTag t : tags) {
+      if (t.name().equals(CONTEXT_KEY)) {
+        return String.valueOf(t.value());
+      }
+    }
+    return DEFAULT_CONTEXT;
+  }
+
+  public Iterable<MetricsTag> tags() {
+    return tags;
+  }
+
+  public Iterable<Metric> metrics() {
+    return metrics;
+  }
+
+  // Mostly for testing
+  @Override public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final MetricsRecordImpl other = (MetricsRecordImpl) obj;
+    if (this.timestamp != other.timestamp()) {
+      return false;
+    }
+    if (!this.name.equals(other.name())) {
+      return false;
+    }
+    if (!this.tags.equals(other.tags())) {
+      return false;
+    }
+    if (!this.metrics.equals(other.metrics())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override public int hashCode() {
+    return name.hashCode();
+  }
+
+  @Override public String toString() {
+    return "MetricsRecordImpl{" + "timestamp=" + timestamp + " name='" + name +
+        "' tags=" + tags + " metrics=" + metrics + "}\n";
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.util.Contracts;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSink;
+
+/**
+ * An adapter class for metrics sink and associated filters
+ */
+class MetricsSinkAdapter {
+
+  private final Log LOG = LogFactory.getLog(MetricsSinkAdapter.class);
+  private final String name, description, context;
+  private final MetricsSink sink;
+  private final MetricsFilter sourceFilter, recordFilter, metricFilter;
+  private final SinkQueue<MetricsBuffer> queue;
+  private final Thread sinkThread;
+  private volatile boolean stopping = false;
+  private volatile boolean inError = false;
+  private final int period, firstRetryDelay, retryCount;
+  private final float retryBackoff;
+  private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
+  private final MetricMutableStat latency;
+  private final MetricMutableCounterInt dropped;
+  private final MetricMutableGaugeInt qsize;
+
+  private final Consumer<MetricsBuffer> consumer =
+      new Consumer<MetricsBuffer>() {
+        public void consume(MetricsBuffer buffer) {
+          publishMetrics(buffer);
+        }
+      };
+
+  MetricsSinkAdapter(String name, String description, MetricsSink sink,
+                     String context, MetricsFilter sourceFilter,
+                     MetricsFilter recordFilter, MetricsFilter metricFilter,
+                     int period, int queueCapacity, int retryDelay,
+                     float retryBackoff, int retryCount) {
+    this.name = Contracts.checkNotNull(name, "name");
+    this.description = description;
+    this.sink = Contracts.checkNotNull(sink, "sink object");
+    this.context = context;
+    this.sourceFilter = sourceFilter;
+    this.recordFilter = recordFilter;
+    this.metricFilter = metricFilter;
+    this.period = Contracts.checkArg(period, period > 0, "period");
+    firstRetryDelay =
+        Contracts.checkArg(retryDelay, retryDelay > 0, "retry delay");
+    this.retryBackoff =
+        Contracts.checkArg(retryBackoff, retryBackoff > 1, "backoff factor");
+    this.retryCount = retryCount;
+    this.queue = new SinkQueue<MetricsBuffer>(
+        Contracts.checkArg(queueCapacity, queueCapacity > 0, "queue capacity"));
+    latency = registry.newStat(name +"_latency", "End to end latency",
+                               "ops", "time");
+    dropped = registry.newCounter(name +"_dropped", "Dropped updates", 0);
+    qsize = registry.newGauge(name + "_qsize", "Queue size", 0);
+
+    sinkThread = new Thread() {
+      @Override public void run() {
+        publishMetricsFromQueue();
+      }
+    };
+    sinkThread.setName(name);
+  }
+
+  boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
+    if (logicalTime % period == 0) {
+      LOG.debug("enqueue, logicalTime="+ logicalTime);
+      if (queue.enqueue(buffer)) return true;
+      dropped.incr();
+      return false;
+    }
+    return true; // OK
+  }
+
+  void publishMetricsFromQueue() {
+    int retryDelay = firstRetryDelay;
+    int n = retryCount;
+    while (!stopping) {
+      try {
+        queue.consumeAll(consumer);
+        retryDelay = firstRetryDelay;
+        n = retryCount;
+        inError = false;
+      }
+      catch (InterruptedException e) {
+        LOG.info(name +" thread interrupted.");
+      }
+      catch (Exception e) {
+        if (n > 0) {
+          if (!inError) {
+            LOG.error("Got sink exception, retry in "+ retryDelay +"s", e);
+          }
+          retryDelay *= retryBackoff;
+          try { Thread.sleep(retryDelay * 1000); }
+          catch (InterruptedException e2) {
+            LOG.info(name +" thread interrupted while waiting for retry", e2);
+          }
+          --n;
+        }
+        else {
+          if (!inError) {
+            LOG.error("Got sink exception and over retry limit!", e);
+          }
+          queue.clear();
+          inError = true; // Don't keep complaining ad infinitum
+        }
+      }
+    }
+  }
+
+  void publishMetrics(MetricsBuffer buffer) {
+    long ts = 0;
+    for (MetricsBuffer.Entry entry : buffer) {
+      if (sourceFilter == null || sourceFilter.accepts(entry.name())) {
+        for (MetricsRecordImpl record : entry.records()) {
+          if ((context == null || context.equals(record.context())) &&
+              (recordFilter == null || recordFilter.accepts(record))) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Pushing record "+ entry.name() +"."+ record.context() +
+                        "."+ record.name() +" to "+ name);
+            }
+            sink.putMetrics(metricFilter == null
+                ? record
+                : new MetricsRecordFiltered(record, metricFilter));
+            if (ts == 0) ts = record.timestamp();
+          }
+        }
+      }
+    }
+    if (ts > 0) {
+      sink.flush();
+      latency.add(System.currentTimeMillis() - ts);
+    }
+    LOG.debug("Done");
+  }
+
+  void start() {
+    sinkThread.start();
+    LOG.info("Sink "+ name +" started");
+  }
+
+  void stop() {
+    stopping = true;
+    sinkThread.interrupt();
+    try {
+      sinkThread.join();
+    }
+    catch (InterruptedException e) {
+      LOG.warn("Stop interrupted", e);
+    }
+  }
+
+  String name() {
+    return name;
+  }
+
+  String description() {
+    return description;
+  }
+
+  void sample(MetricsRecordBuilder rb, boolean all) {
+    registry.snapshot(rb, all);
+  }
+
+  MetricsSink sink() {
+    return sink;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.HashMap;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.metrics2.util.Contracts;
+
+/**
+ * An adapter class for metrics source and associated filter and jmx impl
+ */
+class MetricsSourceAdapter implements DynamicMBean {
+
+  private static final Log LOG = LogFactory.getLog(MetricsSourceAdapter.class);
+
+  private final String prefix, name;
+  private final MetricsSource source;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final HashMap<String, Attribute> attrCache;
+  private final MBeanInfoBuilder infoBuilder;
+  private final Iterable<MetricsTag> injectedTags;
+
+  private Iterable<MetricsRecordImpl> lastRecs;
+  private long jmxCacheTS;
+  private int jmxCacheTTL;
+  private MBeanInfo infoCache;
+  private ObjectName mbeanName;
+
+  MetricsSourceAdapter(String prefix, String name, String description,
+                       MetricsSource source, Iterable<MetricsTag> injectedTags,
+                       MetricsFilter recordFilter, MetricsFilter metricFilter,
+                       int jmxCacheTTL) {
+    this.prefix = Contracts.checkNotNull(prefix, "prefix");
+    this.name = Contracts.checkNotNull(name, "name");
+    this.source = Contracts.checkNotNull(source, "source");
+    attrCache = new HashMap<String, Attribute>();
+    infoBuilder = new MBeanInfoBuilder(name, description);
+    this.injectedTags = injectedTags;
+    this.recordFilter = recordFilter;
+    this.metricFilter = metricFilter;
+    this.jmxCacheTTL = Contracts.checkArg(jmxCacheTTL, jmxCacheTTL > 0,
+                                          "jmxCacheTTL");
+  }
+
+  MetricsSourceAdapter(String prefix, String name, String description,
+                       MetricsSource source, Iterable<MetricsTag> injectedTags,
+                       int period, MetricsConfig conf) {
+    this(prefix, name, description, source, injectedTags,
+        (MetricsFilter) conf.getPlugin(RECORD_FILTER_KEY),
+        (MetricsFilter) conf.getPlugin(METRIC_FILTER_KEY), period);
+  }
+
+  void start() {
+    if (mbeanName != null) {
+      LOG.warn("MBean Source "+ name +" already initialized!");
+    }
+    mbeanName = MBeans.register(prefix, name, this);
+    LOG.debug("MBean for source "+ name +" registered.", new Throwable());
+  }
+
+  @Override
+  public synchronized Object getAttribute(String attribute)
+      throws AttributeNotFoundException, MBeanException, ReflectionException {
+    updateJmxCache();
+    Attribute a = attrCache.get(attribute);
+    if (a == null) {
+      throw new AttributeNotFoundException(attribute +" not found");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(attribute +": "+ a.getName() +"="+ a.getValue());
+    }
+    return a;
+  }
+
+  public void setAttribute(Attribute attribute)
+      throws AttributeNotFoundException, InvalidAttributeValueException,
+             MBeanException, ReflectionException {
+    throw new UnsupportedOperationException("Metrics are read-only.");
+  }
+
+  @Override
+  public synchronized AttributeList getAttributes(String[] attributes) {
+    updateJmxCache();
+    AttributeList ret = new AttributeList();
+    for (String key : attributes) {
+      Attribute attr = attrCache.get(key);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(key +": "+ attr.getName() +"="+ attr.getValue());
+      }
+      ret.add(attr);
+    }
+    return ret;
+  }
+
+  @Override
+  public AttributeList setAttributes(AttributeList attributes) {
+    throw new UnsupportedOperationException("Metrics are read-only.");
+  }
+
+  @Override
+  public Object invoke(String actionName, Object[] params, String[] signature)
+      throws MBeanException, ReflectionException {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public synchronized MBeanInfo getMBeanInfo() {
+    updateJmxCache();
+    return infoCache;
+  }
+
+  private void updateJmxCache() {
+    if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
+      if (lastRecs == null) {
+        MetricsBuilderImpl builder = new MetricsBuilderImpl();
+        getMetrics(builder, true);
+      }
+      int cacheSize = attrCache.size(); // because updateAttrCache changes it!
+      int numMetrics = updateAttrCache();
+      if (cacheSize < numMetrics) {
+        updateInfoCache();
+      }
+      jmxCacheTS = System.currentTimeMillis();
+      lastRecs = null;
+    }
+  }
+
+  Iterable<MetricsRecordImpl> getMetrics(MetricsBuilderImpl builder,
+                                         boolean all) {
+    builder.setRecordFilter(recordFilter).setMetricFilter(metricFilter);
+    synchronized(this) {
+      if (lastRecs == null) {
+        all = true; // Get all the metrics to populate the sink caches
+      }
+    }
+    source.getMetrics(builder, all);
+    for (MetricsRecordBuilderImpl rb : builder) {
+      for (MetricsTag t : injectedTags) {
+        rb.add(t);
+      }
+    }
+    synchronized(this) {
+      lastRecs = builder.getRecords();
+      return lastRecs;
+    }
+  }
+
+  synchronized void stop() {
+    MBeans.unregister(mbeanName);
+    mbeanName = null;
+  }
+
+  synchronized void refreshMBean() {
+    MBeans.unregister(mbeanName);
+    mbeanName = MBeans.register(prefix, name, this);
+  }
+
+  private void updateInfoCache() {
+    LOG.debug("Updating info cache...");
+    infoCache = infoBuilder.reset(lastRecs).get();
+    LOG.debug("Done");
+  }
+
+  private int updateAttrCache() {
+    LOG.debug("Updating attr cache...");
+    int recNo = 0;
+    int numMetrics = 0;
+    for (MetricsRecordImpl record : lastRecs) {
+      for (MetricsTag t : record.tags()) {
+        setAttrCacheTag(t, recNo);
+        ++numMetrics;
+      }
+      for (Metric m : record.metrics()) {
+        setAttrCacheMetric(m, recNo);
+        ++numMetrics;
+      }
+      ++recNo;
+    }
+    LOG.debug("Done. numMetrics="+ numMetrics);
+    return numMetrics;
+  }
+
+  private static String tagName(String name, int recNo) {
+    StringBuilder sb = new StringBuilder(name.length() + 16);
+    sb.append("tag.").append(name);
+    if (recNo > 0) {
+      sb.append('.').append(recNo);
+    }
+    return sb.toString();
+  }
+
+  private void setAttrCacheTag(MetricsTag tag, int recNo) {
+    String key = tagName(tag.name(), recNo);
+    attrCache.put(key, new Attribute(key, tag.value()));
+  }
+
+  private static String metricName(String name, int recNo) {
+    if (recNo == 0) {
+      return name;
+    }
+    StringBuilder sb = new StringBuilder(name.length() + 12);
+    sb.append(name);
+    if (recNo > 0) {
+      sb.append('.').append(recNo);
+    }
+    return sb.toString();
+  }
+
+  private void setAttrCacheMetric(Metric metric, int recNo) {
+    String key = metricName(metric.name(), recNo);
+    attrCache.put(key, new Attribute(key, metric.value()));
+  }
+
+  String name() {
+    return name;
+  }
+
+  MetricsSource source() {
+    return source;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.StringWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import javax.management.ObjectName;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.util.MathUtils;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.metrics2.util.Contracts;
+
+/**
+ * A base class for metrics system singletons
+ */
+public class MetricsSystemImpl implements MetricsSystem {
+
+  private static final Log LOG = LogFactory.getLog(MetricsSystemImpl.class);
+  static final String MS_CONTEXT = "metricssystem";
+  static final String NUM_SOURCES_KEY = "num_sources";
+  static final String NUM_SOURCES_DESC = "Number of metrics sources";
+  static final String NUM_SINKS_KEY = "num_sinks";
+  static final String NUM_SINKS_DESC = "Number of metrics sinks";
+  static final String MS_NAME = "MetricsSystem";
+  static final String MS_STATS_NAME = MS_NAME +",sub=Stats";
+  static final String MS_STATS_DESC = "Metrics system metrics";
+  static final String MS_CONTROL_NAME = MS_NAME +",sub=Control";
+
+  private final Map<String, MetricsSourceAdapter> sources;
+  private final Map<String, MetricsSinkAdapter> sinks;
+  private final List<Callback> callbacks;
+  private final MetricsBuilderImpl metricsBuilder;
+  private final MetricMutableStat sampleStat =
+      new MetricMutableStat("sample", "sampling stats", "ops", "time", true);
+  private final MetricMutableStat publishStat =
+      new MetricMutableStat("publish", "publishing stats", "ops", "time", true);
+  private final MetricMutableCounterLong dropStat =
+      new MetricMutableCounterLong("dropped_pub_all",
+        "number of dropped updates by all sinks", 0L);
+  private final List<MetricsTag> injectedTags;
+
+  // Things that are changed by init()/start()/stop()
+  private String prefix;
+  private MetricsFilter sourceFilter;
+  private MetricsConfig config;
+  private Map<String, MetricsConfig> sourceConfigs, sinkConfigs;
+  private boolean monitoring = false;
+  private Timer timer;
+  private int period; // seconds
+  private long logicalTime; // number of timer invocations * period
+  private ObjectName mbeanName;
+  private boolean publishSelfMetrics = true;
+  private MetricsSourceAdapter sysSource;
+
+  /**
+   * Construct the metrics system
+   * @param prefix  for the system
+   */
+  public MetricsSystemImpl(String prefix) {
+    this.prefix = prefix;
+    sources = new LinkedHashMap<String, MetricsSourceAdapter>();
+    sinks = new LinkedHashMap<String, MetricsSinkAdapter>();
+    sourceConfigs = new HashMap<String, MetricsConfig>();
+    sinkConfigs = new HashMap<String, MetricsConfig>();
+    callbacks = new ArrayList<Callback>();
+    injectedTags = new ArrayList<MetricsTag>();
+    metricsBuilder = new MetricsBuilderImpl();
+    if (prefix != null) {
+      // prefix could be null for default ctor, which requires init later
+      initSystemMBean();
+    }
+  }
+
+  /**
+   * Construct the system but not initializing (read config etc.) it.
+   */
+  public MetricsSystemImpl() {
+    this(null);
+  }
+
+  /**
+   * Initialized the metrics system with a prefix.
+   * @param prefix  the system will look for configs with the prefix
+   */
+  public synchronized void init(String prefix) {
+    if (monitoring) {
+      LOG.warn(this.prefix +" metrics system already initialized!");
+      return;
+    }
+    Contracts.checkState(this.prefix == null, "prefix should be null so far.");
+    this.prefix = Contracts.checkNotNull(prefix, "prefix");
+    try { start(); }
+    catch (MetricsConfigException e) {
+      // Usually because hadoop-metrics2.properties is missing
+      // We can always start the metrics system later via JMX.
+      LOG.warn("Metrics system not started!", e);
+    }
+    initSystemMBean();
+  }
+
+  @Override
+  public synchronized void start() {
+    Contracts.checkNotNull(prefix, "prefix");
+    if (monitoring) {
+      LOG.warn(prefix +" metrics system already started!",
+               new MetricsException("Illegal start"));
+      return;
+    }
+    for (Callback cb : callbacks) cb.preStart();
+    configure(prefix);
+    startTimer();
+    monitoring = true;
+    LOG.info(prefix +" metrics system started");
+    for (Callback cb : callbacks) cb.postStart();
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (!monitoring) {
+      LOG.warn(prefix +" metrics system not yet started!",
+               new MetricsException("Illegal stop"));
+      return;
+    }
+    for (Callback cb : callbacks) cb.preStop();
+    LOG.info("Stopping "+ prefix +" metrics system...");
+    stopTimer();
+    stopSources();
+    stopSinks();
+    clearConfigs();
+    monitoring = false;
+    LOG.info(prefix +" metrics system stopped.");
+    for (Callback cb : callbacks) cb.postStop();
+  }
+
+  @Override
+  public synchronized <T extends MetricsSource> T register(final String name,
+      final String desc, final T source) {
+    if (monitoring) {
+      registerSource(name, desc, source);
+    }
+    // We want to re-register the source to pick up new config when the
+    // metrics system restarts.
+    register(new AbstractCallback() {
+
+      @Override public void postStart() {
+        registerSource(name, desc, source);
+      }
+
+    });
+    LOG.debug("Registered source "+ name);
+    return source;
+  }
+
+  synchronized void registerSource(String name, String desc,
+                                   MetricsSource source) {
+    Contracts.checkNotNull(config, "config");
+    MetricsSourceAdapter sa = sources.get(name);
+    if (sa != null) {
+      LOG.warn("Source name "+name+" already exists!");
+      return;
+    }
+    MetricsConfig conf = sourceConfigs.get(name);
+    sa = conf != null
+        ? new MetricsSourceAdapter(prefix, name, desc, source,
+                                   injectedTags, period, conf)
+        : new MetricsSourceAdapter(prefix, name, desc, source,
+          injectedTags, period, config.subset(SOURCE_KEY));
+    sources.put(name, sa);
+    sa.start();
+  }
+
+  @Override
+  public synchronized <T extends MetricsSink> T register(final String name,
+      final String description, final T sink) {
+    if (config != null) {
+      registerSink(name, description, sink);
+    }
+    // We want to re-register the sink to pick up new config
+    // when the metrics system restarts.
+    register(new AbstractCallback() {
+
+      @Override public void postStart() {
+        registerSink(name, description, sink);
+      }
+
+    });
+    LOG.debug("Registered sink "+ name);
+    return sink;
+  }
+
+  synchronized void registerSink(String name, String desc, MetricsSink sink) {
+    Contracts.checkNotNull(config, "config");
+    MetricsSinkAdapter sa = sinks.get(name);
+    if (sa != null) {
+      LOG.warn("Sink name "+name+" already exists!");
+      return;
+    }
+    MetricsConfig conf = sinkConfigs.get(name);
+    sa = conf != null
+        ? newSink(name, desc, sink, conf)
+        : newSink(name, desc, sink, config.subset(SINK_KEY));
+    sinks.put(name, sa);
+    sa.start();
+  }
+
+  @Override
+  public synchronized void register(final Callback callback) {
+    callbacks.add((Callback) Proxy.newProxyInstance(
+        callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
+        new InvocationHandler() {
+          public Object invoke(Object proxy, Method method, Object[] args)
+              throws Throwable {
+            try {
+              return method.invoke(callback, args);
+            }
+            catch (Exception e) {
+              LOG.warn("Caught exception in callback "+ method.getName(), e);
+            }
+            return null;
+          }
+        }));
+  }
+
+  @Override
+  public synchronized void refreshMBeans() {
+    for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+      entry.getValue().refreshMBean();
+    }
+  }
+
+  @Override
+  public synchronized String currentConfig() {
+    PropertiesConfiguration saver = new PropertiesConfiguration();
+    StringWriter writer = new StringWriter();
+    saver.copy(config);
+    try { saver.save(writer); }
+    catch (Exception e) {
+      throw new MetricsConfigException("Error stringify config", e);
+    }
+    return writer.toString();
+  }
+
+  private synchronized void startTimer() {
+    if (timer != null) {
+      LOG.warn(prefix +" metrics system timer already started!");
+      return;
+    }
+    logicalTime = 0;
+    long millis = period * 1000;
+    timer = new Timer("Timer for '"+ prefix +"' metrics system", true);
+    timer.scheduleAtFixedRate(new TimerTask() {
+          public void run() {
+            try {
+              onTimerEvent();
+            }
+            catch (Exception e) {
+              LOG.warn(e);
+            }
+          }
+        }, millis, millis);
+    LOG.info("Scheduled sampling period at "+ period +" second(s).");
+  }
+
+  synchronized void onTimerEvent() {
+    logicalTime += period;
+    if (sinks.size() > 0) {
+      publishMetrics(sampleMetrics());
+    }
+  }
+
+  /**
+   * Sample all the sources for a snapshot of metrics/tags
+   * @return  the metrics buffer containing the snapshot
+   */
+  synchronized MetricsBuffer sampleMetrics() {
+    metricsBuilder.clear();
+    MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
+
+    for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+      if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
+        sampleMetrics(entry.getValue(), bufferBuilder);
+      }
+    }
+    if (publishSelfMetrics) {
+      sampleMetrics(sysSource, bufferBuilder);
+    }
+    MetricsBuffer buffer = bufferBuilder.get();
+    return buffer;
+  }
+
+  private void sampleMetrics(MetricsSourceAdapter sa,
+                             MetricsBufferBuilder bufferBuilder) {
+    long startTime = System.currentTimeMillis();
+    bufferBuilder.add(sa.name(), sa.getMetrics(metricsBuilder, false));
+    metricsBuilder.clear();
+    sampleStat.add(System.currentTimeMillis() - startTime);
+    LOG.debug("Sampled source "+ sa.name());
+  }
+
+  /**
+   * Publish a metrics snapshot to all the sinks
+   * @param buffer  the metrics snapshot to publish
+   */
+  synchronized void publishMetrics(MetricsBuffer buffer) {
+    int dropped = 0;
+    for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+      long startTime = System.currentTimeMillis();
+      dropped += entry.getValue().putMetrics(buffer, logicalTime) ? 0 : 1;
+      publishStat.add(System.currentTimeMillis() - startTime);
+    }
+    dropStat.incr(dropped);
+  }
+
+  private synchronized void stopTimer() {
+    if (timer == null) {
+      LOG.warn(prefix +" metrics system timer already stopped!");
+      return;
+    }
+    timer.cancel();
+    timer = null;
+  }
+
+  private synchronized void stopSources() {
+    for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+      MetricsSourceAdapter sa = entry.getValue();
+      LOG.info("Stopping metrics source "+ entry.getKey() +"("+
+                sa.source().getClass().getName() +")");
+      sa.stop();
+    }
+    sources.clear();
+  }
+
+  private synchronized void stopSinks() {
+    for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+      MetricsSinkAdapter sa = entry.getValue();
+      LOG.info("Stopping metrics sink "+ entry.getKey() +"("+
+               sa.sink().getClass().getName() +")");
+      sa.stop();
+    }
+    sinks.clear();
+  }
+
+  private synchronized void configure(String prefix) {
+    config = MetricsConfig.create(prefix);
+    configureSinks();
+    configureSources();
+    configureSystem();
+  }
+
+  private synchronized void configureSystem() {
+    injectedTags.add(new MetricsTag("hostName", "Local hostname",
+                                    getHostname()));
+  }
+
+  private synchronized void configureSinks() {
+    Map<String, MetricsConfig> confs = config.getInstanceConfigs(SINK_KEY);
+    int confPeriod = 0;
+    for (Entry<String, MetricsConfig> entry : confs.entrySet()) {
+      MetricsConfig conf = entry.getValue();
+      int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+      confPeriod = confPeriod == 0 ? sinkPeriod
+                                   : MathUtils.gcd(confPeriod, sinkPeriod);
+      String sinkName = conf.getString(NAME_KEY);
+      if (sinkName != null && !sinkName.isEmpty()) {
+        // named config is for internally registered sinks
+        sinkConfigs.put(sinkName, conf);
+      }
+      else {
+        sinkName = "sink"+ entry.getKey();
+      }
+      try {
+        MetricsSinkAdapter sa = newSink(sinkName,
+            conf.getString(DESC_KEY, sinkName), conf);
+        sa.start();
+        sinks.put(sinkName, sa);
+      }
+      catch (Exception e) {
+        LOG.warn("Error creating "+ sinkName, e);
+      }
+    }
+    period = confPeriod > 0 ? confPeriod
+                            : config.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+  }
+
+  static MetricsSinkAdapter newSink(String name, String desc, MetricsSink sink,
+                                    MetricsConfig conf) {
+    return new MetricsSinkAdapter(name, desc, sink, conf.getString(CONTEXT_KEY),
+        (MetricsFilter) conf.getPlugin(SOURCE_FILTER_KEY),
+        (MetricsFilter) conf.getPlugin(RECORD_FILTER_KEY),
+        (MetricsFilter) conf.getPlugin(METRIC_FILTER_KEY),
+        conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
+        conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
+        conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
+        conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
+        conf.getInt(RETRY_COUNT_KEY, RETRY_COUNT_DEFAULT));
+  }
+
+  static MetricsSinkAdapter newSink(String name, String desc,
+                                    MetricsConfig conf) {
+    return newSink(name, desc, (MetricsSink) conf.getPlugin(""), conf);
+  }
+
+  private void configureSources() {
+    sourceFilter =
+        (MetricsFilter) config.getPlugin(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
+    Map<String, MetricsConfig> confs = config.getInstanceConfigs(SOURCE_KEY);
+    for (Entry<String, MetricsConfig> entry : confs.entrySet()) {
+     sourceConfigs.put(entry.getKey(), entry.getValue());
+    }
+    registerSystemSource();
+  }
+
+  private void clearConfigs() {
+    sinkConfigs.clear();
+    sourceConfigs.clear();
+    injectedTags.clear();
+    config = null;
+  }
+
+  static String getHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    }
+    catch (Exception e) {
+      LOG.error("Error getting localhost name. Using 'localhost'...", e);
+    }
+    return "localhost";
+  }
+
+  private void registerSystemSource() {
+    sysSource = new MetricsSourceAdapter(prefix, MS_STATS_NAME, MS_STATS_DESC,
+        new MetricsSource() {
+      @Override
+      public void getMetrics(MetricsBuilder builder, boolean all) {
+        int numSources, numSinks;
+        synchronized(MetricsSystemImpl.this) {
+          numSources = sources.size();
+          numSinks = sinks.size();
+        }
+        MetricsRecordBuilder rb = builder.addRecord(MS_NAME)
+            .setContext(MS_CONTEXT)
+            .addGauge(NUM_SOURCES_KEY, NUM_SOURCES_DESC, numSources)
+            .addGauge(NUM_SINKS_KEY, NUM_SINKS_DESC, numSinks);
+        synchronized(MetricsSystemImpl.this) {
+          for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+            entry.getValue().sample(rb, all);
+          }
+        }
+        sampleStat.snapshot(rb, all);
+        publishStat.snapshot(rb, all);
+        dropStat.snapshot(rb, all);
+      }
+    }, injectedTags, null, null, period);
+    sysSource.start();
+  }
+
+  private void initSystemMBean() {
+    mbeanName = MBeans.register(prefix, MS_CONTROL_NAME, this);
+  }
+
+  @Override
+  public synchronized void shutdown() {
+    if (monitoring) {
+      try { stop(); }
+      catch (Exception e) {
+        LOG.warn("Error stopping the metrics system", e);
+      }
+    }
+    MBeans.unregister(mbeanName);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * A half-blocking (nonblocking for producers, blocking for consumers) queue
+ * for metrics sinks.
+ *
+ * New elements are dropped when the queue is full to preserve "interesting"
+ * elements at the onset of queue filling events
+ */
+class SinkQueue<T> {
+  // A fixed size circular buffer to minimize garbage
+  private final T[] data;
+  private int head; // head position
+  private int tail; // tail position
+  private int size; // number of elements
+  private Thread currentConsumer = null;
+
+  @SuppressWarnings("unchecked")
+  SinkQueue(int capacity) {
+    this.data = (T[]) new Object[Math.max(1, capacity)];
+    head = tail = size = 0;
+  }
+
+  synchronized boolean enqueue(T e) {
+    if (data.length == size) {
+      return false;
+    }
+    ++size;
+    tail = (tail + 1) % data.length;
+    data[tail] = e;
+    notify();
+    return true;
+  }
+
+  /**
+   * Consume one element, will block if queue is empty
+   * Only one consumer at a time is allowed
+   * @param consumer  the consumer callback object
+   */
+  void consume(Consumer<T> consumer) throws InterruptedException {
+    T e = waitForData();
+
+    try {
+      consumer.consume(e);  // can take forever
+      _dequeue();
+    }
+    finally {
+      clearConsumer();
+    }
+  }
+
+  /**
+   * Consume all the elements, will block if queue is empty
+   * @param consumer  the consumer callback object
+   * @throws InterruptedException
+   */
+  void consumeAll(Consumer<T> consumer) throws InterruptedException {
+    waitForData();
+
+    try {
+      for (int i = size(); i-- > 0; ) {
+        consumer.consume(front()); // can take forever
+        _dequeue();
+      }
+    }
+    finally {
+      clearConsumer();
+    }
+  }
+
+  /**
+   * Dequeue one element from head of the queue, will block if queue is empty
+   * @return  the first element
+   * @throws InterruptedException
+   */
+  synchronized T dequeue() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    return _dequeue();
+  }
+
+  private synchronized T waitForData() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    currentConsumer = Thread.currentThread();
+    return front();
+  }
+
+  private synchronized void checkConsumer() {
+    if (currentConsumer != null) {
+      throw new ConcurrentModificationException("The "+
+          currentConsumer.getName() +" thread is consuming the queue.");
+    }
+  }
+
+  private synchronized void clearConsumer() {
+    currentConsumer = null;
+  }
+
+  private synchronized T _dequeue() {
+    if (0 == size) {
+      throw new IllegalStateException("Size must > 0 here.");
+    }
+    --size;
+    head = (head + 1) % data.length;
+    T ret = data[head];
+    data[head] = null;  // hint to gc
+    return ret;
+  }
+
+  synchronized T front() {
+    return data[(head + 1) % data.length];
+  }
+
+  synchronized T back() {
+    return data[tail];
+  }
+
+  synchronized void clear() {
+    checkConsumer();
+
+    for (int i = data.length; i-- > 0; ) {
+      data[i] = null;
+    }
+    size = 0;
+  }
+
+  synchronized int size() {
+    return size;
+  }
+
+  int capacity() {
+    return data.length;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+
+/**
+ * A convenient base class for writing metrics sources
+ */
+public abstract class AbstractMetricsSource implements MetricsSource {
+
+  protected final MetricsRegistry registry;
+
+  /**
+   * Construct the source with name and a mutable metrics factory
+   * @param name  of the default record
+   * @param mf  the factory to create mutable metrics
+   */
+  public AbstractMetricsSource(String name, MetricMutableFactory mf) {
+    registry = new MetricsRegistry(name, mf);
+  }
+
+  /**
+   * Construct the source with a name with a default factory
+   * @param name  of the default record
+   */
+  public AbstractMetricsSource(String name) {
+    this(name, new MetricMutableFactory());
+  }
+
+  /**
+   * @return  the registry for mutable metrics
+   */
+  public MetricsRegistry registry() {
+    return registry;
+  }
+
+  @Override
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+}
\ No newline at end of file