Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/05/06 09:28:46 UTC

svn commit: r1100113 [2/5] - in /hadoop/common/trunk: ./ conf/ ivy/ src/java/org/apache/hadoop/metrics/file/ src/java/org/apache/hadoop/metrics2/ src/java/org/apache/hadoop/metrics2/annotation/ src/java/org/apache/hadoop/metrics2/filter/ src/java/org/a...

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java Fri May  6 07:28:43 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeLong extends AbstractMetric {
+  final long value;
+
+  MetricGaugeLong(MetricsInfo info, long value) {
+    super(info);
+    this.value = value;
+  }
+
+  @Override
+  public Long value() {
+    return value;
+  }
+
+  @Override
+  public MetricType type() {
+    return MetricType.GAUGE;
+  }
+
+  @Override
+  public void visit(MetricsVisitor visitor) {
+    visitor.gauge(this, value);
+  }
+}
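
For illustration only (not part of this commit), a minimal sketch of the visitor dispatch that MetricGaugeLong.visit() above relies on. The PrintingVisitor class is hypothetical, and the exact MetricsVisitor callback signatures are assumed from the visitor.gauge(this, value) call:

    import org.apache.hadoop.metrics2.MetricsInfo;
    import org.apache.hadoop.metrics2.MetricsVisitor;

    // Hypothetical visitor that prints whatever metric is dispatched to it.
    // Assumes MetricsVisitor declares per-type gauge/counter callbacks.
    class PrintingVisitor implements MetricsVisitor {
      @Override public void gauge(MetricsInfo info, int value)    { print(info, value); }
      @Override public void gauge(MetricsInfo info, long value)   { print(info, value); }
      @Override public void gauge(MetricsInfo info, float value)  { print(info, value); }
      @Override public void gauge(MetricsInfo info, double value) { print(info, value); }
      @Override public void counter(MetricsInfo info, int value)  { print(info, value); }
      @Override public void counter(MetricsInfo info, long value) { print(info, value); }

      private void print(MetricsInfo info, Number value) {
        System.out.println(info.name() + " = " + value);
      }
    }

    // Usage (from code holding an AbstractMetric): metric.visit(new PrintingVisitor());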

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java Fri May  6 07:28:43 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+
+/**
+ * An immutable element for the sink queues.
+ */
+class MetricsBuffer implements Iterable<MetricsBuffer.Entry> {
+
+  private final Iterable<Entry> mutable;
+
+  MetricsBuffer(Iterable<MetricsBuffer.Entry> mutable) {
+    this.mutable = mutable;
+  }
+
+  @Override
+  public Iterator<Entry> iterator() {
+    return mutable.iterator();
+  }
+
+  static class Entry {
+    private final String sourceName;
+    private final Iterable<MetricsRecordImpl> records;
+
+    Entry(String name, Iterable<MetricsRecordImpl> records) {
+      sourceName = name;
+      this.records = records;
+    }
+
+    String name() {
+      return sourceName;
+    }
+
+    Iterable<MetricsRecordImpl> records() {
+      return records;
+    }
+  }
+
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java Fri May  6 07:28:43 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+
+/**
+ * Builder for the immutable metrics buffers
+ */
+class MetricsBufferBuilder extends ArrayList<MetricsBuffer.Entry> {
+  private static final long serialVersionUID = 1L;
+
+  boolean add(String name, Iterable<MetricsRecordImpl> records) {
+    return add(new MetricsBuffer.Entry(name, records));
+  }
+
+  MetricsBuffer get() {
+    return new MetricsBuffer(this);
+  }
+}
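
A minimal sketch (hypothetical helper, same package because the classes are package-private) of how MetricsBufferBuilder and MetricsBuffer above fit together: entries are appended per source, then snapshotted into the immutable buffer that sink queues iterate:

    package org.apache.hadoop.metrics2.impl;

    import java.util.List;

    class MetricsBufferExample {
      static MetricsBuffer snapshot(String sourceName, List<MetricsRecordImpl> records) {
        MetricsBufferBuilder builder = new MetricsBufferBuilder();
        builder.add(sourceName, records);           // one entry per source
        MetricsBuffer buffer = builder.get();       // immutable view over the entries
        for (MetricsBuffer.Entry entry : buffer) {  // what a sink queue consumer sees
          for (MetricsRecordImpl record : entry.records()) {
            System.out.println(entry.name() + ": " + record.name());
          }
        }
        return buffer;
      }
    }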

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java Fri May  6 07:28:43 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+class MetricsCollectorImpl implements MetricsCollector,
+    Iterable<MetricsRecordBuilderImpl> {
+
+  private final List<MetricsRecordBuilderImpl> rbs = Lists.newArrayList();
+  private MetricsFilter recordFilter, metricFilter;
+
+  @Override
+  public MetricsRecordBuilderImpl addRecord(MetricsInfo info) {
+    boolean acceptable = recordFilter == null ||
+                         recordFilter.accepts(info.name());
+    MetricsRecordBuilderImpl rb = new MetricsRecordBuilderImpl(this, info,
+        recordFilter, metricFilter, acceptable);
+    if (acceptable) rbs.add(rb);
+    return rb;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addRecord(String name) {
+    return addRecord(info(name, name +" record"));
+  }
+
+  public List<MetricsRecordImpl> getRecords() {
+    List<MetricsRecordImpl> recs = Lists.newArrayListWithCapacity(rbs.size());
+    for (MetricsRecordBuilderImpl rb : rbs) {
+      MetricsRecordImpl mr = rb.getRecord();
+      if (mr != null) {
+        recs.add(mr);
+      }
+    }
+    return recs;
+  }
+
+  @Override
+  public Iterator<MetricsRecordBuilderImpl> iterator() {
+    return rbs.iterator();
+  }
+
+  void clear() { rbs.clear(); }
+
+  MetricsCollectorImpl setRecordFilter(MetricsFilter rf) {
+    recordFilter = rf;
+    return this;
+  }
+
+  MetricsCollectorImpl setMetricFilter(MetricsFilter mf) {
+    metricFilter = mf;
+    return this;
+  }
+}
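
A short sketch (hypothetical helper, same package) of the collector in use: add a record, tag it with a context, append gauges and counters through the builder chain, then drain the finished records:

    package org.apache.hadoop.metrics2.impl;

    import java.util.List;
    import static org.apache.hadoop.metrics2.lib.Interns.info;

    class CollectorExample {
      static List<MetricsRecordImpl> collect() {
        MetricsCollectorImpl collector = new MetricsCollectorImpl();
        collector.addRecord("jvm")
            .setContext("metricssystem")
            .addGauge(info("HeapUsedM", "Heap used in MB"), 42L)
            .addCounter(info("GcCount", "Total GC count"), 7);
        return collector.getRecords();  // only records that pass the record filter
      }
    }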

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java Fri May  6 07:28:43 2011
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import static java.security.AccessController.*;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsPlugin;
+import org.apache.hadoop.metrics2.filter.GlobFilter;
+
+/**
+ * Metrics configuration for MetricsSystemImpl
+ */
+class MetricsConfig extends SubsetConfiguration {
+
+  static final Log LOG = LogFactory.getLog(MetricsConfig.class);
+
+  static final String DEFAULT_FILE_NAME = "hadoop-metrics2.properties";
+  static final String PREFIX_DEFAULT = "*.";
+
+  static final String PERIOD_KEY = "period";
+  static final int PERIOD_DEFAULT = 10; // seconds
+
+  static final String QUEUE_CAPACITY_KEY = "queue.capacity";
+  static final int QUEUE_CAPACITY_DEFAULT = 1;
+
+  static final String RETRY_DELAY_KEY = "retry.delay";
+  static final int RETRY_DELAY_DEFAULT = 10;  // seconds
+  static final String RETRY_BACKOFF_KEY = "retry.backoff";
+  static final int RETRY_BACKOFF_DEFAULT = 2; // back off factor
+  static final String RETRY_COUNT_KEY = "retry.count";
+  static final int RETRY_COUNT_DEFAULT = 1;
+
+  static final String JMX_CACHE_TTL_KEY = "jmx.cache.ttl";
+  static final String START_MBEANS_KEY = "source.start_mbeans";
+  static final String PLUGIN_URLS_KEY = "plugin.urls";
+
+  static final String CONTEXT_KEY = "context";
+  static final String NAME_KEY = "name";
+  static final String DESC_KEY = "description";
+  static final String SOURCE_KEY = "source";
+  static final String SINK_KEY = "sink";
+  static final String METRIC_FILTER_KEY = "metric.filter";
+  static final String RECORD_FILTER_KEY = "record.filter";
+  static final String SOURCE_FILTER_KEY = "source.filter";
+
+  static final Pattern INSTANCE_REGEX = Pattern.compile("([^.*]+)\\..+");
+  static final Splitter SPLITTER = Splitter.on(',').trimResults();
+  private ClassLoader pluginLoader;
+
+  MetricsConfig(Configuration c, String prefix) {
+    super(c, prefix.toLowerCase(Locale.US), ".");
+  }
+
+  static MetricsConfig create(String prefix) {
+    return loadFirst(prefix, "hadoop-metrics2-"+ prefix.toLowerCase(Locale.US)
+                     +".properties", DEFAULT_FILE_NAME);
+  }
+
+  static MetricsConfig create(String prefix, String... fileNames) {
+    return loadFirst(prefix, fileNames);
+  }
+
+  /**
+   * Load configuration from a list of files until the first successful load
+   * @param prefix  the prefix for the metrics config
+   * @param fileNames the list of filenames to try
+   * @return  the metrics configuration object
+   */
+  static MetricsConfig loadFirst(String prefix, String... fileNames) {
+    for (String fname : fileNames) {
+      try {
+        Configuration cf = new PropertiesConfiguration(fname)
+            .interpolatedConfiguration();
+        LOG.info("loaded properties from "+ fname);
+        LOG.debug(toString(cf));
+        MetricsConfig mc = new MetricsConfig(cf, prefix);
+        LOG.debug(mc);
+        return mc;
+      }
+      catch (ConfigurationException e) {
+        if (e.getMessage().startsWith("Cannot locate configuration")) {
+          continue;
+        }
+        throw new MetricsConfigException(e);
+      }
+    }
+    throw new MetricsConfigException("Cannot locate configuration: tried "+
+                                     Joiner.on(",").join(fileNames));
+  }
+
+  @Override
+  public MetricsConfig subset(String prefix) {
+    return new MetricsConfig(this, prefix);
+  }
+
+  /**
+   * Return the sub configs for the instances specified in the config.
+   * Assumes the format:<pre>
+   * [type].[instance].[option] = [value]</pre>
+   * Note, '*' is a special default instance, which is excluded from the result.
+   * @param type  of the instance
+   * @return  a map with [instance] as key and config object as value
+   */
+  Map<String, MetricsConfig> getInstanceConfigs(String type) {
+    Map<String, MetricsConfig> map = Maps.newHashMap();
+    MetricsConfig sub = subset(type);
+
+    for (String key : sub.keys()) {
+      Matcher matcher = INSTANCE_REGEX.matcher(key);
+      if (matcher.matches()) {
+        String instance = matcher.group(1);
+        if (!map.containsKey(instance)) {
+          map.put(instance, sub.subset(instance));
+        }
+      }
+    }
+    return map;
+  }
+
+  Iterable<String> keys() {
+    return new Iterable<String>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public Iterator<String> iterator() {
+        return (Iterator<String>) getKeys();
+      }
+    };
+  }
+
+  /**
+   * Will poke parents for defaults
+   * @param key to lookup
+   * @return  the value or null
+   */
+  @Override
+  public Object getProperty(String key) {
+    Object value = super.getProperty(key);
+    if (value == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("poking parent '"+ getParent().getClass().getSimpleName() +
+                  "' for key: "+ key);
+      }
+      return getParent().getProperty(key.startsWith(PREFIX_DEFAULT) ? key
+                                     : PREFIX_DEFAULT + key);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("returning '"+ value +"' for key: "+ key);
+    }
+    return value;
+  }
+
+  <T extends MetricsPlugin> T getPlugin(String name) {
+    String clsName = getClassName(name);
+    if (clsName == null) return null;
+    try {
+      Class<?> cls = Class.forName(clsName, true, getPluginLoader());
+      @SuppressWarnings("unchecked")
+      T plugin = (T) cls.newInstance();
+      plugin.init(name.isEmpty() ? this : subset(name));
+      return plugin;
+    }
+    catch (Exception e) {
+      throw new MetricsConfigException("Error creating plugin: "+ clsName, e);
+    }
+  }
+
+  String getClassName(String prefix) {
+    String classKey = prefix.isEmpty() ? "class" : prefix +".class";
+    String clsName = getString(classKey);
+    LOG.debug(clsName);
+    if (clsName == null || clsName.isEmpty()) {
+      return null;
+    }
+    return clsName;
+  }
+
+  ClassLoader getPluginLoader() {
+    if (pluginLoader != null) return pluginLoader;
+    final ClassLoader defaultLoader = getClass().getClassLoader();
+    Object purls = super.getProperty(PLUGIN_URLS_KEY);
+    if (purls == null) return defaultLoader;
+    Iterable<String> jars = SPLITTER.split((String) purls);
+    int len = Iterables.size(jars);
+    if (len > 0) {
+      final URL[] urls = new URL[len];
+      try {
+        int i = 0;
+        for (String jar : jars) {
+          LOG.debug(jar);
+          urls[i++] = new URL(jar);
+        }
+      }
+      catch (Exception e) {
+        throw new MetricsConfigException(e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("using plugin jars: "+ Iterables.toString(jars));
+      }
+      pluginLoader = doPrivileged(new PrivilegedAction<ClassLoader>() {
+        @Override public ClassLoader run() {
+          return new URLClassLoader(urls, defaultLoader);
+        }
+      });
+      return pluginLoader;
+    }
+    if (parent instanceof MetricsConfig) {
+      return ((MetricsConfig) parent).getPluginLoader();
+    }
+    return defaultLoader;
+  }
+
+  @Override public void clear() {
+    super.clear();
+    // pluginLoader.close(); // jdk7 is saner
+  }
+
+  MetricsFilter getFilter(String prefix) {
+    // don't create filter instances without options
+    MetricsConfig conf = subset(prefix);
+    if (conf.isEmpty()) return null;
+    MetricsFilter filter = getPlugin(prefix);
+    if (filter != null) return filter;
+    // glob filter is assumed if pattern is specified but class is not.
+    filter = new GlobFilter();
+    filter.init(conf);
+    return filter;
+  }
+
+  @Override
+  public String toString() {
+    return toString(this);
+  }
+
+  static String toString(Configuration c) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(buffer);
+    PropertiesConfiguration tmp = new PropertiesConfiguration();
+    tmp.copy(c);
+    try { tmp.save(ps); }
+    catch (Exception e) {
+      throw new MetricsConfigException(e);
+    }
+    return buffer.toString();
+  }
+}
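
To make the keys above concrete, a hypothetical hadoop-metrics2.properties in the [prefix].[type].[instance].[option] layout that MetricsConfig parses; the '*.' prefix supplies defaults, and the sink class and option names below are examples only:

    # defaults applied to every prefix
    *.period=10
    *.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink

    # per-daemon settings: [prefix].[type].[instance].[option]
    namenode.sink.file.filename=namenode-metrics.out
    # a glob filter is assumed when a pattern is given without a filter class
    namenode.source.filter.exclude=*Details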

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java Fri May  6 07:28:43 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricsException;
+
+/**
+ *  The metrics configuration runtime exception
+ */
+class MetricsConfigException extends MetricsException {
+  private static final long serialVersionUID = 1L;
+
+  MetricsConfigException(String message) {
+    super(message);
+  }
+
+  MetricsConfigException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  MetricsConfigException(Throwable cause) {
+    super(cause);
+  }
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java Fri May  6 07:28:43 2011
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.Interns;
+
+class MetricsRecordBuilderImpl extends MetricsRecordBuilder {
+  private final MetricsCollector parent;
+  private final long timestamp;
+  private final MetricsInfo recInfo;
+  private final List<AbstractMetric> metrics;
+  private final List<MetricsTag> tags;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final boolean acceptable;
+
+  MetricsRecordBuilderImpl(MetricsCollector parent, MetricsInfo info,
+                           MetricsFilter rf, MetricsFilter mf,
+                           boolean acceptable) {
+    this.parent = parent;
+    timestamp = System.currentTimeMillis();
+    recInfo = info;
+    metrics = Lists.newArrayList();
+    tags = Lists.newArrayList();
+    recordFilter = rf;
+    metricFilter = mf;
+    this.acceptable = acceptable;
+  }
+
+  @Override
+  public MetricsCollector parent() { return parent; }
+
+  @Override
+  public MetricsRecordBuilderImpl tag(MetricsInfo info, String value) {
+    if (acceptable) {
+      tags.add(Interns.tag(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl add(MetricsTag tag) {
+    tags.add(tag);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl add(AbstractMetric metric) {
+    metrics.add(metric);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addCounter(MetricsInfo info, int value) {
+    if (acceptable && (metricFilter == null ||
+        metricFilter.accepts(info.name()))) {
+      metrics.add(new MetricCounterInt(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addCounter(MetricsInfo info, long value) {
+    if (acceptable && (metricFilter == null ||
+        metricFilter.accepts(info.name()))) {
+      metrics.add(new MetricCounterLong(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, int value) {
+    if (acceptable && (metricFilter == null ||
+        metricFilter.accepts(info.name()))) {
+      metrics.add(new MetricGaugeInt(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, long value) {
+    if (acceptable && (metricFilter == null ||
+        metricFilter.accepts(info.name()))) {
+      metrics.add(new MetricGaugeLong(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, float value) {
+    if (acceptable && (metricFilter == null ||
+        metricFilter.accepts(info.name()))) {
+      metrics.add(new MetricGaugeFloat(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, double value) {
+    if (acceptable && (metricFilter == null ||
+        metricFilter.accepts(info.name()))) {
+      metrics.add(new MetricGaugeDouble(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl setContext(String value) {
+    return tag(MsInfo.Context, value);
+  }
+
+  public MetricsRecordImpl getRecord() {
+    if (acceptable && (recordFilter == null || recordFilter.accepts(tags))) {
+      return new MetricsRecordImpl(recInfo, timestamp, tags(), metrics());
+    }
+    return null;
+  }
+
+  List<MetricsTag> tags() {
+    return Collections.unmodifiableList(tags);
+  }
+
+  List<AbstractMetric> metrics() {
+    return Collections.unmodifiableList(metrics);
+  }
+}
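
Another hypothetical snippet (same package) showing what a consumer sees once the builder is done: getRecord() returns null when the record is filtered out, otherwise an immutable record whose tags() and metrics() come from the unmodifiable lists above:

    package org.apache.hadoop.metrics2.impl;

    import org.apache.hadoop.metrics2.AbstractMetric;
    import org.apache.hadoop.metrics2.MetricsTag;
    import static org.apache.hadoop.metrics2.lib.Interns.info;

    class RecordBuilderExample {
      static void dump() {
        MetricsCollectorImpl collector = new MetricsCollectorImpl();
        MetricsRecordBuilderImpl rb = collector.addRecord("rpc")
            .tag(info("port", "RPC port"), "8020")
            .addCounter(info("CallsTotal", "Total RPC calls"), 100L)
            .addGauge(info("QueueTimeAvg", "Average queue time in ms"), 1.5);
        MetricsRecordImpl record = rb.getRecord();   // null if filtered out
        if (record != null) {
          for (MetricsTag tag : record.tags())      System.out.println("tag " + tag.name());
          for (AbstractMetric m : record.metrics()) System.out.println(m.name() + "=" + m.value());
        }
      }
    }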

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java Fri May  6 07:28:43 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+import java.util.Collection;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+class MetricsRecordFiltered extends AbstractMetricsRecord {
+  private final MetricsRecord delegate;
+  private final MetricsFilter filter;
+
+  MetricsRecordFiltered(MetricsRecord delegate, MetricsFilter filter) {
+    this.delegate = delegate;
+    this.filter = filter;
+  }
+
+  @Override public long timestamp() {
+    return delegate.timestamp();
+  }
+
+  @Override public String name() {
+    return delegate.name();
+  }
+
+  @Override public String description() {
+    return delegate.description();
+  }
+
+  @Override public String context() {
+    return delegate.context();
+  }
+
+  @Override public Collection<MetricsTag> tags() {
+    return delegate.tags();
+  }
+
+  @Override public Iterable<AbstractMetric> metrics() {
+    return new Iterable<AbstractMetric>() {
+      final Iterator<AbstractMetric> it = delegate.metrics().iterator();
+      @Override public Iterator<AbstractMetric> iterator() {
+        return new AbstractIterator<AbstractMetric>() {
+          @Override public AbstractMetric computeNext() {
+            while (it.hasNext()) {
+              AbstractMetric next = it.next();
+              if (filter.accepts(next.name())) {
+                return next;
+              }
+            }
+            return endOfData();
+          }
+        };
+      }
+    };
+  }
+}
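
A brief hypothetical helper showing the intended use of the wrapper above, mirroring what MetricsSinkAdapter does before handing a record to a sink: consumers of the wrapped record only see metrics the filter accepts:

    package org.apache.hadoop.metrics2.impl;

    import org.apache.hadoop.metrics2.AbstractMetric;
    import org.apache.hadoop.metrics2.MetricsFilter;
    import org.apache.hadoop.metrics2.MetricsRecord;

    class FilteredRecordExample {
      static void dumpAccepted(MetricsRecord record, MetricsFilter metricFilter) {
        MetricsRecord filtered = new MetricsRecordFiltered(record, metricFilter);
        for (AbstractMetric m : filtered.metrics()) {  // lazily skips rejected metrics
          System.out.println(m.name() + " = " + m.value());
        }
      }
    }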

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java Fri May  6 07:28:43 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.*;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.util.Contracts.*;
+
+class MetricsRecordImpl extends AbstractMetricsRecord {
+  protected static final String DEFAULT_CONTEXT = "default";
+
+  private final long timestamp;
+  private final MetricsInfo info;
+  private final List<MetricsTag> tags;
+  private final Iterable<AbstractMetric> metrics;
+
+  /**
+   * Construct a metrics record
+   * @param info  {@link MetricsInfo} of the record
+   * @param timestamp of the record
+   * @param tags  of the record
+   * @param metrics of the record
+   */
+  public MetricsRecordImpl(MetricsInfo info, long timestamp,
+                           List<MetricsTag> tags,
+                           Iterable<AbstractMetric> metrics) {
+    this.timestamp = checkArg(timestamp, timestamp > 0, "timestamp");
+    this.info = checkNotNull(info, "info");
+    this.tags = checkNotNull(tags, "tags");
+    this.metrics = checkNotNull(metrics, "metrics");
+  }
+
+  @Override public long timestamp() {
+    return timestamp;
+  }
+
+  @Override public String name() {
+    return info.name();
+  }
+
+  MetricsInfo info() {
+    return info;
+  }
+
+  @Override public String description() {
+    return info.description();
+  }
+
+  @Override public String context() {
+    // usually the first tag
+    for (MetricsTag t : tags) {
+      if (t.info() == MsInfo.Context) {
+        return t.value();
+      }
+    }
+    return DEFAULT_CONTEXT;
+  }
+
+  @Override
+  public List<MetricsTag> tags() {
+    return tags; // already unmodifiable from MetricsRecordBuilderImpl#tags
+  }
+
+  @Override public Iterable<AbstractMetric> metrics() {
+    return metrics;
+  }
+}

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Fri May  6 07:28:43 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.metrics2.util.Contracts.*;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSink;
+
+/**
+ * An adapter class for a metrics sink and its associated filters
+ */
+class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
+
+  private final Log LOG = LogFactory.getLog(MetricsSinkAdapter.class);
+  private final String name, description, context;
+  private final MetricsSink sink;
+  private final MetricsFilter sourceFilter, recordFilter, metricFilter;
+  private final SinkQueue<MetricsBuffer> queue;
+  private final Thread sinkThread;
+  private volatile boolean stopping = false;
+  private volatile boolean inError = false;
+  private final int period, firstRetryDelay, retryCount;
+  private final float retryBackoff;
+  private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
+  private final MutableStat latency;
+  private final MutableCounterInt dropped;
+  private final MutableGaugeInt qsize;
+
+  MetricsSinkAdapter(String name, String description, MetricsSink sink,
+                     String context, MetricsFilter sourceFilter,
+                     MetricsFilter recordFilter, MetricsFilter metricFilter,
+                     int period, int queueCapacity, int retryDelay,
+                     float retryBackoff, int retryCount) {
+    this.name = checkNotNull(name, "name");
+    this.description = description;
+    this.sink = checkNotNull(sink, "sink object");
+    this.context = context;
+    this.sourceFilter = sourceFilter;
+    this.recordFilter = recordFilter;
+    this.metricFilter = metricFilter;
+    this.period = checkArg(period, period > 0, "period");
+    firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
+    this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
+    this.retryCount = retryCount;
+    this.queue = new SinkQueue<MetricsBuffer>(checkArg(queueCapacity,
+        queueCapacity > 0, "queue capacity"));
+    latency = registry.newRate("Sink_"+ name, "Sink end to end latency", false);
+    dropped = registry.newCounter("Sink_"+ name +"Dropped",
+                                  "Dropped updates per sink", 0);
+    qsize = registry.newGauge("Sink_"+ name + "Qsize", "Queue size", 0);
+
+    sinkThread = new Thread() {
+      @Override public void run() {
+        publishMetricsFromQueue();
+      }
+    };
+    sinkThread.setName(name);
+    sinkThread.setDaemon(true);
+  }
+
+  boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
+    if (logicalTime % period == 0) {
+      LOG.debug("enqueue, logicalTime="+ logicalTime);
+      if (queue.enqueue(buffer)) return true;
+      dropped.incr();
+      return false;
+    }
+    return true; // OK
+  }
+
+  void publishMetricsFromQueue() {
+    int retryDelay = firstRetryDelay;
+    int n = retryCount;
+    int minDelay = Math.min(500, retryDelay * 1000); // millis
+    Random rng = new Random(System.nanoTime());
+    while (!stopping) {
+      try {
+        queue.consumeAll(this);
+        retryDelay = firstRetryDelay;
+        n = retryCount;
+        inError = false;
+      }
+      catch (InterruptedException e) {
+        LOG.info(name +" thread interrupted.");
+      }
+      catch (Exception e) {
+        if (n > 0) {
+          int retryWindow = Math.max(0, 1000 / 2 * retryDelay - minDelay);
+          int awhile = rng.nextInt(retryWindow) + minDelay;
+          if (!inError) {
+            LOG.error("Got sink exception, retry in "+ awhile +"ms", e);
+          }
+          retryDelay *= retryBackoff;
+          try { Thread.sleep(awhile); }
+          catch (InterruptedException e2) {
+            LOG.info(name +" thread interrupted while waiting for retry", e2);
+          }
+          --n;
+        }
+        else {
+          if (!inError) {
+            LOG.error("Got sink exception and over retry limit, "+
+                      "suppressing further error messages", e);
+          }
+          queue.clear();
+          inError = true; // Don't keep complaining ad infinitum
+        }
+      }
+    }
+  }
+
+  @Override
+  public void consume(MetricsBuffer buffer) {
+    long ts = 0;
+    for (MetricsBuffer.Entry entry : buffer) {
+      if (sourceFilter == null || sourceFilter.accepts(entry.name())) {
+        for (MetricsRecordImpl record : entry.records()) {
+          if ((context == null || context.equals(record.context())) &&
+              (recordFilter == null || recordFilter.accepts(record))) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Pushing record "+ entry.name() +"."+ record.context() +
+                        "."+ record.name() +" to "+ name);
+            }
+            sink.putMetrics(metricFilter == null
+                ? record
+                : new MetricsRecordFiltered(record, metricFilter));
+            if (ts == 0) ts = record.timestamp();
+          }
+        }
+      }
+    }
+    if (ts > 0) {
+      sink.flush();
+      latency.add(System.currentTimeMillis() - ts);
+    }
+    LOG.debug("Done");
+  }
+
+  void start() {
+    sinkThread.start();
+    LOG.info("Sink "+ name +" started");
+  }
+
+  void stop() {
+    stopping = true;
+    sinkThread.interrupt();
+    try {
+      sinkThread.join();
+    }
+    catch (InterruptedException e) {
+      LOG.warn("Stop interrupted", e);
+    }
+  }
+
+  String name() {
+    return name;
+  }
+
+  String description() {
+    return description;
+  }
+
+  void snapshot(MetricsRecordBuilder rb, boolean all) {
+    registry.snapshot(rb, all);
+  }
+
+  MetricsSink sink() {
+    return sink;
+  }
+}
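
For context, a hypothetical MetricsSink that the adapter above could drive from its consumer thread; the init/putMetrics/flush methods are the public metrics2 sink plugin API, and the class below is an illustration only:

    import org.apache.commons.configuration.SubsetConfiguration;
    import org.apache.hadoop.metrics2.AbstractMetric;
    import org.apache.hadoop.metrics2.MetricsRecord;
    import org.apache.hadoop.metrics2.MetricsSink;

    public class ConsoleSink implements MetricsSink {
      @Override public void init(SubsetConfiguration conf) {
        // sink-specific options (e.g. a filename) would be read from conf here
      }

      @Override public void putMetrics(MetricsRecord record) {
        StringBuilder sb = new StringBuilder(record.context())
            .append('.').append(record.name()).append(':');
        for (AbstractMetric m : record.metrics()) {
          sb.append(' ').append(m.name()).append('=').append(m.value());
        }
        System.out.println(sb);
      }

      @Override public void flush() {
        // nothing buffered in this sketch
      }
    }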

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java Fri May  6 07:28:43 2011
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.HashMap;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.util.MBeans;
+import static org.apache.hadoop.metrics2.util.Contracts.*;
+
+/**
+ * An adapter class for a metrics source and its associated filters and JMX support
+ */
+class MetricsSourceAdapter implements DynamicMBean {
+
+  private static final Log LOG = LogFactory.getLog(MetricsSourceAdapter.class);
+
+  private final String prefix, name;
+  private final MetricsSource source;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final HashMap<String, Attribute> attrCache;
+  private final MBeanInfoBuilder infoBuilder;
+  private final Iterable<MetricsTag> injectedTags;
+
+  private Iterable<MetricsRecordImpl> lastRecs;
+  private long jmxCacheTS = 0;
+  private int jmxCacheTTL;
+  private MBeanInfo infoCache;
+  private ObjectName mbeanName;
+  private final boolean startMBeans;
+
+  MetricsSourceAdapter(String prefix, String name, String description,
+                       MetricsSource source, Iterable<MetricsTag> injectedTags,
+                       MetricsFilter recordFilter, MetricsFilter metricFilter,
+                       int jmxCacheTTL, boolean startMBeans) {
+    this.prefix = checkNotNull(prefix, "prefix");
+    this.name = checkNotNull(name, "name");
+    this.source = checkNotNull(source, "source");
+    attrCache = Maps.newHashMap();
+    infoBuilder = new MBeanInfoBuilder(name, description);
+    this.injectedTags = injectedTags;
+    this.recordFilter = recordFilter;
+    this.metricFilter = metricFilter;
+    this.jmxCacheTTL = checkArg(jmxCacheTTL, jmxCacheTTL > 0, "jmxCacheTTL");
+    this.startMBeans = startMBeans;
+  }
+
+  MetricsSourceAdapter(String prefix, String name, String description,
+                       MetricsSource source, Iterable<MetricsTag> injectedTags,
+                       int period, MetricsConfig conf) {
+    this(prefix, name, description, source, injectedTags,
+         conf.getFilter(RECORD_FILTER_KEY),
+         conf.getFilter(METRIC_FILTER_KEY),
+         period + 1, // hack to avoid most of the "innocuous" races.
+         conf.getBoolean(START_MBEANS_KEY, true));
+  }
+
+  void start() {
+    if (startMBeans) startMBeans();
+  }
+
+  @Override
+  public synchronized Object getAttribute(String attribute)
+      throws AttributeNotFoundException, MBeanException, ReflectionException {
+    updateJmxCache();
+    Attribute a = attrCache.get(attribute);
+    if (a == null) {
+      throw new AttributeNotFoundException(attribute +" not found");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(attribute +": "+ a);
+    }
+    return a.getValue();
+  }
+
+  @Override
+  public void setAttribute(Attribute attribute)
+      throws AttributeNotFoundException, InvalidAttributeValueException,
+             MBeanException, ReflectionException {
+    throw new UnsupportedOperationException("Metrics are read-only.");
+  }
+
+  @Override
+  public synchronized AttributeList getAttributes(String[] attributes) {
+    updateJmxCache();
+    AttributeList ret = new AttributeList();
+    for (String key : attributes) {
+      Attribute attr = attrCache.get(key);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(key +": "+ attr);
+      }
+      ret.add(attr);
+    }
+    return ret;
+  }
+
+  @Override
+  public AttributeList setAttributes(AttributeList attributes) {
+    throw new UnsupportedOperationException("Metrics are read-only.");
+  }
+
+  @Override
+  public Object invoke(String actionName, Object[] params, String[] signature)
+      throws MBeanException, ReflectionException {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public synchronized MBeanInfo getMBeanInfo() {
+    updateJmxCache();
+    return infoCache;
+  }
+
+  private synchronized void updateJmxCache() {
+    if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
+      if (lastRecs == null) {
+        MetricsCollectorImpl builder = new MetricsCollectorImpl();
+        getMetrics(builder, true);
+      }
+      int oldCacheSize = attrCache.size();
+      int newCacheSize = updateAttrCache();
+      if (oldCacheSize < newCacheSize) {
+        updateInfoCache();
+      }
+      jmxCacheTS = System.currentTimeMillis();
+      lastRecs = null;  // in case regular interval update is not running
+    }
+  }
+
+  Iterable<MetricsRecordImpl> getMetrics(MetricsCollectorImpl builder,
+                                         boolean all) {
+    builder.setRecordFilter(recordFilter).setMetricFilter(metricFilter);
+    synchronized(this) {
+      if (lastRecs == null && jmxCacheTS == 0) {
+        all = true; // Get all the metrics to populate the sink caches
+      }
+    }
+    try {
+      source.getMetrics(builder, all);
+    }
+    catch (Exception e) {
+      LOG.error("Error getting metrics from source "+ name, e);
+    }
+    for (MetricsRecordBuilderImpl rb : builder) {
+      for (MetricsTag t : injectedTags) {
+        rb.add(t);
+      }
+    }
+    synchronized(this) {
+      lastRecs = builder.getRecords();
+      return lastRecs;
+    }
+  }
+
+  synchronized void stop() {
+    stopMBeans();
+  }
+
+  synchronized void startMBeans() {
+    if (mbeanName != null) {
+      LOG.warn("MBean "+ name +" already initialized!");
+      LOG.debug("Stacktrace: ", new Throwable());
+      return;
+    }
+    mbeanName = MBeans.register(prefix, name, this);
+    LOG.debug("MBean for source "+ name +" registered.");
+  }
+
+  synchronized void stopMBeans() {
+    if (mbeanName != null) {
+      MBeans.unregister(mbeanName);
+      mbeanName = null;
+    }
+  }
+
+  private void updateInfoCache() {
+    LOG.debug("Updating info cache...");
+    infoCache = infoBuilder.reset(lastRecs).get();
+    LOG.debug("Done");
+  }
+
+  private int updateAttrCache() {
+    LOG.debug("Updating attr cache...");
+    int recNo = 0;
+    int numMetrics = 0;
+    for (MetricsRecordImpl record : lastRecs) {
+      for (MetricsTag t : record.tags()) {
+        setAttrCacheTag(t, recNo);
+        ++numMetrics;
+      }
+      for (AbstractMetric m : record.metrics()) {
+        setAttrCacheMetric(m, recNo);
+        ++numMetrics;
+      }
+      ++recNo;
+    }
+    LOG.debug("Done. # tags & metrics="+ numMetrics);
+    return numMetrics;
+  }
+
+  private static String tagName(String name, int recNo) {
+    StringBuilder sb = new StringBuilder(name.length() + 16);
+    sb.append("tag.").append(name);
+    if (recNo > 0) {
+      sb.append('.').append(recNo);
+    }
+    return sb.toString();
+  }
+
+  private void setAttrCacheTag(MetricsTag tag, int recNo) {
+    String key = tagName(tag.name(), recNo);
+    attrCache.put(key, new Attribute(key, tag.value()));
+  }
+
+  private static String metricName(String name, int recNo) {
+    if (recNo == 0) {
+      return name;
+    }
+    StringBuilder sb = new StringBuilder(name.length() + 12);
+    sb.append(name);
+    if (recNo > 0) {
+      sb.append('.').append(recNo);
+    }
+    return sb.toString();
+  }
+
+  private void setAttrCacheMetric(AbstractMetric metric, int recNo) {
+    String key = metricName(metric.name(), recNo);
+    attrCache.put(key, new Attribute(key, metric.value()));
+  }
+
+  String name() {
+    return name;
+  }
+
+  MetricsSource source() {
+    return source;
+  }
+}
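
Similarly, a hypothetical MetricsSource that MetricsSourceAdapter would wrap: the adapter injects the configured tags, applies the record/metric filters and caches the snapshot behind the JMX attribute and MBeanInfo views above:

    import org.apache.hadoop.metrics2.MetricsCollector;
    import org.apache.hadoop.metrics2.MetricsSource;
    import static org.apache.hadoop.metrics2.lib.Interns.info;

    public class UptimeSource implements MetricsSource {
      private final long startTime = System.currentTimeMillis();

      @Override
      public void getMetrics(MetricsCollector collector, boolean all) {
        collector.addRecord("uptime")
            .setContext("example")
            .addGauge(info("UptimeMillis", "Milliseconds since construction"),
                      System.currentTimeMillis() - startTime);
      }
    }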

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Fri May  6 07:28:43 2011
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.StringWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import javax.management.ObjectName;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Locale;
+import static com.google.common.base.Preconditions.*;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.util.MathUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.metrics2.util.MBeans;
+
+/**
+ * A base class for metrics system singletons
+ */
+@InterfaceAudience.Private
+@Metrics(context="metricssystem")
+public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
+
+  static final Log LOG = LogFactory.getLog(MetricsSystemImpl.class);
+  static final String MS_NAME = "MetricsSystem";
+  static final String MS_STATS_NAME = MS_NAME +",sub=Stats";
+  static final String MS_STATS_DESC = "Metrics system metrics";
+  static final String MS_CONTROL_NAME = MS_NAME +",sub=Control";
+  static final String MS_INIT_MODE_KEY = "hadoop.metrics.init.mode";
+
+  enum InitMode { NORMAL, STANDBY }
+
+  private final Map<String, MetricsSourceAdapter> sources;
+  private final Map<String, MetricsSource> allSources;
+  private final Map<String, MetricsSinkAdapter> sinks;
+  private final Map<String, MetricsSink> allSinks;
+  private final List<Callback> callbacks;
+  private final MetricsCollectorImpl collector;
+  private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
+  @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
+  @Metric({"Publish", "Publishing stats"}) MutableStat publishStat;
+  @Metric("Dropped updates by all sinks") MutableCounterLong droppedPubAll;
+
+  private final List<MetricsTag> injectedTags;
+
+  // Things that are changed by init()/start()/stop()
+  private String prefix;
+  private MetricsFilter sourceFilter;
+  private MetricsConfig config;
+  private Map<String, MetricsConfig> sourceConfigs, sinkConfigs;
+  private boolean monitoring = false;
+  private Timer timer;
+  private int period; // seconds
+  private long logicalTime; // number of timer invocations * period
+  private ObjectName mbeanName;
+  private boolean publishSelfMetrics = true;
+  private MetricsSourceAdapter sysSource;
+  private int refCount = 0; // for mini cluster mode
+
+  /**
+   * Construct the metrics system
+   * @param prefix  for the system
+   */
+  public MetricsSystemImpl(String prefix) {
+    this.prefix = prefix;
+    allSources = Maps.newHashMap();
+    sources = Maps.newLinkedHashMap();
+    allSinks = Maps.newHashMap();
+    sinks = Maps.newLinkedHashMap();
+    sourceConfigs = Maps.newHashMap();
+    sinkConfigs = Maps.newHashMap();
+    callbacks = Lists.newArrayList();
+    injectedTags = Lists.newArrayList();
+    collector = new MetricsCollectorImpl();
+    if (prefix != null) {
+      // prefix could be null for default ctor, which requires init later
+      initSystemMBean();
+    }
+  }
+
+  /**
+   * Construct the system without initializing it (reading config, etc.).
+   */
+  public MetricsSystemImpl() {
+    this(null);
+  }
+
+  /**
+   * Initialize the metrics system with a prefix.
+   * @param prefix  the system will look for configs with the prefix
+   * @return the metrics system object itself
+   */
+  @Override
+  public synchronized MetricsSystem init(String prefix) {
+    if (monitoring && !DefaultMetricsSystem.inMiniClusterMode()) {
+      LOG.warn(this.prefix +" metrics system already initialized!");
+      return this;
+    }
+    this.prefix = checkNotNull(prefix, "prefix");
+    ++refCount;
+    if (monitoring) {
+      // in mini cluster mode
+      LOG.info(this.prefix +" metrics system started (again)");
+      return this;
+    }
+    switch (initMode()) {
+      case NORMAL:
+        try { start(); }
+        catch (MetricsConfigException e) {
+          // Usually because hadoop-metrics2.properties is missing
+          // We can always start the metrics system later via JMX.
+          LOG.warn("Metrics system not started: "+ e.getMessage());
+          LOG.debug("Stacktrace: ", e);
+        }
+        break;
+      case STANDBY:
+        LOG.info(prefix +" metrics system started in standby mode");
+    }
+    initSystemMBean();
+    return this;
+  }
+
+  @Override
+  public synchronized void start() {
+    checkNotNull(prefix, "prefix");
+    if (monitoring) {
+      LOG.warn(prefix +" metrics system already started!",
+               new MetricsException("Illegal start"));
+      return;
+    }
+    for (Callback cb : callbacks) cb.preStart();
+    configure(prefix);
+    startTimer();
+    monitoring = true;
+    LOG.info(prefix +" metrics system started");
+    for (Callback cb : callbacks) cb.postStart();
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (!monitoring && !DefaultMetricsSystem.inMiniClusterMode()) {
+      LOG.warn(prefix +" metrics system not yet started!",
+               new MetricsException("Illegal stop"));
+      return;
+    }
+    if (!monitoring) {
+      // in mini cluster mode
+      LOG.info(prefix +" metrics system stopped (again)");
+      return;
+    }
+    for (Callback cb : callbacks) cb.preStop();
+    LOG.info("Stopping "+ prefix +" metrics system...");
+    stopTimer();
+    stopSources();
+    stopSinks();
+    clearConfigs();
+    monitoring = false;
+    LOG.info(prefix +" metrics system stopped.");
+    for (Callback cb : callbacks) cb.postStop();
+  }
+
+  @Override public synchronized <T>
+  T register(String name, String desc, T source) {
+    MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(source);
+    final MetricsSource s = sb.build();
+    MetricsInfo si = sb.info();
+    String name2 = name == null ? si.name() : name;
+    final String finalDesc = desc == null ? si.description() : desc;
+    final String finalName = // be friendly to non-metrics tests
+        DefaultMetricsSystem.sourceName(name2, !monitoring);
+    allSources.put(finalName, s);
+    LOG.debug(finalName +", "+ finalDesc);
+    if (monitoring) {
+      registerSource(finalName, finalDesc, s);
+    }
+    // We want to re-register the source to pick up new config when the
+    // metrics system restarts.
+    register(new AbstractCallback() {
+      @Override public void postStart() {
+        registerSource(finalName, finalDesc, s);
+      }
+    });
+    return source;
+  }
+
+  synchronized
+  void registerSource(String name, String desc, MetricsSource source) {
+    checkNotNull(config, "config");
+    MetricsConfig conf = sourceConfigs.get(name);
+    MetricsSourceAdapter sa = conf != null
+        ? new MetricsSourceAdapter(prefix, name, desc, source,
+                                   injectedTags, period, conf)
+        : new MetricsSourceAdapter(prefix, name, desc, source,
+          injectedTags, period, config.subset(SOURCE_KEY));
+    sources.put(name, sa);
+    sa.start();
+    LOG.info("Registered source "+ name);
+  }
+
+  @Override public synchronized <T extends MetricsSink>
+  T register(final String name, final String description, final T sink) {
+    LOG.debug(name +", "+ description);
+    if (allSinks.containsKey(name)) {
+      LOG.warn("Sink "+ name +" already exists!");
+      return sink;
+    }
+    allSinks.put(name, sink);
+    if (config != null) {
+      registerSink(name, description, sink);
+    }
+    // We want to re-register the sink to pick up new config
+    // when the metrics system restarts.
+    register(new AbstractCallback() {
+      @Override public void postStart() {
+        registerSink(name, description, sink);
+      }
+    });
+    return sink;
+  }
+
+  synchronized void registerSink(String name, String desc, MetricsSink sink) {
+    checkNotNull(config, "config");
+    MetricsConfig conf = sinkConfigs.get(name);
+    MetricsSinkAdapter sa = conf != null
+        ? newSink(name, desc, sink, conf)
+        : newSink(name, desc, sink, config.subset(SINK_KEY));
+    sinks.put(name, sa);
+    sa.start();
+    LOG.info("Registered sink "+ name);
+  }
+
+  @Override
+  public synchronized void register(final Callback callback) {
+    callbacks.add((Callback) Proxy.newProxyInstance(
+        callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
+        new InvocationHandler() {
+          @Override
+          public Object invoke(Object proxy, Method method, Object[] args)
+              throws Throwable {
+            try {
+              return method.invoke(callback, args);
+            }
+            catch (Exception e) {
+              // These are not considered fatal.
+              LOG.warn("Caught exception in callback "+ method.getName(), e);
+            }
+            return null;
+          }
+        }));
+  }
+
+  @Override
+  public synchronized void startMetricsMBeans() {
+    for (MetricsSourceAdapter sa : sources.values()) {
+      sa.startMBeans();
+    }
+  }
+
+  @Override
+  public synchronized void stopMetricsMBeans() {
+    for (MetricsSourceAdapter sa : sources.values()) {
+      sa.stopMBeans();
+    }
+  }
+
+  @Override
+  public synchronized String currentConfig() {
+    PropertiesConfiguration saver = new PropertiesConfiguration();
+    StringWriter writer = new StringWriter();
+    saver.copy(config);
+    try { saver.save(writer); }
+    catch (Exception e) {
+      throw new MetricsConfigException("Error stringifying config", e);
+    }
+    return writer.toString();
+  }
+
+  private synchronized void startTimer() {
+    if (timer != null) {
+      LOG.warn(prefix +" metrics system timer already started!");
+      return;
+    }
+    logicalTime = 0;
+    long millis = period * 1000;
+    timer = new Timer("Timer for '"+ prefix +"' metrics system", true);
+    timer.scheduleAtFixedRate(new TimerTask() {
+          public void run() {
+            try {
+              onTimerEvent();
+            }
+            catch (Exception e) {
+              LOG.warn(e);
+            }
+          }
+        }, millis, millis);
+    LOG.info("Scheduled snapshot period at "+ period +" second(s).");
+  }
+
+  synchronized void onTimerEvent() {
+    logicalTime += period;
+    if (sinks.size() > 0) {
+      publishMetrics(sampleMetrics());
+    }
+  }
+
+  /**
+   * Sample all the sources for a snapshot of metrics/tags
+   * @return  the metrics buffer containing the snapshot
+   */
+  synchronized MetricsBuffer sampleMetrics() {
+    collector.clear();
+    MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
+
+    for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+      if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
+        snapshotMetrics(entry.getValue(), bufferBuilder);
+      }
+    }
+    if (publishSelfMetrics) {
+      snapshotMetrics(sysSource, bufferBuilder);
+    }
+    return bufferBuilder.get();
+  }
+
+  private void snapshotMetrics(MetricsSourceAdapter sa,
+                               MetricsBufferBuilder bufferBuilder) {
+    long startTime = System.currentTimeMillis();
+    bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
+    collector.clear();
+    snapshotStat.add(System.currentTimeMillis() - startTime);
+    LOG.debug("Snapshotted source "+ sa.name());
+  }
+
+  /**
+   * Publish a metrics snapshot to all the sinks
+   * @param buffer  the metrics snapshot to publish
+   */
+  synchronized void publishMetrics(MetricsBuffer buffer) {
+    int dropped = 0;
+    for (MetricsSinkAdapter sa : sinks.values()) {
+      long startTime = System.currentTimeMillis();
+      dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
+      publishStat.add(System.currentTimeMillis() - startTime);
+    }
+    droppedPubAll.incr(dropped);
+  }
+
+  private synchronized void stopTimer() {
+    if (timer == null) {
+      LOG.warn(prefix +" metrics system timer already stopped!");
+      return;
+    }
+    timer.cancel();
+    timer = null;
+  }
+
+  private synchronized void stopSources() {
+    for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+      MetricsSourceAdapter sa = entry.getValue();
+      LOG.info("Stopping metrics source "+ entry.getKey());
+      LOG.debug(sa.source().getClass());
+      sa.stop();
+    }
+    sysSource.stop();
+    sources.clear();
+  }
+
+  private synchronized void stopSinks() {
+    for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+      MetricsSinkAdapter sa = entry.getValue();
+      LOG.info("Stopping metrics sink "+ entry.getKey());
+      LOG.debug(sa.sink().getClass());
+      sa.stop();
+    }
+    sinks.clear();
+  }
+
+  private synchronized void configure(String prefix) {
+    config = MetricsConfig.create(prefix);
+    configureSinks();
+    configureSources();
+    configureSystem();
+  }
+
+  private synchronized void configureSystem() {
+    injectedTags.add(Interns.tag(MsInfo.Hostname, getHostname()));
+  }
+
+  private synchronized void configureSinks() {
+    sinkConfigs = config.getInstanceConfigs(SINK_KEY);
+    int confPeriod = 0;
+    for (Entry<String, MetricsConfig> entry : sinkConfigs.entrySet()) {
+      MetricsConfig conf = entry.getValue();
+      int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+      confPeriod = confPeriod == 0 ? sinkPeriod
+                                   : MathUtils.gcd(confPeriod, sinkPeriod);
+      String clsName = conf.getClassName("");
+      if (clsName == null) continue;  // sink can be registered later on
+      String sinkName = entry.getKey();
+      try {
+        MetricsSinkAdapter sa = newSink(sinkName,
+            conf.getString(DESC_KEY, sinkName), conf);
+        sa.start();
+        sinks.put(sinkName, sa);
+      }
+      catch (Exception e) {
+        LOG.warn("Error creating sink '"+ sinkName +"'", e);
+      }
+    }
+    period = confPeriod > 0 ? confPeriod
+                            : config.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+  }
+
+  static MetricsSinkAdapter newSink(String name, String desc, MetricsSink sink,
+                                    MetricsConfig conf) {
+    return new MetricsSinkAdapter(name, desc, sink, conf.getString(CONTEXT_KEY),
+        conf.getFilter(SOURCE_FILTER_KEY),
+        conf.getFilter(RECORD_FILTER_KEY),
+        conf.getFilter(METRIC_FILTER_KEY),
+        conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
+        conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
+        conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
+        conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
+        conf.getInt(RETRY_COUNT_KEY, RETRY_COUNT_DEFAULT));
+  }
+
+  static MetricsSinkAdapter newSink(String name, String desc,
+                                    MetricsConfig conf) {
+    return newSink(name, desc, (MetricsSink) conf.getPlugin(""), conf);
+  }
+
+  private void configureSources() {
+    sourceFilter = config.getFilter(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
+    sourceConfigs = config.getInstanceConfigs(SOURCE_KEY);
+    registerSystemSource();
+  }
+
+  private void clearConfigs() {
+    sinkConfigs.clear();
+    sourceConfigs.clear();
+    injectedTags.clear();
+    config = null;
+  }
+
+  static String getHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    }
+    catch (Exception e) {
+      LOG.error("Error getting localhost name. Using 'localhost'...", e);
+    }
+    return "localhost";
+  }
+
+  private void registerSystemSource() {
+    MetricsConfig sysConf = sourceConfigs.get(MS_NAME);
+    sysSource = new MetricsSourceAdapter(prefix, MS_STATS_NAME, MS_STATS_DESC,
+        MetricsAnnotations.makeSource(this), injectedTags, period,
+        sysConf == null ? config.subset(SOURCE_KEY) : sysConf);
+    sysSource.start();
+  }
+
+  @Override
+  public synchronized void getMetrics(MetricsCollector builder, boolean all) {
+    MetricsRecordBuilder rb = builder.addRecord(MS_NAME)
+        .addGauge(MsInfo.NumActiveSources, sources.size())
+        .addGauge(MsInfo.NumAllSources, allSources.size())
+        .addGauge(MsInfo.NumActiveSinks, sinks.size())
+        .addGauge(MsInfo.NumAllSinks, allSinks.size());
+
+    for (MetricsSinkAdapter sa : sinks.values()) {
+      sa.snapshot(rb, all);
+    }
+    registry.snapshot(rb, all);
+  }
+
+  private void initSystemMBean() {
+    checkNotNull(prefix, "prefix should not be null here!");
+    if (mbeanName == null) {
+      mbeanName = MBeans.register(prefix, MS_CONTROL_NAME, this);
+    }
+  }
+
+  @Override
+  public synchronized boolean shutdown() {
+    LOG.debug("refCount="+ refCount);
+    if (refCount <= 0) LOG.debug("Redundant shutdown", new Throwable());
+    if (--refCount > 0) return false;
+    if (monitoring) {
+      try { stop(); }
+      catch (Exception e) {
+        LOG.warn("Error stopping the metrics system", e);
+      }
+    }
+    allSources.clear();
+    allSinks.clear();
+    callbacks.clear();
+    if (mbeanName != null) {
+      MBeans.unregister(mbeanName);
+      mbeanName = null;
+    }
+    LOG.info(prefix +" metrics system shutdown complete.");
+    return true;
+  }
+
+  public MetricsSource getSource(String name) {
+    return allSources.get(name);
+  }
+
+  private InitMode initMode() {
+    LOG.debug("from system property: "+ System.getProperty(MS_INIT_MODE_KEY));
+    LOG.debug("from environment variable: "+ System.getenv(MS_INIT_MODE_KEY));
+    String m = System.getProperty(MS_INIT_MODE_KEY);
+    String m2 = m == null ? System.getenv(MS_INIT_MODE_KEY) : m;
+    return InitMode.valueOf((m2 == null ? InitMode.NORMAL.name() : m2)
+                            .toUpperCase(Locale.US));
+  }
+}
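
For illustration (not part of this patch), a minimal usage sketch: a daemon initializes the system through DefaultMetricsSystem, registers an annotated source, and lets the timer publish snapshots to the configured sinks. MyDaemonMetrics and its field are hypothetical; only calls that appear elsewhere in this patch are used.

    import org.apache.hadoop.metrics2.MetricsSystem;
    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    // Hypothetical annotated source; the annotations drive
    // MetricsAnnotations.newSourceBuilder() inside register() above.
    @Metrics(context="mydaemon")
    class MyDaemonMetrics {
      @Metric("Requests handled") MutableCounterLong requests;
    }

    class MyDaemon {
      public static void main(String[] args) {
        // Configuration is read by MetricsConfig.create(prefix) during start().
        MetricsSystem ms = DefaultMetricsSystem.initialize("mydaemon");
        MyDaemonMetrics metrics =
            ms.register("MyDaemonMetrics", "Example daemon metrics",
                        new MyDaemonMetrics());
        metrics.requests.incr(1); // snapshotted by the timer and pushed to the sinks
        DefaultMetricsSystem.shutdown();
      }
    }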

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java Fri May  6 07:28:43 2011
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import com.google.common.base.Objects;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+
+/**
+ * Metrics info instances for the metrics system
+ */
+@InterfaceAudience.Private
+public enum MsInfo implements MetricsInfo {
+  NumActiveSources("Number of active metrics sources"),
+  NumAllSources("Number of all registered metrics sources"),
+  NumActiveSinks("Number of active metrics sinks"),
+  NumAllSinks("Number of all registered metrics sinks"),
+  Context("Metrics context"),
+  Hostname("Local hostname"),
+  SessionId("Session ID"),
+  ProcessName("Process name");
+
+  private final String desc;
+
+  MsInfo(String desc) {
+    this.desc = desc;
+  }
+
+  @Override public String description() {
+    return desc;
+  }
+
+  @Override public String toString() {
+    return Objects.toStringHelper(this)
+        .add("name", name()).add("description", desc)
+        .toString();
+  }
+}
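
For illustration (not part of this patch), these enum constants are what MetricsSystemImpl above passes wherever a MetricsInfo is expected. The sketch below mirrors the calls in getMetrics() and configureSystem(), assuming a caller that already holds a MetricsCollector; the tag(MetricsInfo, String) call is an assumption about MetricsRecordBuilder, while addGauge is used in getMetrics() above.

    import org.apache.hadoop.metrics2.MetricsCollector;
    import org.apache.hadoop.metrics2.MetricsTag;
    import org.apache.hadoop.metrics2.impl.MsInfo;
    import org.apache.hadoop.metrics2.lib.Interns;

    class MsInfoSketch {
      // MsInfo values carry the name and description for each gauge and tag.
      static MetricsTag describe(MetricsCollector collector, String hostname) {
        collector.addRecord("MetricsSystem")
            .tag(MsInfo.Hostname, hostname)
            .addGauge(MsInfo.NumActiveSources, 3);
        return Interns.tag(MsInfo.Hostname, hostname); // interned, as in configureSystem()
      }
    }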

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java Fri May  6 07:28:43 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * A half-blocking (nonblocking for producers, blocking for consumers) queue
+ * for metrics sinks.
+ *
+ * New elements are dropped when the queue is full to preserve the
+ * "interesting" elements at the onset of queue-filling events.
+ */
+class SinkQueue<T> {
+
+  interface Consumer<T> {
+    void consume(T object) throws InterruptedException;
+  }
+
+  // A fixed size circular buffer to minimize garbage
+  private final T[] data;
+  private int head; // head position
+  private int tail; // tail position
+  private int size; // number of elements
+  private Thread currentConsumer = null;
+
+  @SuppressWarnings("unchecked")
+  SinkQueue(int capacity) {
+    this.data = (T[]) new Object[Math.max(1, capacity)];
+    head = tail = size = 0;
+  }
+
+  synchronized boolean enqueue(T e) {
+    if (data.length == size) {
+      return false;
+    }
+    ++size;
+    tail = (tail + 1) % data.length;
+    data[tail] = e;
+    notify();
+    return true;
+  }
+
+  /**
+   * Consume one element; blocks if the queue is empty.
+   * Only one consumer at a time is allowed.
+   * @param consumer  the consumer callback object
+   */
+  void consume(Consumer<T> consumer) throws InterruptedException {
+    T e = waitForData();
+
+    try {
+      consumer.consume(e);  // can take forever
+      _dequeue();
+    }
+    finally {
+      clearConsumerLock();
+    }
+  }
+
+  /**
+   * Consume all the elements; blocks if the queue is empty.
+   * @param consumer  the consumer callback object
+   * @throws InterruptedException
+   */
+  void consumeAll(Consumer<T> consumer) throws InterruptedException {
+    waitForData();
+
+    try {
+      for (int i = size(); i-- > 0; ) {
+        consumer.consume(front()); // can take forever
+        _dequeue();
+      }
+    }
+    finally {
+      clearConsumerLock();
+    }
+  }
+
+  /**
+   * Dequeue one element from the head of the queue; blocks if the queue is empty.
+   * @return  the first element
+   * @throws InterruptedException
+   */
+  synchronized T dequeue() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    return _dequeue();
+  }
+
+  private synchronized T waitForData() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    setConsumerLock();
+    return front();
+  }
+
+  private synchronized void checkConsumer() {
+    if (currentConsumer != null) {
+      throw new ConcurrentModificationException("The "+
+          currentConsumer.getName() +" thread is consuming the queue.");
+    }
+  }
+
+  private synchronized void setConsumerLock() {
+    currentConsumer = Thread.currentThread();
+  }
+
+  private synchronized void clearConsumerLock() {
+    currentConsumer = null;
+  }
+
+  private synchronized T _dequeue() {
+    if (0 == size) {
+      throw new IllegalStateException("Size must be > 0 here.");
+    }
+    --size;
+    head = (head + 1) % data.length;
+    T ret = data[head];
+    data[head] = null;  // hint to gc
+    return ret;
+  }
+
+  synchronized T front() {
+    return data[(head + 1) % data.length];
+  }
+
+  synchronized T back() {
+    return data[tail];
+  }
+
+  synchronized void clear() {
+    checkConsumer();
+
+    for (int i = data.length; i-- > 0; ) {
+      data[i] = null;
+    }
+    size = 0;
+  }
+
+  synchronized int size() {
+    return size;
+  }
+
+  int capacity() {
+    return data.length;
+  }
+}
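
For illustration (not part of this patch), a sketch of the intended producer/consumer pattern. SinkQueue is package-private, so the code is assumed to live in org.apache.hadoop.metrics2.impl; the record type and output are hypothetical.

    package org.apache.hadoop.metrics2.impl;

    class SinkQueueSketch {
      static void demo() throws InterruptedException {
        SinkQueue<String> queue = new SinkQueue<String>(2);

        queue.enqueue("record-1"); // returns false (record dropped) when the queue is full
        queue.enqueue("record-2");

        // Drains everything currently queued; would block if the queue were empty.
        queue.consumeAll(new SinkQueue.Consumer<String>() {
          @Override public void consume(String record) {
            System.out.println("flushed "+ record); // e.g. hand the record to a sink backend
          }
        });
      }
    }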

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java Fri May  6 07:28:43 2011
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A metrics system implementation
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java Fri May  6 07:28:43 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsException;
+
+/**
+ * Experimental interface to extend metrics dynamically
+ */
+@InterfaceAudience.Private
+public enum DefaultMetricsFactory {
+  INSTANCE; // the singleton
+
+  private MutableMetricsFactory mmfImpl;
+
+  public static MutableMetricsFactory getAnnotatedMetricsFactory() {
+    return INSTANCE.getInstance(MutableMetricsFactory.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  public synchronized <T> T getInstance(Class<T> cls) {
+    if (cls == MutableMetricsFactory.class) {
+      if (mmfImpl == null) {
+        mmfImpl = new MutableMetricsFactory();
+      }
+      return (T) mmfImpl;
+    }
+    throw new MetricsException("Unknown metrics factory type: "+ cls.getName());
+  }
+
+  public synchronized void setInstance(MutableMetricsFactory factory) {
+    mmfImpl = factory;
+  }
+}
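
For illustration (not part of this patch), the extension hook this enum exposes: install a custom factory before any annotated sources are built, and later lookups return it instead of the default. CustomMetricsFactory is hypothetical, and the sketch assumes MutableMetricsFactory can be subclassed.

    import org.apache.hadoop.metrics2.lib.DefaultMetricsFactory;
    import org.apache.hadoop.metrics2.lib.MutableMetricsFactory;

    // Hypothetical subclass customizing how annotation-driven metrics are created.
    class CustomMetricsFactory extends MutableMetricsFactory { }

    class FactorySwapSketch {
      static void install() {
        DefaultMetricsFactory.INSTANCE.setInstance(new CustomMetricsFactory());
        // getInstance(MutableMetricsFactory.class) now returns the custom instance.
        MutableMetricsFactory active = DefaultMetricsFactory.getAnnotatedMetricsFactory();
        assert active instanceof CustomMetricsFactory;
      }
    }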

Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java Fri May  6 07:28:43 2011
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+
+/**
+ * The default metrics system singleton
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum DefaultMetricsSystem {
+  INSTANCE; // the singleton
+
+  private MetricsSystem impl = new MetricsSystemImpl();
+  volatile boolean miniClusterMode = false;
+  final UniqueNames mBeanNames = new UniqueNames();
+  final UniqueNames sourceNames = new UniqueNames();
+
+  /**
+   * Convenience method to initialize the metrics system
+   * @param prefix  for the metrics system configuration
+   * @return the metrics system instance
+   */
+  public static MetricsSystem initialize(String prefix) {
+    return INSTANCE.init(prefix);
+  }
+
+  synchronized MetricsSystem init(String prefix) {
+    return impl.init(prefix);
+  }
+
+  /**
+   * @return the metrics system object
+   */
+  public static MetricsSystem instance() {
+    return INSTANCE.getImpl();
+  }
+
+  /**
+   * Shut down the metrics system
+   */
+  public static void shutdown() {
+    INSTANCE.shutdownInstance();
+  }
+
+  synchronized void shutdownInstance() {
+    if (impl.shutdown()) {
+      mBeanNames.map.clear();
+      sourceNames.map.clear();
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static MetricsSystem setInstance(MetricsSystem ms) {
+    return INSTANCE.setImpl(ms);
+  }
+
+  synchronized MetricsSystem setImpl(MetricsSystem ms) {
+    MetricsSystem old = impl;
+    impl = ms;
+    return old;
+  }
+
+  synchronized MetricsSystem getImpl() { return impl; }
+
+  @InterfaceAudience.Private
+  public static void setMiniClusterMode(boolean choice) {
+    INSTANCE.miniClusterMode = choice;
+  }
+
+  @InterfaceAudience.Private
+  public static boolean inMiniClusterMode() {
+    return INSTANCE.miniClusterMode;
+  }
+
+  @InterfaceAudience.Private
+  public static ObjectName newMBeanName(String name) {
+    return INSTANCE.newObjectName(name);
+  }
+
+  @InterfaceAudience.Private
+  public static String sourceName(String name, boolean dupOK) {
+    return INSTANCE.newSourceName(name, dupOK);
+  }
+
+  synchronized ObjectName newObjectName(String name) {
+    try {
+      if (mBeanNames.map.containsKey(name) && !miniClusterMode) {
+        throw new MetricsException(name +" already exists!");
+      }
+      return new ObjectName(mBeanNames.uniqueName(name));
+    } catch (Exception e) {
+      throw new MetricsException(e);
+    }
+  }
+
+  synchronized String newSourceName(String name, boolean dupOK) {
+    if (sourceNames.map.containsKey(name)) {
+      if (dupOK) {
+        return name;
+      }
+      throw new MetricsException("Metrics source "+ name +" already exists!");
+    }
+    return sourceNames.uniqueName(name);
+  }
+}
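
For illustration (not part of this patch), a short test-oriented sketch: mini-cluster mode relaxes the duplicate-name checks in newObjectName() and lets init()/stop() tolerate repeated calls, so several daemons can share one JVM during tests.

    import org.apache.hadoop.metrics2.MetricsSystem;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

    class MiniClusterMetricsSketch {
      static void setUp() {
        DefaultMetricsSystem.setMiniClusterMode(true); // tolerate duplicate names in tests
        MetricsSystem ms = DefaultMetricsSystem.initialize("test");

        // With dupOK=true a repeated source name is returned as-is instead of failing.
        String first  = DefaultMetricsSystem.sourceName("DataNodeActivity", true);
        String second = DefaultMetricsSystem.sourceName("DataNodeActivity", true);

        DefaultMetricsSystem.shutdown();
      }
    }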