You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/05/06 09:28:46 UTC
svn commit: r1100113 [2/5] - in /hadoop/common/trunk: ./ conf/ ivy/
src/java/org/apache/hadoop/metrics/file/ src/java/org/apache/hadoop/metrics2/
src/java/org/apache/hadoop/metrics2/annotation/
src/java/org/apache/hadoop/metrics2/filter/ src/java/org/a...
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java Fri May 6 07:28:43 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+/**
+ * A gauge metric carrying a fixed {@code long} value.
+ */
+class MetricGaugeLong extends AbstractMetric {
+  final long value;
+
+  /**
+   * @param info  the metric info handed to {@link AbstractMetric}
+   * @param value the value this metric reports
+   */
+  MetricGaugeLong(MetricsInfo info, long value) {
+    super(info);
+    this.value = value;
+  }
+
+  /** @return always {@link MetricType#GAUGE} */
+  @Override
+  public MetricType type() {
+    return MetricType.GAUGE;
+  }
+
+  /** @return the stored value, boxed */
+  @Override
+  public Long value() {
+    return Long.valueOf(value);
+  }
+
+  /**
+   * Hands this metric and its primitive value to the visitor's gauge callback.
+   * @param visitor the visitor to call
+   */
+  @Override
+  public void visit(MetricsVisitor visitor) {
+    visitor.gauge(this, value);
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBuffer.java Fri May 6 07:28:43 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+
+/**
+ * An immutable element for the sink queues.
+ * The buffer only wraps the iterable passed to the constructor (no copy is
+ * made); iterating the buffer iterates that iterable.
+ */
+class MetricsBuffer implements Iterable<MetricsBuffer.Entry> {
+
+  /**
+   * A source name paired with the records attached to it.
+   */
+  static class Entry {
+    private final String sourceName;
+    private final Iterable<MetricsRecordImpl> records;
+
+    /**
+     * @param name    the name later returned by {@link #name()}
+     * @param records the records later returned by {@link #records()}
+     */
+    Entry(String name, Iterable<MetricsRecordImpl> records) {
+      this.sourceName = name;
+      this.records = records;
+    }
+
+    /** @return the name given at construction */
+    String name() {
+      return this.sourceName;
+    }
+
+    /** @return the records given at construction */
+    Iterable<MetricsRecordImpl> records() {
+      return this.records;
+    }
+  }
+
+  private final Iterable<Entry> entries;
+
+  /**
+   * @param mutable the entries to expose; kept by reference
+   */
+  MetricsBuffer(Iterable<MetricsBuffer.Entry> mutable) {
+    this.entries = mutable;
+  }
+
+  /** @return an iterator obtained from the wrapped iterable */
+  @Override
+  public Iterator<Entry> iterator() {
+    return entries.iterator();
+  }
+
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java Fri May 6 07:28:43 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+
+/**
+ * Builder for the immutable metrics buffers
+ */
+class MetricsBufferBuilder extends ArrayList<MetricsBuffer.Entry> {
+ private static final long serialVersionUID = 1L;
+
+ boolean add(String name, Iterable<MetricsRecordImpl> records) {
+ return add(new MetricsBuffer.Entry(name, records));
+ }
+
+ MetricsBuffer get() {
+ return new MetricsBuffer(this);
+ }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsCollectorImpl.java Fri May 6 07:28:43 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+/**
+ * Collector that hands out record builders and remembers the ones whose
+ * record name passed the (optional) record filter.
+ */
+class MetricsCollectorImpl implements MetricsCollector,
+    Iterable<MetricsRecordBuilderImpl> {
+
+  private final List<MetricsRecordBuilderImpl> rbs = Lists.newArrayList();
+  private MetricsFilter recordFilter, metricFilter;
+
+  /**
+   * Create a builder for a record. The builder is always returned, but it is
+   * only retained by this collector when no record filter is set or the
+   * filter accepts the record name.
+   * @param info of the record
+   * @return the new record builder
+   */
+  @Override
+  public MetricsRecordBuilderImpl addRecord(MetricsInfo info) {
+    final boolean accepted;
+    if (recordFilter == null) {
+      accepted = true;
+    } else {
+      accepted = recordFilter.accepts(info.name());
+    }
+    MetricsRecordBuilderImpl builder = new MetricsRecordBuilderImpl(
+        this, info, recordFilter, metricFilter, accepted);
+    if (accepted) {
+      rbs.add(builder);
+    }
+    return builder;
+  }
+
+  /**
+   * Create a builder for a record identified by name only; the description
+   * is derived from the name.
+   * @param name of the record
+   * @return the new record builder
+   */
+  @Override
+  public MetricsRecordBuilderImpl addRecord(String name) {
+    MetricsInfo recordInfo = info(name, name +" record");
+    return addRecord(recordInfo);
+  }
+
+  /**
+   * @return the records of all retained builders, skipping builders whose
+   *         {@code getRecord()} returns null
+   */
+  public List<MetricsRecordImpl> getRecords() {
+    List<MetricsRecordImpl> result =
+        Lists.newArrayListWithCapacity(rbs.size());
+    for (MetricsRecordBuilderImpl builder : rbs) {
+      MetricsRecordImpl record = builder.getRecord();
+      if (record == null) {
+        continue;
+      }
+      result.add(record);
+    }
+    return result;
+  }
+
+  /** @return an iterator over the retained record builders */
+  @Override
+  public Iterator<MetricsRecordBuilderImpl> iterator() {
+    return rbs.iterator();
+  }
+
+  /** Forget all retained record builders. */
+  void clear() {
+    rbs.clear();
+  }
+
+  /**
+   * @param rf the record filter used for subsequently added records
+   * @return this collector, for chaining
+   */
+  MetricsCollectorImpl setRecordFilter(MetricsFilter rf) {
+    this.recordFilter = rf;
+    return this;
+  }
+
+  /**
+   * @param mf the metric filter passed to subsequently created builders
+   * @return this collector, for chaining
+   */
+  MetricsCollectorImpl setMetricFilter(MetricsFilter mf) {
+    this.metricFilter = mf;
+    return this;
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfig.java Fri May 6 07:28:43 2011
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import static java.security.AccessController.*;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsPlugin;
+import org.apache.hadoop.metrics2.filter.GlobFilter;
+
+/**
+ * Metrics configuration for MetricsSystemImpl.
+ * A subset view over a parent configuration; lookups that miss in the subset
+ * fall back to the parent's default ("*.") entries, see
+ * {@link #getProperty(String)}.
+ */
+class MetricsConfig extends SubsetConfiguration {
+
+  static final Log LOG = LogFactory.getLog(MetricsConfig.class);
+
+  static final String DEFAULT_FILE_NAME = "hadoop-metrics2.properties";
+  static final String PREFIX_DEFAULT = "*.";
+
+  static final String PERIOD_KEY = "period";
+  static final int PERIOD_DEFAULT = 10; // seconds
+
+  static final String QUEUE_CAPACITY_KEY = "queue.capacity";
+  static final int QUEUE_CAPACITY_DEFAULT = 1;
+
+  static final String RETRY_DELAY_KEY = "retry.delay";
+  static final int RETRY_DELAY_DEFAULT = 10; // seconds
+  static final String RETRY_BACKOFF_KEY = "retry.backoff";
+  static final int RETRY_BACKOFF_DEFAULT = 2; // back off factor
+  static final String RETRY_COUNT_KEY = "retry.count";
+  static final int RETRY_COUNT_DEFAULT = 1;
+
+  static final String JMX_CACHE_TTL_KEY = "jmx.cache.ttl";
+  static final String START_MBEANS_KEY = "source.start_mbeans";
+  static final String PLUGIN_URLS_KEY = "plugin.urls";
+
+  static final String CONTEXT_KEY = "context";
+  static final String NAME_KEY = "name";
+  static final String DESC_KEY = "description";
+  static final String SOURCE_KEY = "source";
+  static final String SINK_KEY = "sink";
+  static final String METRIC_FILTER_KEY = "metric.filter";
+  static final String RECORD_FILTER_KEY = "record.filter";
+  static final String SOURCE_FILTER_KEY = "source.filter";
+
+  // captures the leading [instance] segment of "[instance].[option]" keys;
+  // the character class excludes '*' so the default instance never matches
+  static final Pattern INSTANCE_REGEX = Pattern.compile("([^.*]+)\\..+");
+  static final Splitter SPLITTER = Splitter.on(',').trimResults();
+  private ClassLoader pluginLoader;
+
+  /**
+   * @param c      the parent configuration
+   * @param prefix the subset prefix; lower-cased (US locale) before use
+   */
+  MetricsConfig(Configuration c, String prefix) {
+    super(c, prefix.toLowerCase(Locale.US), ".");
+  }
+
+  /**
+   * Load the configuration for a prefix, trying the prefix specific file
+   * "hadoop-metrics2-[prefix].properties" first and then the default file.
+   * @param prefix of the configuration
+   * @return the configuration object
+   */
+  static MetricsConfig create(String prefix) {
+    return loadFirst(prefix, "hadoop-metrics2-"+ prefix.toLowerCase(Locale.US)
+                     +".properties", DEFAULT_FILE_NAME);
+  }
+
+  /**
+   * Load the configuration for a prefix from the first loadable file.
+   * @param prefix    of the configuration
+   * @param fileNames the list of filenames to try, in order
+   * @return the configuration object
+   */
+  static MetricsConfig create(String prefix, String... fileNames) {
+    return loadFirst(prefix, fileNames);
+  }
+
+  /**
+   * Load configuration from a list of files until the first successful load
+   * @param prefix    the prefix of the returned subset configuration
+   * @param fileNames the list of filenames to try
+   * @return the configuration object
+   * @throws MetricsConfigException if a file fails to load for a reason other
+   *         than not being found, or if none of the files can be located
+   */
+  static MetricsConfig loadFirst(String prefix, String... fileNames) {
+    for (String fname : fileNames) {
+      try {
+        Configuration cf = new PropertiesConfiguration(fname)
+            .interpolatedConfiguration();
+        LOG.info("loaded properties from "+ fname);
+        LOG.debug(toString(cf));
+        MetricsConfig mc = new MetricsConfig(cf, prefix);
+        LOG.debug(mc);
+        return mc;
+      }
+      catch (ConfigurationException e) {
+        // a missing file just means "try the next candidate"; guard against
+        // a null message so the real failure is reported instead of an NPE
+        String msg = e.getMessage();
+        if (msg != null && msg.startsWith("Cannot locate configuration")) {
+          continue;
+        }
+        throw new MetricsConfigException(e);
+      }
+    }
+    throw new MetricsConfigException("Cannot locate configuration: tried "+
+                                     Joiner.on(",").join(fileNames));
+  }
+
+  /**
+   * @param prefix of the subset
+   * @return a new config with this config as its parent
+   */
+  @Override
+  public MetricsConfig subset(String prefix) {
+    return new MetricsConfig(this, prefix);
+  }
+
+  /**
+   * Return sub configs for instance specified in the config.
+   * Assuming format specified as follows:<pre>
+   * [type].[instance].[option] = [value]</pre>
+   * Note, '*' is a special default instance, which is excluded in the result.
+   * @param type of the instance
+   * @return a map with [instance] as key and config object as value
+   */
+  Map<String, MetricsConfig> getInstanceConfigs(String type) {
+    Map<String, MetricsConfig> map = Maps.newHashMap();
+    MetricsConfig sub = subset(type);
+
+    for (String key : sub.keys()) {
+      Matcher matcher = INSTANCE_REGEX.matcher(key);
+      if (matcher.matches()) {
+        String instance = matcher.group(1);
+        if (!map.containsKey(instance)) {
+          map.put(instance, sub.subset(instance));
+        }
+      }
+    }
+    return map;
+  }
+
+  /**
+   * @return an iterable view of the keys; every iteration calls
+   *         {@code getKeys()} again
+   */
+  Iterable<String> keys() {
+    return new Iterable<String>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public Iterator<String> iterator() {
+        return (Iterator<String>) getKeys();
+      }
+    };
+  }
+
+  /**
+   * Will poke parents for defaults
+   * @param key to lookup
+   * @return the value or null
+   */
+  @Override
+  public Object getProperty(String key) {
+    Object value = super.getProperty(key);
+    if (value == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("poking parent '"+ getParent().getClass().getSimpleName() +
+                  "' for key: "+ key);
+      }
+      // ask the parent for the default entry, adding "*." unless the key
+      // already carries it
+      return getParent().getProperty(key.startsWith(PREFIX_DEFAULT) ? key
+                                     : PREFIX_DEFAULT + key);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("returning '"+ value +"' for key: "+ key);
+    }
+    return value;
+  }
+
+  /**
+   * Instantiate and initialize the plugin whose class is configured under
+   * the given name.
+   * @param <T>  the expected plugin type (unchecked cast)
+   * @param name the config prefix of the plugin; empty means this config
+   * @return the initialized plugin, or null if no class is configured
+   * @throws MetricsConfigException if the plugin cannot be created or
+   *         initialized
+   */
+  <T extends MetricsPlugin> T getPlugin(String name) {
+    String clsName = getClassName(name);
+    if (clsName == null) return null;
+    try {
+      Class<?> cls = Class.forName(clsName, true, getPluginLoader());
+      @SuppressWarnings("unchecked")
+      T plugin = (T) cls.newInstance();
+      plugin.init(name.isEmpty() ? this : subset(name));
+      return plugin;
+    }
+    catch (Exception e) {
+      throw new MetricsConfigException("Error creating plugin: "+ clsName, e);
+    }
+  }
+
+  /**
+   * @param prefix the config prefix; empty means the "class" key itself
+   * @return the configured class name, or null if missing or empty
+   */
+  String getClassName(String prefix) {
+    String classKey = prefix.isEmpty() ? "class" : prefix +".class";
+    String clsName = getString(classKey);
+    LOG.debug(clsName);
+    if (clsName == null || clsName.isEmpty()) {
+      return null;
+    }
+    return clsName;
+  }
+
+  /**
+   * Get the class loader for plugins. When "plugin.urls" is set in this
+   * subset, a URL class loader over the listed (comma separated) URLs is
+   * created once and cached; otherwise the loader of this class is used.
+   * @return the class loader used to load plugin classes
+   * @throws MetricsConfigException if a configured URL is malformed
+   */
+  ClassLoader getPluginLoader() {
+    if (pluginLoader != null) return pluginLoader;
+    final ClassLoader defaultLoader = getClass().getClassLoader();
+    // super.getProperty: deliberately skips the "*." default fallback
+    Object purls = super.getProperty(PLUGIN_URLS_KEY);
+    if (purls == null) return defaultLoader;
+    Iterable<String> jars = SPLITTER.split((String) purls);
+    int len = Iterables.size(jars);
+    if ( len > 0) {
+      final URL[] urls = new URL[len];
+      try {
+        int i = 0;
+        for (String jar : jars) {
+          LOG.debug(jar);
+          urls[i++] = new URL(jar);
+        }
+      }
+      catch (Exception e) {
+        throw new MetricsConfigException(e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("using plugin jars: "+ Iterables.toString(jars));
+      }
+      pluginLoader = doPrivileged(new PrivilegedAction<ClassLoader>() {
+        @Override public ClassLoader run() {
+          return new URLClassLoader(urls, defaultLoader);
+        }
+      });
+      return pluginLoader;
+    }
+    // NOTE(review): SPLITTER does not omit empty strings, so len is
+    // presumably never 0 and this parent delegation looks unreachable --
+    // confirm before relying on it
+    if (parent instanceof MetricsConfig) {
+      return ((MetricsConfig) parent).getPluginLoader();
+    }
+    return defaultLoader;
+  }
+
+  @Override public void clear() {
+    super.clear();
+    // pluginLoader.close(); // jdk7 is saner
+  }
+
+  /**
+   * Create the filter configured under the given prefix.
+   * @param prefix of the filter options
+   * @return the configured plugin filter, a glob filter when options exist
+   *         but no class is configured, or null when there are no options
+   */
+  MetricsFilter getFilter(String prefix) {
+    // don't create filter instances without options
+    MetricsConfig conf = subset(prefix);
+    if (conf.isEmpty()) return null;
+    MetricsFilter filter = getPlugin(prefix);
+    if (filter != null) return filter;
+    // glob filter is assumed if pattern is specified but class is not.
+    filter = new GlobFilter();
+    filter.init(conf);
+    return filter;
+  }
+
+  @Override
+  public String toString() {
+    return toString(this);
+  }
+
+  /**
+   * Render a configuration in properties file format.
+   * @param c the configuration to render
+   * @return the properties text
+   * @throws MetricsConfigException if saving the properties fails
+   */
+  static String toString(Configuration c) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(buffer);
+    PropertiesConfiguration tmp = new PropertiesConfiguration();
+    tmp.copy(c);
+    try { tmp.save(ps); }
+    catch (Exception e) {
+      throw new MetricsConfigException(e);
+    }
+    return buffer.toString();
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsConfigException.java Fri May 6 07:28:43 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricsException;
+
+/**
+ * The metrics configuration runtime exception
+ */
+class MetricsConfigException extends MetricsException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param cause the underlying failure
+   */
+  MetricsConfigException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message describing the configuration problem
+   */
+  MetricsConfigException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message describing the configuration problem
+   * @param cause   the underlying failure
+   */
+  MetricsConfigException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java Fri May 6 07:28:43 2011
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.Interns;
+
+/**
+ * Record builder that accumulates tags and metrics and applies the record
+ * and metric filters handed to it by its collector.
+ */
+class MetricsRecordBuilderImpl extends MetricsRecordBuilder {
+  private final MetricsCollector parent;
+  private final long timestamp;
+  private final MetricsInfo recInfo;
+  private final List<AbstractMetric> metrics;
+  private final List<MetricsTag> tags;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final boolean acceptable;
+
+  /**
+   * The record timestamp is taken from the system clock at construction.
+   * @param parent     the collector returned by {@link #parent()}
+   * @param info       of the record being built
+   * @param rf         record filter applied to the tags, may be null
+   * @param mf         metric filter applied to metric names, may be null
+   * @param acceptable false turns the tag/counter/gauge helpers into no-ops
+   *                   and makes {@link #getRecord()} return null
+   */
+  MetricsRecordBuilderImpl(MetricsCollector parent, MetricsInfo info,
+                           MetricsFilter rf, MetricsFilter mf,
+                           boolean acceptable) {
+    this.parent = parent;
+    this.timestamp = System.currentTimeMillis();
+    this.recInfo = info;
+    this.metrics = Lists.newArrayList();
+    this.tags = Lists.newArrayList();
+    this.recordFilter = rf;
+    this.metricFilter = mf;
+    this.acceptable = acceptable;
+  }
+
+  @Override
+  public MetricsCollector parent() {
+    return parent;
+  }
+
+  /** Adds an interned tag, but only when the record is acceptable. */
+  @Override
+  public MetricsRecordBuilderImpl tag(MetricsInfo info, String value) {
+    if (!acceptable) {
+      return this;
+    }
+    tags.add(Interns.tag(info, value));
+    return this;
+  }
+
+  /** Adds the tag unconditionally (no filtering is applied here). */
+  @Override
+  public MetricsRecordBuilderImpl add(MetricsTag tag) {
+    tags.add(tag);
+    return this;
+  }
+
+  /** Adds the metric unconditionally (no filtering is applied here). */
+  @Override
+  public MetricsRecordBuilderImpl add(AbstractMetric metric) {
+    metrics.add(metric);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addCounter(MetricsInfo info, int value) {
+    if (isMetricAccepted(info)) {
+      metrics.add(new MetricCounterInt(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addCounter(MetricsInfo info, long value) {
+    if (isMetricAccepted(info)) {
+      metrics.add(new MetricCounterLong(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, int value) {
+    if (isMetricAccepted(info)) {
+      metrics.add(new MetricGaugeInt(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, long value) {
+    if (isMetricAccepted(info)) {
+      metrics.add(new MetricGaugeLong(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, float value) {
+    if (isMetricAccepted(info)) {
+      metrics.add(new MetricGaugeFloat(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl addGauge(MetricsInfo info, double value) {
+    if (isMetricAccepted(info)) {
+      metrics.add(new MetricGaugeDouble(info, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilderImpl setContext(String value) {
+    return tag(MsInfo.Context, value);
+  }
+
+  /**
+   * @return the record built so far, or null when the record is not
+   *         acceptable or the record filter rejects its tags
+   */
+  public MetricsRecordImpl getRecord() {
+    if (!acceptable) {
+      return null;
+    }
+    if (recordFilter != null && !recordFilter.accepts(tags)) {
+      return null;
+    }
+    return new MetricsRecordImpl(recInfo, timestamp, tags(), metrics());
+  }
+
+  /** @return an unmodifiable view of the tags added so far */
+  List<MetricsTag> tags() {
+    return Collections.unmodifiableList(tags);
+  }
+
+  /** @return an unmodifiable view of the metrics added so far */
+  List<AbstractMetric> metrics() {
+    return Collections.unmodifiableList(metrics);
+  }
+
+  /**
+   * Shared gate for the counter/gauge helpers: the record must be acceptable
+   * and the metric filter, if any, must accept the metric name.
+   */
+  private boolean isMetricAccepted(MetricsInfo info) {
+    if (!acceptable) {
+      return false;
+    }
+    return metricFilter == null || metricFilter.accepts(info.name());
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java Fri May 6 07:28:43 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+import java.util.Collection;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+/**
+ * A view of a metrics record that exposes only the metrics whose names are
+ * accepted by a filter. Everything else is forwarded to the delegate record.
+ */
+class MetricsRecordFiltered extends AbstractMetricsRecord {
+  private final MetricsRecord delegate;
+  private final MetricsFilter filter;
+
+  /**
+   * @param delegate the record to wrap
+   * @param filter   the filter applied to metric names in {@link #metrics()}
+   */
+  MetricsRecordFiltered(MetricsRecord delegate, MetricsFilter filter) {
+    this.delegate = delegate;
+    this.filter = filter;
+  }
+
+  @Override public long timestamp() {
+    return delegate.timestamp();
+  }
+
+  @Override public String name() {
+    return delegate.name();
+  }
+
+  @Override public String description() {
+    return delegate.description();
+  }
+
+  @Override public String context() {
+    return delegate.context();
+  }
+
+  @Override public Collection<MetricsTag> tags() {
+    return delegate.tags();
+  }
+
+  /**
+   * @return the delegate's metrics restricted to those whose name the filter
+   *         accepts; the returned iterable can be iterated more than once
+   */
+  @Override public Iterable<AbstractMetric> metrics() {
+    return new Iterable<AbstractMetric>() {
+      @Override public Iterator<AbstractMetric> iterator() {
+        // Take a fresh delegate iterator for every iteration. Sharing one
+        // iterator across iterator() calls would make a second pass start
+        // where the first stopped (usually yielding nothing).
+        final Iterator<AbstractMetric> it = delegate.metrics().iterator();
+        return new AbstractIterator<AbstractMetric>() {
+          @Override public AbstractMetric computeNext() {
+            while (it.hasNext()) {
+              AbstractMetric next = it.next();
+              if (filter.accepts(next.name())) {
+                return next;
+              }
+            }
+            return endOfData();
+          }
+        };
+      }
+    };
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java Fri May 6 07:28:43 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.*;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.util.Contracts.*;
+
+/**
+ * A metrics record backed by the info, timestamp, tags and metrics given at
+ * construction.
+ */
+class MetricsRecordImpl extends AbstractMetricsRecord {
+  protected static final String DEFAULT_CONTEXT = "default";
+
+  private final long timestamp;
+  private final MetricsInfo info;
+  private final List<MetricsTag> tags;
+  private final Iterable<AbstractMetric> metrics;
+
+  /**
+   * Construct a metrics record
+   * @param info  {@link MetricsInfo} of the record
+   * @param timestamp of the record, checked to be positive
+   * @param tags  of the record, checked to be non-null
+   * @param metrics of the record, checked to be non-null
+   */
+  public MetricsRecordImpl(MetricsInfo info, long timestamp,
+                           List<MetricsTag> tags,
+                           Iterable<AbstractMetric> metrics) {
+    // validation order is timestamp, info, tags, metrics
+    this.timestamp = checkArg(timestamp, timestamp > 0, "timestamp");
+    this.info = checkNotNull(info, "info");
+    this.tags = checkNotNull(tags, "tags");
+    this.metrics = checkNotNull(metrics, "metrics");
+  }
+
+  @Override public long timestamp() {
+    return this.timestamp;
+  }
+
+  @Override public String name() {
+    return this.info.name();
+  }
+
+  /** @return the info object given at construction */
+  MetricsInfo info() {
+    return this.info;
+  }
+
+  @Override public String description() {
+    return this.info.description();
+  }
+
+  /**
+   * @return the value of the first tag whose info is the context info, or
+   *         {@link #DEFAULT_CONTEXT} when there is no such tag
+   */
+  @Override public String context() {
+    // usually the first tag
+    for (MetricsTag tag : tags) {
+      if (tag.info() != MsInfo.Context) {
+        continue;
+      }
+      return tag.value();
+    }
+    return DEFAULT_CONTEXT;
+  }
+
+  @Override
+  public List<MetricsTag> tags() {
+    return this.tags; // already unmodifiable from MetricsRecordBuilderImpl#tags
+  }
+
+  @Override public Iterable<AbstractMetric> metrics() {
+    return this.metrics;
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Fri May 6 07:28:43 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.metrics2.util.Contracts.*;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSink;
+
+/**
+ * An adapter class for metrics sink and associated filters
+ */
+class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
+
+ private final Log LOG = LogFactory.getLog(MetricsSinkAdapter.class);
+ private final String name, description, context;
+ private final MetricsSink sink;
+ private final MetricsFilter sourceFilter, recordFilter, metricFilter;
+ private final SinkQueue<MetricsBuffer> queue;
+ private final Thread sinkThread;
+ private volatile boolean stopping = false;
+ private volatile boolean inError = false;
+ private final int period, firstRetryDelay, retryCount;
+ private final float retryBackoff;
+ private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
+ private final MutableStat latency;
+ private final MutableCounterInt dropped;
+ private final MutableGaugeInt qsize;
+
+ MetricsSinkAdapter(String name, String description, MetricsSink sink,
+ String context, MetricsFilter sourceFilter,
+ MetricsFilter recordFilter, MetricsFilter metricFilter,
+ int period, int queueCapacity, int retryDelay,
+ float retryBackoff, int retryCount) {
+ this.name = checkNotNull(name, "name");
+ this.description = description;
+ this.sink = checkNotNull(sink, "sink object");
+ this.context = context;
+ this.sourceFilter = sourceFilter;
+ this.recordFilter = recordFilter;
+ this.metricFilter = metricFilter;
+ this.period = checkArg(period, period > 0, "period");
+ firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
+ this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
+ this.retryCount = retryCount;
+ this.queue = new SinkQueue<MetricsBuffer>(checkArg(queueCapacity,
+ queueCapacity > 0, "queue capacity"));
+ latency = registry.newRate("Sink_"+ name, "Sink end to end latency", false);
+ dropped = registry.newCounter("Sink_"+ name +"Dropped",
+ "Dropped updates per sink", 0);
+ qsize = registry.newGauge("Sink_"+ name + "Qsize", "Queue size", 0);
+
+ sinkThread = new Thread() {
+ @Override public void run() {
+ publishMetricsFromQueue();
+ }
+ };
+ sinkThread.setName(name);
+ sinkThread.setDaemon(true);
+ }
+
+ /**
+  * Offer a metrics buffer to this sink's queue when the logical time falls
+  * on this sink's period.
+  * @param buffer the metrics snapshot to enqueue
+  * @param logicalTime the metrics system's logical time
+  * @return false only when the buffer was due for this sink but the queue
+  *         did not accept it (the dropped counter is incremented then)
+  */
+ boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
+   if (logicalTime % period != 0) {
+     return true; // OK
+   }
+   LOG.debug("enqueue, logicalTime="+ logicalTime);
+   boolean queued = queue.enqueue(buffer);
+   if (!queued) {
+     dropped.incr();
+   }
+   return queued;
+ }
+
+ /**
+  * Main loop of the sink thread: repeatedly drains the queue into the sink
+  * until {@code stopping} is set. When consuming throws, the loop sleeps for
+  * a randomized delay and retries up to {@code retryCount} times, growing
+  * the delay by {@code retryBackoff} each time; once the retries are used
+  * up the queue is cleared and further error logging is suppressed until a
+  * consume succeeds again.
+  */
+ void publishMetricsFromQueue() {
+   int retryDelay = firstRetryDelay;
+   int n = retryCount;
+   int minDelay = Math.min(500, retryDelay * 1000); // millis
+   Random rng = new Random(System.nanoTime());
+   while (!stopping) {
+     try {
+       queue.consumeAll(this);
+       retryDelay = firstRetryDelay;
+       n = retryCount;
+       inError = false;
+     }
+     catch (InterruptedException e) {
+       LOG.info(name +" thread interrupted.");
+     }
+     catch (Exception e) {
+       if (n > 0) {
+         // Compute the jitter window in long arithmetic so a large
+         // (backed-off) retryDelay cannot overflow int.
+         long window = Math.max(0L, 1000L / 2 * retryDelay - minDelay);
+         int retryWindow =
+             (int) Math.min(window, Integer.MAX_VALUE - minDelay);
+         // Random#nextInt(bound) rejects bound <= 0 (e.g. retryDelay == 1
+         // yields an empty window); fall back to the minimum delay alone.
+         int jitter = retryWindow > 0 ? rng.nextInt(retryWindow) : 0;
+         int awhile = jitter + minDelay;
+         if (!inError) {
+           LOG.error("Got sink exception, retry in "+ awhile +"ms", e);
+         }
+         retryDelay *= retryBackoff;
+         try { Thread.sleep(awhile); }
+         catch (InterruptedException e2) {
+           LOG.info(name +" thread interrupted while waiting for retry", e2);
+         }
+         --n;
+       }
+       else {
+         if (!inError) {
+           LOG.error("Got sink exception and over retry limit, "+
+                     "suppressing further error messages", e);
+         }
+         queue.clear();
+         inError = true; // Don't keep complaining ad infinitum
+       }
+     }
+   }
+ }
+
+ @Override
+ public void consume(MetricsBuffer buffer) {
+ long ts = 0;
+ for (MetricsBuffer.Entry entry : buffer) {
+ if (sourceFilter == null || sourceFilter.accepts(entry.name())) {
+ for (MetricsRecordImpl record : entry.records()) {
+ if ((context == null || context.equals(record.context())) &&
+ (recordFilter == null || recordFilter.accepts(record))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Pushing record "+ entry.name() +"."+ record.context() +
+ "."+ record.name() +" to "+ name);
+ }
+ sink.putMetrics(metricFilter == null
+ ? record
+ : new MetricsRecordFiltered(record, metricFilter));
+ if (ts == 0) ts = record.timestamp();
+ }
+ }
+ }
+ }
+ if (ts > 0) {
+ sink.flush();
+ latency.add(System.currentTimeMillis() - ts);
+ }
+ LOG.debug("Done");
+ }
+
+ void start() {
+ sinkThread.start();
+ LOG.info("Sink "+ name +" started");
+ }
+
+ void stop() {
+ stopping = true;
+ sinkThread.interrupt();
+ try {
+ sinkThread.join();
+ }
+ catch (InterruptedException e) {
+ LOG.warn("Stop interrupted", e);
+ }
+ }
+
+ String name() {
+ return name;
+ }
+
+ String description() {
+ return description;
+ }
+
+ void snapshot(MetricsRecordBuilder rb, boolean all) {
+ registry.snapshot(rb, all);
+ }
+
+ MetricsSink sink() {
+ return sink;
+ }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java Fri May 6 07:28:43 2011
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.HashMap;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.util.MBeans;
+import static org.apache.hadoop.metrics2.util.Contracts.*;
+
+/**
+ * An adapter class for metrics source and associated filter and jmx impl
+ */
+class MetricsSourceAdapter implements DynamicMBean {
+
+ private static final Log LOG = LogFactory.getLog(MetricsSourceAdapter.class);
+
+ private final String prefix, name;
+ private final MetricsSource source;
+ private final MetricsFilter recordFilter, metricFilter;
+ private final HashMap<String, Attribute> attrCache;
+ private final MBeanInfoBuilder infoBuilder;
+ private final Iterable<MetricsTag> injectedTags;
+
+ private Iterable<MetricsRecordImpl> lastRecs;
+ private long jmxCacheTS = 0;
+ private int jmxCacheTTL;
+ private MBeanInfo infoCache;
+ private ObjectName mbeanName;
+ private final boolean startMBeans;
+
+ MetricsSourceAdapter(String prefix, String name, String description,
+ MetricsSource source, Iterable<MetricsTag> injectedTags,
+ MetricsFilter recordFilter, MetricsFilter metricFilter,
+ int jmxCacheTTL, boolean startMBeans) {
+ this.prefix = checkNotNull(prefix, "prefix");
+ this.name = checkNotNull(name, "name");
+ this.source = checkNotNull(source, "source");
+ attrCache = Maps.newHashMap();
+ infoBuilder = new MBeanInfoBuilder(name, description);
+ this.injectedTags = injectedTags;
+ this.recordFilter = recordFilter;
+ this.metricFilter = metricFilter;
+ this.jmxCacheTTL = checkArg(jmxCacheTTL, jmxCacheTTL > 0, "jmxCacheTTL");
+ this.startMBeans = startMBeans;
+ }
+
+ MetricsSourceAdapter(String prefix, String name, String description,
+ MetricsSource source, Iterable<MetricsTag> injectedTags,
+ int period, MetricsConfig conf) {
+ this(prefix, name, description, source, injectedTags,
+ conf.getFilter(RECORD_FILTER_KEY),
+ conf.getFilter(METRIC_FILTER_KEY),
+ period + 1, // hack to avoid most of the "innocuous" races.
+ conf.getBoolean(START_MBEANS_KEY, true));
+ }
+
+ void start() {
+ if (startMBeans) startMBeans();
+ }
+
+ @Override
+ public synchronized Object getAttribute(String attribute)
+ throws AttributeNotFoundException, MBeanException, ReflectionException {
+ updateJmxCache();
+ Attribute a = attrCache.get(attribute);
+ if (a == null) {
+ throw new AttributeNotFoundException(attribute +" not found");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(attribute +": "+ a);
+ }
+ return a.getValue();
+ }
+
+ @Override
+ public void setAttribute(Attribute attribute)
+ throws AttributeNotFoundException, InvalidAttributeValueException,
+ MBeanException, ReflectionException {
+ throw new UnsupportedOperationException("Metrics are read-only.");
+ }
+
+ /**
+  * Look up several attributes in the (refreshed) JMX attribute cache.
+  * @param attributes names of the attributes to retrieve
+  * @return the attributes that were found; names with no cached attribute
+  *         are omitted rather than returned as null entries
+  */
+ @Override
+ public synchronized AttributeList getAttributes(String[] attributes) {
+   updateJmxCache();
+   AttributeList ret = new AttributeList();
+   for (String key : attributes) {
+     Attribute attr = attrCache.get(key);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(key +": "+ attr);
+     }
+     // Skip unknown names so callers never see a null element in the list.
+     if (attr != null) {
+       ret.add(attr);
+     }
+   }
+   return ret;
+ }
+
+ @Override
+ public AttributeList setAttributes(AttributeList attributes) {
+ throw new UnsupportedOperationException("Metrics are read-only.");
+ }
+
+ @Override
+ public Object invoke(String actionName, Object[] params, String[] signature)
+ throws MBeanException, ReflectionException {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public synchronized MBeanInfo getMBeanInfo() {
+ updateJmxCache();
+ return infoCache;
+ }
+
+ private synchronized void updateJmxCache() {
+ if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
+ if (lastRecs == null) {
+ MetricsCollectorImpl builder = new MetricsCollectorImpl();
+ getMetrics(builder, true);
+ }
+ int oldCacheSize = attrCache.size();
+ int newCacheSize = updateAttrCache();
+ if (oldCacheSize < newCacheSize) {
+ updateInfoCache();
+ }
+ jmxCacheTS = System.currentTimeMillis();
+ lastRecs = null; // in case regular interval update is not running
+ }
+ }
+
+ Iterable<MetricsRecordImpl> getMetrics(MetricsCollectorImpl builder,
+ boolean all) {
+ builder.setRecordFilter(recordFilter).setMetricFilter(metricFilter);
+ synchronized(this) {
+ if (lastRecs == null && jmxCacheTS == 0) {
+ all = true; // Get all the metrics to populate the sink caches
+ }
+ }
+ try {
+ source.getMetrics(builder, all);
+ }
+ catch (Exception e) {
+ LOG.error("Error getting metrics from source "+ name, e);
+ }
+ for (MetricsRecordBuilderImpl rb : builder) {
+ for (MetricsTag t : injectedTags) {
+ rb.add(t);
+ }
+ }
+ synchronized(this) {
+ lastRecs = builder.getRecords();
+ return lastRecs;
+ }
+ }
+
+ synchronized void stop() {
+ stopMBeans();
+ }
+
+ synchronized void startMBeans() {
+ if (mbeanName != null) {
+ LOG.warn("MBean "+ name +" already initialized!");
+ LOG.debug("Stacktrace: ", new Throwable());
+ return;
+ }
+ mbeanName = MBeans.register(prefix, name, this);
+ LOG.debug("MBean for source "+ name +" registered.");
+ }
+
+ synchronized void stopMBeans() {
+ if (mbeanName != null) {
+ MBeans.unregister(mbeanName);
+ mbeanName = null;
+ }
+ }
+
+ private void updateInfoCache() {
+ LOG.debug("Updating info cache...");
+ infoCache = infoBuilder.reset(lastRecs).get();
+ LOG.debug("Done");
+ }
+
+ private int updateAttrCache() {
+ LOG.debug("Updating attr cache...");
+ int recNo = 0;
+ int numMetrics = 0;
+ for (MetricsRecordImpl record : lastRecs) {
+ for (MetricsTag t : record.tags()) {
+ setAttrCacheTag(t, recNo);
+ ++numMetrics;
+ }
+ for (AbstractMetric m : record.metrics()) {
+ setAttrCacheMetric(m, recNo);
+ ++numMetrics;
+ }
+ ++recNo;
+ }
+ LOG.debug("Done. # tags & metrics="+ numMetrics);
+ return numMetrics;
+ }
+
+ private static String tagName(String name, int recNo) {
+ StringBuilder sb = new StringBuilder(name.length() + 16);
+ sb.append("tag.").append(name);
+ if (recNo > 0) {
+ sb.append('.').append(recNo);
+ }
+ return sb.toString();
+ }
+
+ private void setAttrCacheTag(MetricsTag tag, int recNo) {
+ String key = tagName(tag.name(), recNo);
+ attrCache.put(key, new Attribute(key, tag.value()));
+ }
+
+ /**
+  * Build the attribute key for a metric: the bare metric name, with a
+  * ".recNo" suffix appended only for a positive record number.
+  */
+ private static String metricName(String name, int recNo) {
+   if (recNo <= 0) {
+     return name;
+   }
+   return new StringBuilder(name.length() + 12)
+       .append(name).append('.').append(recNo).toString();
+ }
+
+ private void setAttrCacheMetric(AbstractMetric metric, int recNo) {
+ String key = metricName(metric.name(), recNo);
+ attrCache.put(key, new Attribute(key, metric.value()));
+ }
+
+ String name() {
+ return name;
+ }
+
+ MetricsSource source() {
+ return source;
+ }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Fri May 6 07:28:43 2011
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.StringWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import javax.management.ObjectName;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Locale;
+import static com.google.common.base.Preconditions.*;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.util.MathUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+import org.apache.hadoop.metrics2.util.MBeans;
+
+/**
+ * A base class for metrics system singletons
+ */
+@InterfaceAudience.Private
+@Metrics(context="metricssystem")
+public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
+
+ static final Log LOG = LogFactory.getLog(MetricsSystemImpl.class);
+ static final String MS_NAME = "MetricsSystem";
+ static final String MS_STATS_NAME = MS_NAME +",sub=Stats";
+ static final String MS_STATS_DESC = "Metrics system metrics";
+ static final String MS_CONTROL_NAME = MS_NAME +",sub=Control";
+ static final String MS_INIT_MODE_KEY = "hadoop.metrics.init.mode";
+
+ enum InitMode { NORMAL, STANDBY }
+
+ private final Map<String, MetricsSourceAdapter> sources;
+ private final Map<String, MetricsSource> allSources;
+ private final Map<String, MetricsSinkAdapter> sinks;
+ private final Map<String, MetricsSink> allSinks;
+ private final List<Callback> callbacks;
+ private final MetricsCollectorImpl collector;
+ private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
+ @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
+ @Metric({"Publish", "Publishing stats"}) MutableStat publishStat;
+ @Metric("Dropped updates by all sinks") MutableCounterLong droppedPubAll;
+
+ private final List<MetricsTag> injectedTags;
+
+ // Things that are changed by init()/start()/stop()
+ private String prefix;
+ private MetricsFilter sourceFilter;
+ private MetricsConfig config;
+ private Map<String, MetricsConfig> sourceConfigs, sinkConfigs;
+ private boolean monitoring = false;
+ private Timer timer;
+ private int period; // seconds
+ private long logicalTime; // number of timer invocations * period
+ private ObjectName mbeanName;
+ private boolean publishSelfMetrics = true;
+ private MetricsSourceAdapter sysSource;
+ private int refCount = 0; // for mini cluster mode
+
+ /**
+ * Construct the metrics system
+ * @param prefix for the system
+ */
+ public MetricsSystemImpl(String prefix) {
+ this.prefix = prefix;
+ allSources = Maps.newHashMap();
+ sources = Maps.newLinkedHashMap();
+ allSinks = Maps.newHashMap();
+ sinks = Maps.newLinkedHashMap();
+ sourceConfigs = Maps.newHashMap();
+ sinkConfigs = Maps.newHashMap();
+ callbacks = Lists.newArrayList();
+ injectedTags = Lists.newArrayList();
+ collector = new MetricsCollectorImpl();
+ if (prefix != null) {
+ // prefix could be null for default ctor, which requires init later
+ initSystemMBean();
+ }
+ }
+
+ /**
+ * Construct the system without initializing it (reading config etc.).
+ */
+ public MetricsSystemImpl() {
+ this(null);
+ }
+
+ /**
+ * Initialize the metrics system with a prefix.
+ * @param prefix the system will look for configs with the prefix
+ * @return the metrics system object itself
+ */
+ @Override
+ public synchronized MetricsSystem init(String prefix) {
+ if (monitoring && !DefaultMetricsSystem.inMiniClusterMode()) {
+ LOG.warn(this.prefix +" metrics system already initialized!");
+ return this;
+ }
+ this.prefix = checkNotNull(prefix, "prefix");
+ ++refCount;
+ if (monitoring) {
+ // in mini cluster mode
+ LOG.info(this.prefix +" metrics system started (again)");
+ return this;
+ }
+ switch (initMode()) {
+ case NORMAL:
+ try { start(); }
+ catch (MetricsConfigException e) {
+ // Usually because hadoop-metrics2.properties is missing
+ // We can always start the metrics system later via JMX.
+ LOG.warn("Metrics system not started: "+ e.getMessage());
+ LOG.debug("Stacktrace: ", e);
+ }
+ break;
+ case STANDBY:
+ LOG.info(prefix +" metrics system started in standby mode");
+ }
+ initSystemMBean();
+ return this;
+ }
+
+ @Override
+ public synchronized void start() {
+ checkNotNull(prefix, "prefix");
+ if (monitoring) {
+ LOG.warn(prefix +" metrics system already started!",
+ new MetricsException("Illegal start"));
+ return;
+ }
+ for (Callback cb : callbacks) cb.preStart();
+ configure(prefix);
+ startTimer();
+ monitoring = true;
+ LOG.info(prefix +" metrics system started");
+ for (Callback cb : callbacks) cb.postStart();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (!monitoring && !DefaultMetricsSystem.inMiniClusterMode()) {
+ LOG.warn(prefix +" metrics system not yet started!",
+ new MetricsException("Illegal stop"));
+ return;
+ }
+ if (!monitoring) {
+ // in mini cluster mode
+ LOG.info(prefix +" metrics system stopped (again)");
+ return;
+ }
+ for (Callback cb : callbacks) cb.preStop();
+ LOG.info("Stopping "+ prefix +" metrics system...");
+ stopTimer();
+ stopSources();
+ stopSinks();
+ clearConfigs();
+ monitoring = false;
+ LOG.info(prefix +" metrics system stopped.");
+ for (Callback cb : callbacks) cb.postStop();
+ }
+
+ @Override public synchronized <T>
+ T register(String name, String desc, T source) {
+ MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(source);
+ final MetricsSource s = sb.build();
+ MetricsInfo si = sb.info();
+ String name2 = name == null ? si.name() : name;
+ final String finalDesc = desc == null ? si.description() : desc;
+ final String finalName = // be friendly to non-metrics tests
+ DefaultMetricsSystem.sourceName(name2, !monitoring);
+ allSources.put(finalName, s);
+ LOG.debug(finalName +", "+ finalDesc);
+ if (monitoring) {
+ registerSource(finalName, finalDesc, s);
+ }
+ // We want to re-register the source to pick up new config when the
+ // metrics system restarts.
+ register(new AbstractCallback() {
+ @Override public void postStart() {
+ registerSource(finalName, finalDesc, s);
+ }
+ });
+ return source;
+ }
+
+ synchronized
+ void registerSource(String name, String desc, MetricsSource source) {
+ checkNotNull(config, "config");
+ MetricsConfig conf = sourceConfigs.get(name);
+ MetricsSourceAdapter sa = conf != null
+ ? new MetricsSourceAdapter(prefix, name, desc, source,
+ injectedTags, period, conf)
+ : new MetricsSourceAdapter(prefix, name, desc, source,
+ injectedTags, period, config.subset(SOURCE_KEY));
+ sources.put(name, sa);
+ sa.start();
+ LOG.info("Registered source "+ name);
+ }
+
+ /**
+  * Register a metrics sink under the given name. A name that is already
+  * registered is rejected with a warning and the sink is returned as is.
+  * When a config is loaded the sink is attached right away; a callback is
+  * also installed so the sink is attached again after the metrics system
+  * is restarted.
+  * @param name of the sink
+  * @param description of the sink
+  * @param sink the sink object to register
+  * @return the sink that was passed in
+  */
+ @Override public synchronized <T extends MetricsSink>
+ T register(final String name, final String description, final T sink) {
+   LOG.debug(name +", "+ description);
+   if (allSinks.containsKey(name)) {
+     LOG.warn("Sink "+ name +" already exists!");
+     return sink;
+   }
+   allSinks.put(name, sink);
+   if (config != null) {
+     registerSink(name, description, sink);
+   }
+   // We want to re-register the sink to pick up new config
+   // when the metrics system restarts.
+   register(new AbstractCallback() {
+     @Override public void postStart() {
+       // Calling register(...) here would always return early because the
+       // name is still in allSinks; attach the adapter directly, unless an
+       // adapter with this name is already active.
+       if (!sinks.containsKey(name)) {
+         registerSink(name, description, sink);
+       }
+     }
+   });
+   return sink;
+ }
+
+ synchronized void registerSink(String name, String desc, MetricsSink sink) {
+ checkNotNull(config, "config");
+ MetricsConfig conf = sinkConfigs.get(name);
+ MetricsSinkAdapter sa = conf != null
+ ? newSink(name, desc, sink, conf)
+ : newSink(name, desc, sink, config.subset(SINK_KEY));
+ sinks.put(name, sa);
+ sa.start();
+ LOG.info("Registered sink "+ name);
+ }
+
+ @Override
+ public synchronized void register(final Callback callback) {
+ callbacks.add((Callback) Proxy.newProxyInstance(
+ callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
+ new InvocationHandler() {
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ try {
+ return method.invoke(callback, args);
+ }
+ catch (Exception e) {
+ // These are not considered fatal.
+ LOG.warn("Caught exception in callback "+ method.getName(), e);
+ }
+ return null;
+ }
+ }));
+ }
+
+ @Override
+ public synchronized void startMetricsMBeans() {
+ for (MetricsSourceAdapter sa : sources.values()) {
+ sa.startMBeans();
+ }
+ }
+
+ @Override
+ public synchronized void stopMetricsMBeans() {
+ for (MetricsSourceAdapter sa : sources.values()) {
+ sa.stopMBeans();
+ }
+ }
+
+ @Override
+ public synchronized String currentConfig() {
+ PropertiesConfiguration saver = new PropertiesConfiguration();
+ StringWriter writer = new StringWriter();
+ saver.copy(config);
+ try { saver.save(writer); }
+ catch (Exception e) {
+ throw new MetricsConfigException("Error stringify config", e);
+ }
+ return writer.toString();
+ }
+
+ /**
+  * Start the daemon timer that fires {@link #onTimerEvent()} every
+  * {@code period} seconds; logs a warning and does nothing when a timer
+  * already exists. Resets the logical time to 0.
+  */
+ private synchronized void startTimer() {
+   if (timer != null) {
+     LOG.warn(prefix +" metrics system timer already started!");
+     return;
+   }
+   logicalTime = 0;
+   // Multiply as long so a large period (seconds) cannot overflow int.
+   long millis = period * 1000L;
+   timer = new Timer("Timer for '"+ prefix +"' metrics system", true);
+   timer.scheduleAtFixedRate(new TimerTask() {
+     @Override public void run() {
+       try {
+         onTimerEvent();
+       }
+       catch (Exception e) {
+         // Log and keep going: an exception escaping run() would cancel
+         // the timer.
+         LOG.warn(e);
+       }
+     }
+   }, millis, millis);
+   LOG.info("Scheduled snapshot period at "+ period +" second(s).");
+ }
+
+ synchronized void onTimerEvent() {
+ logicalTime += period;
+ if (sinks.size() > 0) {
+ publishMetrics(sampleMetrics());
+ }
+ }
+
+ /**
+ * Sample all the sources for a snapshot of metrics/tags
+ * @return the metrics buffer containing the snapshot
+ */
+ synchronized MetricsBuffer sampleMetrics() {
+ collector.clear();
+ MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
+
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+ if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
+ snapshotMetrics(entry.getValue(), bufferBuilder);
+ }
+ }
+ if (publishSelfMetrics) {
+ snapshotMetrics(sysSource, bufferBuilder);
+ }
+ MetricsBuffer buffer = bufferBuilder.get();
+ return buffer;
+ }
+
+ private void snapshotMetrics(MetricsSourceAdapter sa,
+ MetricsBufferBuilder bufferBuilder) {
+ long startTime = System.currentTimeMillis();
+ bufferBuilder.add(sa.name(), sa.getMetrics(collector, false));
+ collector.clear();
+ snapshotStat.add(System.currentTimeMillis() - startTime);
+ LOG.debug("Snapshotted source "+ sa.name());
+ }
+
+ /**
+  * Hand a metrics snapshot to every active sink adapter, timing each
+  * hand-off and counting the adapters that reported a dropped update.
+  * @param buffer the metrics snapshot to publish
+  */
+ synchronized void publishMetrics(MetricsBuffer buffer) {
+   int rejected = 0;
+   for (MetricsSinkAdapter adapter : sinks.values()) {
+     long begin = System.currentTimeMillis();
+     if (!adapter.putMetrics(buffer, logicalTime)) {
+       ++rejected;
+     }
+     publishStat.add(System.currentTimeMillis() - begin);
+   }
+   droppedPubAll.incr(rejected);
+ }
+
+ private synchronized void stopTimer() {
+ if (timer == null) {
+ LOG.warn(prefix +" metrics system timer already stopped!");
+ return;
+ }
+ timer.cancel();
+ timer = null;
+ }
+
+ private synchronized void stopSources() {
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+ MetricsSourceAdapter sa = entry.getValue();
+ LOG.info("Stopping metrics source "+ entry.getKey());
+ LOG.debug(sa.source().getClass());
+ sa.stop();
+ }
+ sysSource.stop();
+ sources.clear();
+ }
+
+ private synchronized void stopSinks() {
+ for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+ MetricsSinkAdapter sa = entry.getValue();
+ LOG.info("Stopping metrics sink "+ entry.getKey());
+ LOG.debug(sa.sink().getClass());
+ sa.stop();
+ }
+ sinks.clear();
+ }
+
+ private synchronized void configure(String prefix) {
+ config = MetricsConfig.create(prefix);
+ configureSinks();
+ configureSources();
+ configureSystem();
+ }
+
+ private synchronized void configureSystem() {
+ injectedTags.add(Interns.tag(MsInfo.Hostname, getHostname()));
+ }
+
+ private synchronized void configureSinks() {
+ sinkConfigs = config.getInstanceConfigs(SINK_KEY);
+ int confPeriod = 0;
+ for (Entry<String, MetricsConfig> entry : sinkConfigs.entrySet()) {
+ MetricsConfig conf = entry.getValue();
+ int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+ confPeriod = confPeriod == 0 ? sinkPeriod
+ : MathUtils.gcd(confPeriod, sinkPeriod);
+ String clsName = conf.getClassName("");
+ if (clsName == null) continue; // sink can be registered later on
+ String sinkName = entry.getKey();
+ try {
+ MetricsSinkAdapter sa = newSink(sinkName,
+ conf.getString(DESC_KEY, sinkName), conf);
+ sa.start();
+ sinks.put(sinkName, sa);
+ }
+ catch (Exception e) {
+ LOG.warn("Error creating sink '"+ sinkName +"'", e);
+ }
+ }
+ period = confPeriod > 0 ? confPeriod
+ : config.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+ }
+
+ static MetricsSinkAdapter newSink(String name, String desc, MetricsSink sink,
+ MetricsConfig conf) {
+ return new MetricsSinkAdapter(name, desc, sink, conf.getString(CONTEXT_KEY),
+ conf.getFilter(SOURCE_FILTER_KEY),
+ conf.getFilter(RECORD_FILTER_KEY),
+ conf.getFilter(METRIC_FILTER_KEY),
+ conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
+ conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
+ conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
+ conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
+ conf.getInt(RETRY_COUNT_KEY, RETRY_COUNT_DEFAULT));
+ }
+
+ static MetricsSinkAdapter newSink(String name, String desc,
+ MetricsConfig conf) {
+ return newSink(name, desc, (MetricsSink) conf.getPlugin(""), conf);
+ }
+
+ private void configureSources() {
+ sourceFilter = config.getFilter(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
+ sourceConfigs = config.getInstanceConfigs(SOURCE_KEY);
+ registerSystemSource();
+ }
+
+ private void clearConfigs() {
+ sinkConfigs.clear();
+ sourceConfigs.clear();
+ injectedTags.clear();
+ config = null;
+ }
+
+ /**
+  * @return the local host name, or "localhost" when the lookup fails
+  *         (the failure is logged)
+  */
+ static String getHostname() {
+   String hostname;
+   try {
+     hostname = InetAddress.getLocalHost().getHostName();
+   }
+   catch (Exception e) {
+     LOG.error("Error getting localhost name. Using 'localhost'...", e);
+     hostname = "localhost";
+   }
+   return hostname;
+ }
+
+ private void registerSystemSource() {
+ MetricsConfig sysConf = sourceConfigs.get(MS_NAME);
+ sysSource = new MetricsSourceAdapter(prefix, MS_STATS_NAME, MS_STATS_DESC,
+ MetricsAnnotations.makeSource(this), injectedTags, period,
+ sysConf == null ? config.subset(SOURCE_KEY) : sysConf);
+ sysSource.start();
+ }
+
+ @Override
+ public synchronized void getMetrics(MetricsCollector builder, boolean all) {
+ MetricsRecordBuilder rb = builder.addRecord(MS_NAME)
+ .addGauge(MsInfo.NumActiveSources, sources.size())
+ .addGauge(MsInfo.NumAllSources, allSources.size())
+ .addGauge(MsInfo.NumActiveSinks, sinks.size())
+ .addGauge(MsInfo.NumAllSinks, allSinks.size());
+
+ for (MetricsSinkAdapter sa : sinks.values()) {
+ sa.snapshot(rb, all);
+ }
+ registry.snapshot(rb, all);
+ }
+
+ private void initSystemMBean() {
+ checkNotNull(prefix, "prefix should not be null here!");
+ if (mbeanName == null) {
+ mbeanName = MBeans.register(prefix, MS_CONTROL_NAME, this);
+ }
+ }
+
+ @Override
+ public synchronized boolean shutdown() {
+ LOG.debug("refCount="+ refCount);
+ if (refCount <= 0) LOG.debug("Redundant shutdown", new Throwable());
+ if (--refCount > 0) return false;
+ if (monitoring) {
+ try { stop(); }
+ catch (Exception e) {
+ LOG.warn("Error stopping the metrics system", e);
+ }
+ }
+ allSources.clear();
+ allSinks.clear();
+ callbacks.clear();
+ if (mbeanName != null) {
+ MBeans.unregister(mbeanName);
+ mbeanName = null;
+ }
+ LOG.info(prefix +" metrics system shutdown complete.");
+ return true;
+ }
+
+ /**
+  * Look up a registered metrics source by name.
+  * @param name the name the source was registered under
+  * @return the source, or null if no source has that name
+  */
+ public synchronized MetricsSource getSource(String name) {
+   // allSources is a plain HashMap that is only mutated under this
+   // object's monitor, so reads take the same lock.
+   return allSources.get(name);
+ }
+
+ /**
+  * Resolve the init mode from the system property named by
+  * MS_INIT_MODE_KEY, falling back to the environment variable of the same
+  * name and then to NORMAL. The value is upper-cased before lookup.
+  */
+ private InitMode initMode() {
+   String fromProperty = System.getProperty(MS_INIT_MODE_KEY);
+   LOG.debug("from system property: "+ fromProperty);
+   String fromEnv = System.getenv(MS_INIT_MODE_KEY);
+   LOG.debug("from environment variable: "+ fromEnv);
+   String mode = fromProperty;
+   if (mode == null) {
+     mode = fromEnv;
+   }
+   if (mode == null) {
+     mode = InitMode.NORMAL.name();
+   }
+   return InitMode.valueOf(mode.toUpperCase(Locale.US));
+ }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/MsInfo.java Fri May 6 07:28:43 2011
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import com.google.common.base.Objects;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+
+/**
+ * Metrics info entries (name plus description) that belong to the metrics
+ * system itself.
+ */
+@InterfaceAudience.Private
+public enum MsInfo implements MetricsInfo {
+  NumActiveSources("Number of active metrics sources"),
+  NumAllSources("Number of all registered metrics sources"),
+  NumActiveSinks("Number of active metrics sinks"),
+  NumAllSinks("Number of all registered metrics sinks"),
+  Context("Metrics context"),
+  Hostname("Local hostname"),
+  SessionId("Session ID"),
+  ProcessName("Process name");
+
+  /** Human readable description of this entry. */
+  private final String description;
+
+  MsInfo(String description) {
+    this.description = description;
+  }
+
+  /**
+   * @return the description given to this constant
+   */
+  @Override
+  public String description() {
+    return description;
+  }
+
+  /**
+   * @return a string carrying both the constant's name and its description
+   */
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("name", name())
+        .add("description", description)
+        .toString();
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/SinkQueue.java Fri May 6 07:28:43 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * A half-blocking (nonblocking for producers, blocking for consumers) queue
+ * for metrics sinks.
+ *
+ * New elements are dropped when the queue is full to preserve "interesting"
+ * elements at the onset of queue filling events
+ */
+class SinkQueue<T> {
+
+  /** Callback that receives the elements taken from the queue. */
+  interface Consumer<T> {
+    void consume(T object) throws InterruptedException;
+  }
+
+  // A fixed size circular buffer to minimize garbage
+  private final T[] data;
+  private int head;  // slot just before the oldest element
+  private int tail;  // slot of the newest element
+  private int size;  // number of elements
+  private Thread currentConsumer = null;
+
+  /**
+   * @param capacity maximum number of elements; values below 1 are raised to 1
+   */
+  @SuppressWarnings("unchecked")
+  SinkQueue(int capacity) {
+    this.data = (T[]) new Object[Math.max(1, capacity)];
+    head = tail = size = 0;
+  }
+
+  /**
+   * Append an element without blocking and wake up a waiting consumer.
+   * @param e the element to add
+   * @return false if the queue was full and the element was dropped
+   */
+  synchronized boolean enqueue(T e) {
+    if (data.length == size) {
+      return false;
+    }
+    ++size;
+    tail = (tail + 1) % data.length;
+    data[tail] = e;
+    notify();
+    return true;
+  }
+
+  /**
+   * Consume one element, will block if queue is empty
+   * Only one consumer at a time is allowed
+   * The element is removed only after the callback returns normally; if the
+   * callback throws, the element stays at the front of the queue.
+   * @param consumer the consumer callback object
+   * @throws InterruptedException if interrupted while waiting or consuming
+   */
+  void consume(Consumer<T> consumer) throws InterruptedException {
+    T e = waitForData();
+
+    try {
+      consumer.consume(e);  // can take forever
+      _dequeue();
+    }
+    finally {
+      clearConsumerLock();
+    }
+  }
+
+  /**
+   * Consume all the elements, will block if queue is empty
+   * The number of elements to consume is read once, after data has become
+   * available; elements enqueued later are left for the next call.
+   * @param consumer the consumer callback object
+   * @throws InterruptedException if interrupted while waiting or consuming
+   */
+  void consumeAll(Consumer<T> consumer) throws InterruptedException {
+    waitForData();
+
+    try {
+      for (int i = size(); i-- > 0; ) {
+        consumer.consume(front()); // can take forever
+        _dequeue();
+      }
+    }
+    finally {
+      clearConsumerLock();
+    }
+  }
+
+  /**
+   * Dequeue one element from head of the queue, will block if queue is empty
+   * @return the first element
+   * @throws InterruptedException if interrupted while waiting for data
+   */
+  synchronized T dequeue() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    return _dequeue();
+  }
+
+  /**
+   * Block until the queue is non-empty, then mark the calling thread as the
+   * current consumer.
+   * @return the element at the front of the queue (not removed)
+   */
+  private synchronized T waitForData() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    setConsumerLock();
+    return front();
+  }
+
+  /** Fail if some thread is currently marked as consuming the queue. */
+  private synchronized void checkConsumer() {
+    if (currentConsumer != null) {
+      throw new ConcurrentModificationException("The "+
+          currentConsumer.getName() +" thread is consuming the queue.");
+    }
+  }
+
+  private synchronized void setConsumerLock() {
+    currentConsumer = Thread.currentThread();
+  }
+
+  private synchronized void clearConsumerLock() {
+    currentConsumer = null;
+  }
+
+  /**
+   * Remove and return the oldest element without blocking.
+   * @throws IllegalStateException if the queue is empty
+   */
+  private synchronized T _dequeue() {
+    if (0 == size) {
+      throw new IllegalStateException("Size must > 0 here.");
+    }
+    --size;
+    head = (head + 1) % data.length;
+    T ret = data[head];
+    data[head] = null;  // hint to gc
+    return ret;
+  }
+
+  /**
+   * @return the oldest element without removing it
+   */
+  synchronized T front() {
+    return data[(head + 1) % data.length];
+  }
+
+  /**
+   * @return the most recently enqueued element without removing it
+   */
+  synchronized T back() {
+    return data[tail];
+  }
+
+  /**
+   * Remove all elements.
+   * @throws ConcurrentModificationException if a thread is currently
+   *         consuming the queue
+   */
+  synchronized void clear() {
+    checkConsumer();
+
+    for (int i = data.length; i-- > 0; ) {
+      data[i] = null;
+    }
+    size = 0;
+    // Realign the indices. Without this, clearing a non-empty queue leaves
+    // head and tail apart, so the next enqueue writes to one slot while
+    // front()/dequeue() read another (empty) one.
+    head = tail = 0;
+  }
+
+  synchronized int size() {
+    return size;
+  }
+
+  int capacity() {
+    return data.length;
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/impl/package-info.java Fri May 6 07:28:43 2011
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A metrics system implementation
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsFactory.java Fri May 6 07:28:43 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsException;
+
+/**
+ * Experimental singleton holding the factory instances used to extend
+ * metrics dynamically
+ */
+@InterfaceAudience.Private
+public enum DefaultMetricsFactory {
+  INSTANCE; // the singleton
+
+  // Created on first request unless one was supplied through setInstance
+  private MutableMetricsFactory mmfImpl;
+
+  /**
+   * @return the mutable metrics factory held by the singleton
+   */
+  public static MutableMetricsFactory getAnnotatedMetricsFactory() {
+    return INSTANCE.getInstance(MutableMetricsFactory.class);
+  }
+
+  /**
+   * Returns the factory instance of the requested type.
+   * @param <T> the factory type
+   * @param cls the class of the requested factory; only
+   *            MutableMetricsFactory is known
+   * @return the factory instance, created on demand if none is set
+   * @throws MetricsException for any other factory type
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized <T> T getInstance(Class<T> cls) {
+    if (cls != MutableMetricsFactory.class) {
+      throw new MetricsException("Unknown metrics factory type: "+ cls.getName());
+    }
+    if (mmfImpl == null) {
+      mmfImpl = new MutableMetricsFactory();
+    }
+    return (T) mmfImpl;
+  }
+
+  /**
+   * Replaces the mutable metrics factory handed out by this singleton.
+   * @param factory the factory to use from now on
+   */
+  public synchronized void setInstance(MutableMetricsFactory factory) {
+    mmfImpl = factory;
+  }
+}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java?rev=1100113&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystem.java Fri May 6 07:28:43 2011
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+
+/**
+ * The default metrics system singleton
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum DefaultMetricsSystem {
+  INSTANCE; // the singleton
+
+  private MetricsSystem impl = new MetricsSystemImpl();
+  volatile boolean miniClusterMode = false;
+  final UniqueNames mBeanNames = new UniqueNames();
+  final UniqueNames sourceNames = new UniqueNames();
+
+  /**
+   * Convenience method to initialize the metrics system
+   * @param prefix for the metrics system configuration
+   * @return the metrics system instance
+   */
+  public static MetricsSystem initialize(String prefix) {
+    return INSTANCE.init(prefix);
+  }
+
+  /** Delegates initialization to the current implementation. */
+  synchronized MetricsSystem init(String prefix) {
+    return impl.init(prefix);
+  }
+
+  /**
+   * @return the metrics system object
+   */
+  public static MetricsSystem instance() {
+    return INSTANCE.getImpl();
+  }
+
+  /**
+   * Shutdown the metrics system
+   */
+  public static void shutdown() {
+    INSTANCE.shutdownInstance();
+  }
+
+  /**
+   * Shuts down the current implementation; the recorded MBean and source
+   * names are forgotten only when the implementation reports that the
+   * shutdown actually happened.
+   */
+  synchronized void shutdownInstance() {
+    if (!impl.shutdown()) {
+      return;
+    }
+    mBeanNames.map.clear();
+    sourceNames.map.clear();
+  }
+
+  /**
+   * Replaces the metrics system implementation.
+   * @param ms the implementation to use from now on
+   * @return the implementation that was in place before
+   */
+  @InterfaceAudience.Private
+  public static MetricsSystem setInstance(MetricsSystem ms) {
+    return INSTANCE.setImpl(ms);
+  }
+
+  synchronized MetricsSystem setImpl(MetricsSystem ms) {
+    final MetricsSystem previous = impl;
+    impl = ms;
+    return previous;
+  }
+
+  synchronized MetricsSystem getImpl() {
+    return impl;
+  }
+
+  @InterfaceAudience.Private
+  public static void setMiniClusterMode(boolean choice) {
+    INSTANCE.miniClusterMode = choice;
+  }
+
+  @InterfaceAudience.Private
+  public static boolean inMiniClusterMode() {
+    return INSTANCE.miniClusterMode;
+  }
+
+  /**
+   * @param name the requested MBean name
+   * @return an object name built from a unique variant of the given name
+   */
+  @InterfaceAudience.Private
+  public static ObjectName newMBeanName(String name) {
+    return INSTANCE.newObjectName(name);
+  }
+
+  /**
+   * @param name the requested source name
+   * @param dupOK whether an already known name may be returned as is
+   * @return the source name to use
+   */
+  @InterfaceAudience.Private
+  public static String sourceName(String name, boolean dupOK) {
+    return INSTANCE.newSourceName(name, dupOK);
+  }
+
+  /**
+   * Builds an object name for the given MBean name. An already known name is
+   * rejected unless mini cluster mode is on. Every failure, including that
+   * rejection, surfaces as a MetricsException wrapping the original
+   * exception.
+   */
+  synchronized ObjectName newObjectName(String name) {
+    try {
+      final boolean known = mBeanNames.map.containsKey(name);
+      if (known && !miniClusterMode) {
+        throw new MetricsException(name +" already exists!");
+      }
+      final String unique = mBeanNames.uniqueName(name);
+      return new ObjectName(unique);
+    } catch (Exception e) {
+      throw new MetricsException(e);
+    }
+  }
+
+  /**
+   * Records a source name. A name not seen before is passed through the
+   * unique name generator; a known name is returned unchanged when
+   * duplicates are allowed and rejected with a MetricsException otherwise.
+   */
+  synchronized String newSourceName(String name, boolean dupOK) {
+    if (!sourceNames.map.containsKey(name)) {
+      return sourceNames.uniqueName(name);
+    }
+    if (!dupOK) {
+      throw new MetricsException("Metrics source "+ name +" already exists!");
+    }
+    return name;
+  }
+}