You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:34:00 UTC
svn commit: r1077597 [2/6] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ conf/ ivy/
src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/ipc/metrics/
src/core/org/apache/hadoop/log/ src/core/org/apache/hadoop/metrics/
src/core/org...
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/Consumer.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+/**
+ * A simple generic consumer interface: one callback that accepts a single object of type T.
+ */
+interface Consumer<T> {
+ void consume(T object) throws InterruptedException; // declared interruptible; presumably implementations may block -- confirm with callers
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MBeanInfoBuilder.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+/**
+ * Helper class to build MBeanInfo from metrics records. Usage: reset(records), then get(); it is itself the MetricsVisitor that collects one attribute per visited metric.
+ */
+class MBeanInfoBuilder implements MetricsVisitor {
+
+ private final String name, description; // passed as the first two MBeanInfo constructor arguments in get()
+ private List<MBeanAttributeInfo> attrs; // attribute infos accumulated so far; cleared only by reset()
+ private Iterable<MetricsRecordImpl> recs; // records to describe; assigned only in reset(), so null until reset() is called
+ private int curRecNo; // index of the record being processed in get(); drives the name suffix in getAttrName()
+
+ MBeanInfoBuilder(String name, String desc) {
+ this.name = name;
+ description = desc;
+ attrs = new ArrayList<MBeanAttributeInfo>();
+ }
+
+ MBeanInfoBuilder reset(Iterable<MetricsRecordImpl> recs) { // installs a new record source and drops previously collected attributes
+ this.recs = recs;
+ attrs.clear();
+ return this; // returned for call chaining
+ }
+
+ MBeanAttributeInfo newAttrInfo(String name, String desc, String type) { // name is passed through getAttrName(), so it may gain a record-number suffix
+ return new MBeanAttributeInfo(getAttrName(name), type, desc,
+ true, false, false); // isReadable=true, isWritable=false, isIs=false: read-only, non-is
+ }
+
+ MBeanAttributeInfo newAttrInfo(Metric m, String type) { // convenience overload taking name/description from the metric
+ return newAttrInfo(m.name(), m.description(), type);
+ }
+
+ public void gauge(MetricGauge<Integer> metric, int value) { // visitor callbacks: only the metric's name/description and a type name are recorded; value is unused
+ attrs.add(newAttrInfo(metric, "java.lang.Integer"));
+ }
+
+ public void gauge(MetricGauge<Long> metric, long value) {
+ attrs.add(newAttrInfo(metric, "java.lang.Long"));
+ }
+
+ public void gauge(MetricGauge<Float> metric, float value) {
+ attrs.add(newAttrInfo(metric, "java.lang.Float"));
+ }
+
+ public void gauge(MetricGauge<Double> metric, double value) {
+ attrs.add(newAttrInfo(metric, "java.lang.Double"));
+ }
+
+ public void counter(MetricCounter<Integer> metric, int value) {
+ attrs.add(newAttrInfo(metric, "java.lang.Integer"));
+ }
+
+ public void counter(MetricCounter<Long> metric, long value) {
+ attrs.add(newAttrInfo(metric, "java.lang.Long"));
+ }
+
+ String getAttrName(String name) { // first record (curRecNo == 0) keeps the plain name; later records get ".<curRecNo>" appended
+ return curRecNo > 0 ? name +"."+ curRecNo : name;
+ }
+
+ MBeanInfo get() { // NOTE(review): appends to attrs without clearing, so a second get() without reset() would duplicate attributes -- confirm callers always reset()
+ curRecNo = 0;
+ for (MetricsRecordImpl rec : recs) { // recs is whatever reset() last stored; it is never assigned anywhere else in this class
+ for (MetricsTag t : rec.tags()) {
+ attrs.add(newAttrInfo("tag."+ t.name(), t.description(),
+ "java.lang.String")); // tags are exposed as String attributes named "tag.<tag name>"
+ }
+ for (Metric m : rec.metrics()) {
+ m.visit(this); // calls back into the gauge()/counter() overloads above
+ }
+ ++curRecNo;
+ }
+ MBeanAttributeInfo[] attrsArray = new MBeanAttributeInfo[attrs.size()];
+ return new MBeanInfo(name, description, attrs.toArray(attrsArray),
+ null, null, null); // no ops/ctors/notifications
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterInt.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricCounterInt extends MetricCounter<Integer> { // counter metric carrying a fixed int value
+
+ final int value; // set once in the constructor
+
+ MetricCounterInt(String name, String description, int value) {
+ super(name, description);
+ this.value = value;
+ }
+
+ public Integer value() { // boxes the primitive field
+ return value;
+ }
+
+ public void visit(MetricsVisitor visitor) { // passes this metric and its primitive int value to visitor.counter
+ visitor.counter(this, value);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricCounterLong.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricCounterLong extends MetricCounter<Long> { // counter metric carrying a fixed long value
+
+ final long value; // set once in the constructor
+
+ MetricCounterLong(String name, String description, long value) {
+ super(name, description);
+ this.value = value;
+ }
+
+ public Long value() { // boxes the primitive field
+ return value;
+ }
+
+ public void visit(MetricsVisitor visitor) { // passes this metric and its primitive long value to visitor.counter
+ visitor.counter(this, value);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeDouble.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeDouble extends MetricGauge<Double> { // gauge metric carrying a fixed double value
+
+ final double value; // set once in the constructor
+
+ MetricGaugeDouble(String name, String description, double value) {
+ super(name, description);
+ this.value = value;
+ }
+
+ public Double value() { // boxes the primitive field
+ return value;
+ }
+
+ public void visit(MetricsVisitor visitor) { // passes this metric and its primitive double value to visitor.gauge
+ visitor.gauge(this, value);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeFloat.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeFloat extends MetricGauge<Float> { // gauge metric carrying a fixed float value
+
+ final float value; // set once in the constructor
+
+ MetricGaugeFloat(String name, String description, float value) {
+ super(name, description);
+ this.value = value;
+ }
+
+ public Float value() { // boxes the primitive field
+ return value;
+ }
+
+ public void visit(MetricsVisitor visitor) { // passes this metric and its primitive float value to visitor.gauge
+ visitor.gauge(this, value);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeInt.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeInt extends MetricGauge<Integer> { // gauge metric carrying a fixed int value
+
+ final int value; // set once in the constructor
+
+ MetricGaugeInt(String name, String description, int value) {
+ super(name, description);
+ this.value = value;
+ }
+
+ public Integer value() { // boxes the primitive field
+ return value;
+ }
+
+ public void visit(MetricsVisitor visitor) { // passes this metric and its primitive int value to visitor.gauge
+ visitor.gauge(this, value);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricGaugeLong.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+
+class MetricGaugeLong extends MetricGauge<Long> { // gauge metric carrying a fixed long value
+
+ final long value; // set once in the constructor
+
+ MetricGaugeLong(String name, String description, long value) {
+ super(name, description);
+ this.value = value;
+ }
+
+ public Long value() { // boxes the primitive field
+ return value;
+ }
+
+ public void visit(MetricsVisitor visitor) { // passes this metric and its primitive long value to visitor.gauge
+ visitor.gauge(this, value);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuffer.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+
+/**
+ * An immutable element for the sink queues. It exposes no mutators, but it keeps the Iterable it was given without copying, so it is only as immutable as that Iterable.
+ */
+class MetricsBuffer implements Iterable<MetricsBuffer.Entry> {
+
+ private final Iterable<Entry> mutable; // the caller-supplied backing iterable (stored by reference)
+
+ MetricsBuffer(Iterable<MetricsBuffer.Entry> mutable) {
+ this.mutable = mutable;
+ }
+
+ public Iterator<Entry> iterator() { // delegates to the backing iterable; NOTE(review): the returned iterator may support remove() -- confirm
+ return mutable.iterator();
+ }
+
+ static class Entry { // pairs a source name with the records associated with it
+ private final String sourceName;
+ private final Iterable<MetricsRecordImpl> records;
+
+ Entry(String name, Iterable<MetricsRecordImpl> records) {
+ sourceName = name;
+ this.records = records;
+ }
+
+ String name() { // the source name given at construction
+ return sourceName;
+ }
+
+ Iterable<MetricsRecordImpl> records() { // the same Iterable instance given at construction (not copied)
+ return records;
+ }
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBufferBuilder.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+
+/**
+ * Builder for the immutable metrics buffers. The builder is itself the ArrayList of entries that the resulting buffer iterates over.
+ */
+class MetricsBufferBuilder extends ArrayList<MetricsBuffer.Entry> {
+ private static final long serialVersionUID = 1L;
+
+ boolean add(String name, Iterable<MetricsRecordImpl> records) { // wraps the pair in an Entry and appends it via ArrayList.add
+ return add(new MetricsBuffer.Entry(name, records));
+ }
+
+ MetricsBuffer get() { // passes this list itself (no copy), so entries added later are visible through the returned buffer
+ return new MetricsBuffer(this);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsBuilderImpl.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsFilter;
+
+class MetricsBuilderImpl extends ArrayList<MetricsRecordBuilderImpl> // the list holds the record builders accepted so far
+ implements MetricsBuilder {
+ private static final long serialVersionUID = 1L;
+ private MetricsFilter recordFilter, metricFilter; // both optional; null until the corresponding setter is called
+
+ @Override
+ public MetricsRecordBuilderImpl addRecord(String name) { // always returns a builder, but only stores it when the record filter accepts the name
+ boolean acceptable = recordFilter == null || recordFilter.accepts(name); // no record filter means every name is accepted
+ MetricsRecordBuilderImpl rb =
+ new MetricsRecordBuilderImpl(name, recordFilter, metricFilter,
+ acceptable);
+ if (acceptable) {
+ add(rb);
+ }
+ return rb; // a rejected builder is still handed back, it is just never added to this list
+ }
+
+
+ public List<MetricsRecordImpl> getRecords() { // builds a fresh list from the stored builders, in insertion order
+ List<MetricsRecordImpl> records =
+ new ArrayList<MetricsRecordImpl>(size());
+ for (MetricsRecordBuilderImpl rb : this) {
+ MetricsRecordImpl mr = rb.getRecord();
+ if (mr != null) { // builders whose getRecord() returns null contribute nothing
+ records.add(mr);
+ }
+ }
+ return records;
+ }
+
+ MetricsBuilderImpl setRecordFilter(MetricsFilter rf) { // affects only builders created by later addRecord calls; returns this for chaining
+ recordFilter = rf;
+ return this;
+ }
+
+ MetricsBuilderImpl setMetricFilter(MetricsFilter mf) { // stored and passed to builders created by later addRecord calls; returns this for chaining
+ metricFilter = mf;
+ return this;
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfig.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsPlugin;
+
+import org.apache.hadoop.util.StringUtils;
+
+class MetricsConfig extends SubsetConfiguration { // prefix-scoped view over a parent Configuration, with "*."-prefixed defaults looked up in the parent
+
+ static final Log LOG = LogFactory.getLog(MetricsConfig.class);
+
+ static final String DEFAULT_FILE_NAME = "hadoop-metrics2.properties"; // fallback file tried by create(String)
+ static final String PREFIX_DEFAULT = "*."; // key prefix used for the parent fallback lookup in getProperty
+
+ static final String PERIOD_KEY = "period";
+ static final int PERIOD_DEFAULT = 10; // seconds
+
+ static final String QUEUE_CAPACITY_KEY = "queue.capacity";
+ static final int QUEUE_CAPACITY_DEFAULT = 1;
+
+ static final String RETRY_DELAY_KEY = "retry.delay";
+ static final int RETRY_DELAY_DEFAULT = 10; // seconds
+ static final String RETRY_BACKOFF_KEY = "retry.backoff";
+ static final int RETRY_BACKOFF_DEFAULT = 2; // back off factor
+ static final String RETRY_COUNT_KEY = "retry.count";
+ static final int RETRY_COUNT_DEFAULT = 3;
+
+ static final String JMX_CACHE_TTL_KEY = "jmx.cache.ttl";
+ static final int JMX_CACHE_TTL_DEFAULT = 10000; // millis
+
+ static final String CONTEXT_KEY = "context";
+ static final String NAME_KEY = "name";
+ static final String DESC_KEY = "description";
+ static final String SOURCE_KEY = "source";
+ static final String SINK_KEY = "sink";
+ static final String METRIC_FILTER_KEY = "metric.filter";
+ static final String RECORD_FILTER_KEY = "record.filter";
+ static final String SOURCE_FILTER_KEY = "source.filter";
+
+ static final Pattern INSTANCE_REGEX = Pattern.compile("([^.*]+)\\..+"); // group 1 = leading key segment containing neither '.' nor '*', followed by '.' and more
+
+ MetricsConfig(Configuration c, String prefix) { // prefix is lower-cased (Locale.US) and "." is the delimiter
+ super(c, prefix.toLowerCase(Locale.US), ".");
+ }
+
+ static MetricsConfig create(String prefix) { // tries "hadoop-metrics2-<lower-cased prefix>.properties" first, then DEFAULT_FILE_NAME
+ return loadFirst(prefix, "hadoop-metrics2-"+ prefix.toLowerCase(Locale.US)
+ +".properties", DEFAULT_FILE_NAME);
+ }
+
+ static MetricsConfig create(String prefix, String... fileNames) { // same as loadFirst with an explicit candidate list
+ return loadFirst(prefix, fileNames);
+ }
+
+ /**
+ * Load configuration from a list of files until the first successful load
+ * @param prefix the prefix passed to the MetricsConfig constructor for the loaded file
+ * @param fileNames the list of filenames to try, in order
+ * @return the configuration object (MetricsConfigException is thrown if no file can be located or a load fails otherwise)
+ */
+ static MetricsConfig loadFirst(String prefix, String... fileNames) {
+ for (String fname : fileNames) {
+ try {
+ PropertiesConfiguration cf = new PropertiesConfiguration(fname);
+ LOG.info("loaded properties from "+ fname);
+ return new MetricsConfig(cf, prefix);
+ }
+ catch (ConfigurationException e) {
+ if (e.getMessage().startsWith("Cannot locate configuration")) { // a missing file is recognised by message prefix; NOTE(review): getMessage() may be null -- confirm
+ continue; // try the next candidate file
+ }
+ throw new MetricsConfigException(e); // any other load failure aborts immediately
+ }
+ }
+ throw new MetricsConfigException("Cannot locate configuration: tried "+
+ StringUtils.join(", ", fileNames));
+ }
+
+ @Override
+ public MetricsConfig subset(String prefix) { // returns a MetricsConfig whose parent is this config, so the fallback in getProperty chains upward
+ return new MetricsConfig(this, prefix);
+ }
+
+ /**
+ * Return sub configs for instance specified in the config.
+ * Assuming format specified as follows:<pre>
+ * [type].[instance].[option] = [value]</pre>
+ * Note, '*' is a special default instance, which is excluded in the result.
+ * @param type of the instance
+ * @return a map with [instance] as key and config object as value
+ */
+ Map<String, MetricsConfig> getInstanceConfigs(String type) {
+ HashMap<String, MetricsConfig> map = new HashMap<String, MetricsConfig>();
+ MetricsConfig sub = subset(type);
+
+ for (String key : sub.keys()) {
+ Matcher matcher = INSTANCE_REGEX.matcher(key);
+ if (matcher.matches()) { // keys starting with '*' or lacking an "instance." part do not match and are skipped
+ String instance = matcher.group(1);
+ if (!map.containsKey(instance)) { // one sub-config per instance, created on first sight
+ map.put(instance, sub.subset(instance));
+ }
+ }
+ }
+ return map;
+ }
+
+ Iterable<String> keys() { // adapts getKeys() to Iterable so it can be used in a for-each loop
+ return new Iterable<String>() {
+ @SuppressWarnings("unchecked")
+ public Iterator<String> iterator() {
+ return (Iterator<String>) getKeys(); // unchecked cast of the iterator returned by getKeys(); each iterator() call fetches a fresh one
+ }
+ };
+ }
+
+ /**
+ * Will poke parents for defaults: when this subset has no value, the parent is asked for the key under the "*." prefix
+ * @param key to lookup
+ * @return the value or null
+ */
+ @Override
+ public Object getProperty(String key) {
+ Object value = super.getProperty(key);
+ if (value == null) {
+ LOG.debug("poking parent "+ getParent().getClass().getSimpleName() +
+ " for "+ key);
+ return getParent().getProperty(key.startsWith(PREFIX_DEFAULT) ? key
+ : PREFIX_DEFAULT + key); // PREFIX_DEFAULT is prepended unless the key already starts with it
+ }
+ return value;
+ }
+
+ <T extends MetricsPlugin> T getPlugin(String name) { // instantiates the class named by "<name>.class" ("class" when name is empty); returns null if unset or empty
+ String classKey = name.isEmpty() ? "class" : name +".class";
+ String pluginClassName = getString(classKey);
+ if (pluginClassName == null || pluginClassName.isEmpty()) {
+ return null;
+ }
+ try {
+ Class<?> pluginClass = Class.forName(pluginClassName);
+ @SuppressWarnings("unchecked")
+ T plugin = (T) pluginClass.newInstance(); // requires an accessible no-arg constructor; the cast to T is unchecked
+ plugin.init(name.isEmpty() ? this : subset(name)); // the plugin is initialised with this config, or with the subset under its name
+ return plugin;
+ }
+ catch (Exception e) {
+ throw new MetricsConfigException("Error creating plugin: "+
+ pluginClassName, e); // any failure in loading, instantiating or init is wrapped
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toString(this);
+ }
+
+ String toString(Configuration c) { // renders c by copying it into a temporary PropertiesConfiguration and saving that to an in-memory buffer
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(buffer);
+ PropertiesConfiguration tmp = new PropertiesConfiguration();
+ tmp.copy(c);
+ try { tmp.save(ps); }
+ catch (Exception e) {
+ throw new MetricsConfigException(e);
+ }
+ return buffer.toString(); // no charset is specified here or on the PrintStream, so the platform default applies
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsConfigException.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.MetricsException;
+
+/**
+ * The metrics configuration runtime exception.
+ * All constructors are package-private and simply forward to the matching
+ * MetricsException constructor.
+ */
+public class MetricsConfigException extends MetricsException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message describing the configuration problem
+ */
+ MetricsConfigException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message describing the configuration problem
+ * @param cause the underlying failure
+ */
+ MetricsConfigException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param cause the underlying failure
+ */
+ MetricsConfigException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordBuilderImpl.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.lib.MetricsRegistry.*;
+
+/**
+ * Record builder that accumulates tags and metrics for a single record.
+ * The convenience methods (tag, addCounter, addGauge) only store anything
+ * when the builder was created as acceptable, and metrics must additionally
+ * pass the metric filter when one is set. The add(MetricsTag) and
+ * add(Metric) methods store their argument unconditionally.
+ */
+class MetricsRecordBuilderImpl extends MetricsRecordBuilder {
+  private final long timestamp;
+  private final String name;
+  private final List<Metric> metrics;
+  private final List<MetricsTag> tags;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final boolean acceptable;
+
+  /**
+   * @param name of the record being built
+   * @param rf record filter applied in getRecord, or null for none
+   * @param mf metric filter applied to metric names, or null for none
+   * @param acceptable false to make the convenience methods no-ops and
+   *        getRecord return null
+   */
+  MetricsRecordBuilderImpl(String name, MetricsFilter rf, MetricsFilter mf,
+                           boolean acceptable) {
+    this.name = name;
+    this.acceptable = acceptable;
+    this.recordFilter = rf;
+    this.metricFilter = mf;
+    this.timestamp = System.currentTimeMillis();
+    this.metrics = new ArrayList<Metric>();
+    this.tags = new ArrayList<MetricsTag>();
+  }
+
+  /**
+   * @param metricName name of the candidate metric
+   * @return true if a metric with this name should be stored
+   */
+  private boolean admits(String metricName) {
+    if (!acceptable) {
+      return false;
+    }
+    return metricFilter == null || metricFilter.accepts(metricName);
+  }
+
+  @Override
+  public MetricsRecordBuilder tag(String name, String description,
+                                  String value) {
+    if (!acceptable) {
+      return this;
+    }
+    tags.add(new MetricsTag(name, description, value));
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(String name, String description,
+                                         int value) {
+    if (admits(name)) {
+      metrics.add(new MetricCounterInt(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(String name, String description,
+                                         long value) {
+    if (admits(name)) {
+      metrics.add(new MetricCounterLong(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       int value) {
+    if (admits(name)) {
+      metrics.add(new MetricGaugeInt(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       long value) {
+    if (admits(name)) {
+      metrics.add(new MetricGaugeLong(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       float value) {
+    if (admits(name)) {
+      metrics.add(new MetricGaugeFloat(name, description, value));
+    }
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(String name, String description,
+                                       double value) {
+    if (admits(name)) {
+      metrics.add(new MetricGaugeDouble(name, description, value));
+    }
+    return this;
+  }
+
+  // Stored regardless of the acceptable flag and filters.
+  @Override
+  public MetricsRecordBuilder add(MetricsTag tag) {
+    tags.add(tag);
+    return this;
+  }
+
+  // Stored regardless of the acceptable flag and filters.
+  @Override
+  public MetricsRecordBuilder add(Metric metric) {
+    metrics.add(metric);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder setContext(String value) {
+    return tag(CONTEXT_KEY, CONTEXT_DESC, value);
+  }
+
+  /**
+   * @return a record over the collected tags and metrics, or null when the
+   *         builder is not acceptable or the record filter rejects the tags
+   */
+  public MetricsRecordImpl getRecord() {
+    if (!acceptable ||
+        (recordFilter != null && !recordFilter.accepts(tags))) {
+      return null;
+    }
+    return new MetricsRecordImpl(name, timestamp, tags(), metrics());
+  }
+
+  /** @return a read-only view of the collected tags */
+  List<MetricsTag> tags() {
+    return Collections.unmodifiableList(tags);
+  }
+
+  /** @return a read-only view of the collected metrics */
+  List<Metric> metrics() {
+    return Collections.unmodifiableList(metrics);
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordFiltered.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.Iterator;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.util.TryIterator;
+
+/**
+ * A view of a metrics record that exposes only the metrics whose names are
+ * accepted by a filter. Timestamp, name, context and tags are passed through
+ * from the wrapped record unchanged.
+ */
+class MetricsRecordFiltered implements MetricsRecord {
+
+  private final MetricsRecord delegate;
+  private final MetricsFilter filter;
+
+  /**
+   * @param delegate the record to wrap
+   * @param filter applied to metric names; dereferenced without a null check
+   */
+  MetricsRecordFiltered(MetricsRecord delegate, MetricsFilter filter) {
+    this.delegate = delegate;
+    this.filter = filter;
+  }
+
+  public long timestamp() {
+    return delegate.timestamp();
+  }
+
+  public String name() {
+    return delegate.name();
+  }
+
+  public String context() {
+    return delegate.context();
+  }
+
+  public Iterable<MetricsTag> tags() {
+    return delegate.tags();
+  }
+
+  /**
+   * @return the wrapped record's metrics restricted to those whose name the
+   *         filter accepts
+   */
+  public Iterable<Metric> metrics() {
+    return new Iterable<Metric>() {
+      public Iterator<Metric> iterator() {
+        // Obtain a fresh delegate iterator per call so the returned Iterable
+        // can be traversed more than once; sharing one iterator across calls
+        // would leave every traversal after the first empty.
+        final Iterator<Metric> it = delegate.metrics().iterator();
+        return new TryIterator<Metric>() {
+          public Metric tryNext() {
+            while (it.hasNext()) {
+              Metric next = it.next();
+              if (filter.accepts(next.name())) {
+                return next;
+              }
+            }
+            return done();
+          }
+        };
+      }
+    };
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsRecordImpl.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.util.Contracts;
+
+/**
+ * An immutable metrics record: a name, a timestamp, and the tags and metrics
+ * supplied at construction time.
+ */
+public class MetricsRecordImpl implements MetricsRecord {
+
+  protected static final String CONTEXT_KEY = "context";
+  protected static final String DEFAULT_CONTEXT = "default";
+
+  private final long timestamp;
+  private final String name;
+  private final Iterable<MetricsTag> tags;
+  private final Iterable<Metric> metrics;
+
+  /**
+   * Construct a metrics record
+   * @param name of the record; checked to be non-null
+   * @param timestamp of the record; checked to be positive
+   * @param tags of the record; checked to be non-null
+   * @param metrics of the record; checked to be non-null
+   */
+  public MetricsRecordImpl(String name, long timestamp,
+                           Iterable<MetricsTag> tags,
+                           Iterable<Metric> metrics) {
+    // Checks run in the same order as the assignments below.
+    this.timestamp = Contracts.checkArg(timestamp, timestamp > 0, "timestamp");
+    this.name = Contracts.checkNotNull(name, "name");
+    this.tags = Contracts.checkNotNull(tags, "tags");
+    this.metrics = Contracts.checkNotNull(metrics, "metrics");
+  }
+
+  public long timestamp() {
+    return timestamp;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  /**
+   * @return the value of the first tag named "context", or "default" when
+   *         no such tag exists
+   */
+  public String context() {
+    for (MetricsTag tag : tags) {
+      if (!tag.name().equals(CONTEXT_KEY)) {
+        continue;
+      }
+      return String.valueOf(tag.value());
+    }
+    return DEFAULT_CONTEXT;
+  }
+
+  public Iterable<MetricsTag> tags() {
+    return tags;
+  }
+
+  public Iterable<Metric> metrics() {
+    return metrics;
+  }
+
+  // Mostly for testing: same class, timestamp, name, tags and metrics
+  @Override public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final MetricsRecordImpl that = (MetricsRecordImpl) obj;
+    return this.timestamp == that.timestamp()
+        && this.name.equals(that.name())
+        && this.tags.equals(that.tags())
+        && this.metrics.equals(that.metrics());
+  }
+
+  // Derived from the name only
+  @Override public int hashCode() {
+    return name.hashCode();
+  }
+
+  @Override public String toString() {
+    return "MetricsRecordImpl{" + "timestamp=" + timestamp
+        + " name='" + name + "' tags=" + tags
+        + " metrics=" + metrics + "}\n";
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.util.Contracts;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSink;
+
+/**
+ * An adapter class for metrics sink and associated filters.
+ *
+ * Buffers accepted by putMetrics are placed on a queue and published to the
+ * sink by a dedicated thread. When publishing throws, the thread retries up
+ * to retryCount times, sleeping between attempts for a delay that starts at
+ * the configured retry delay (in seconds) and is multiplied by the backoff
+ * factor after each failed attempt.
+ */
+class MetricsSinkAdapter {
+
+  // Shared by all adapters, like the logger in MetricsSourceAdapter.
+  private static final Log LOG = LogFactory.getLog(MetricsSinkAdapter.class);
+  private final String name, description, context;
+  private final MetricsSink sink;
+  private final MetricsFilter sourceFilter, recordFilter, metricFilter;
+  private final SinkQueue<MetricsBuffer> queue;
+  private final Thread sinkThread;
+  private volatile boolean stopping = false;
+  private volatile boolean inError = false;
+  private final int period, firstRetryDelay, retryCount;
+  private final float retryBackoff;
+  private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
+  private final MetricMutableStat latency;
+  private final MetricMutableCounterInt dropped;
+  private final MetricMutableGaugeInt qsize;
+
+  private final Consumer<MetricsBuffer> consumer =
+      new Consumer<MetricsBuffer>() {
+        public void consume(MetricsBuffer buffer) {
+          publishMetrics(buffer);
+        }
+      };
+
+  /**
+   * @param name of the sink; must not be null, also used as the thread name
+   * @param description of the sink
+   * @param sink the sink to publish to; must not be null
+   * @param context if non-null, only records with this context are published
+   * @param sourceFilter applied to buffer entry names, or null for none
+   * @param recordFilter applied to records, or null for none
+   * @param metricFilter applied to metric names, or null for none
+   * @param period buffers are enqueued only when the logical time is a
+   *        multiple of this value; must be positive
+   * @param queueCapacity capacity of the sink queue; must be positive
+   * @param retryDelay delay in seconds before the first retry; must be
+   *        positive
+   * @param retryBackoff multiplier applied to the delay after each failed
+   *        attempt; must be greater than 1
+   * @param retryCount number of retries before the queue is cleared
+   */
+  MetricsSinkAdapter(String name, String description, MetricsSink sink,
+                     String context, MetricsFilter sourceFilter,
+                     MetricsFilter recordFilter, MetricsFilter metricFilter,
+                     int period, int queueCapacity, int retryDelay,
+                     float retryBackoff, int retryCount) {
+    this.name = Contracts.checkNotNull(name, "name");
+    this.description = description;
+    this.sink = Contracts.checkNotNull(sink, "sink object");
+    this.context = context;
+    this.sourceFilter = sourceFilter;
+    this.recordFilter = recordFilter;
+    this.metricFilter = metricFilter;
+    this.period = Contracts.checkArg(period, period > 0, "period");
+    firstRetryDelay =
+        Contracts.checkArg(retryDelay, retryDelay > 0, "retry delay");
+    this.retryBackoff =
+        Contracts.checkArg(retryBackoff, retryBackoff > 1, "backoff factor");
+    this.retryCount = retryCount;
+    this.queue = new SinkQueue<MetricsBuffer>(
+        Contracts.checkArg(queueCapacity, queueCapacity > 0, "queue capacity"));
+    latency = registry.newStat(name +"_latency", "End to end latency",
+                               "ops", "time");
+    dropped = registry.newCounter(name +"_dropped", "Dropped updates", 0);
+    qsize = registry.newGauge(name + "_qsize", "Queue size", 0);
+
+    sinkThread = new Thread() {
+      @Override public void run() {
+        publishMetricsFromQueue();
+      }
+    };
+    sinkThread.setName(name);
+  }
+
+  /**
+   * Offer a buffer to the sink queue.
+   * @param buffer the metrics buffer
+   * @param logicalTime the buffer is only enqueued when this is a multiple
+   *        of the sink period
+   * @return false if the buffer was due but the queue rejected it (the
+   *         dropped counter is incremented), true otherwise
+   */
+  boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
+    if (logicalTime % period == 0) {
+      LOG.debug("enqueue, logicalTime="+ logicalTime);
+      if (queue.enqueue(buffer)) return true;
+      dropped.incr();
+      return false;
+    }
+    return true; // OK
+  }
+
+  /**
+   * Body of the sink thread: consume the queue until stop() is called,
+   * retrying with backoff when publishing fails.
+   */
+  void publishMetricsFromQueue() {
+    int retryDelay = firstRetryDelay;
+    int n = retryCount;
+    while (!stopping) {
+      try {
+        queue.consumeAll(consumer);
+        // Success: reset the retry state
+        retryDelay = firstRetryDelay;
+        n = retryCount;
+        inError = false;
+      }
+      catch (InterruptedException e) {
+        LOG.info(name +" thread interrupted.");
+      }
+      catch (Exception e) {
+        if (n > 0) {
+          if (!inError) {
+            LOG.error("Got sink exception, retry in "+ retryDelay +"s", e);
+          }
+          // Sleep for the delay that was just announced and only then back
+          // off, so the log message matches the actual wait and the first
+          // retry honors the configured delay. Long arithmetic keeps the
+          // millisecond conversion from overflowing.
+          final long sleepMillis = retryDelay * 1000L;
+          retryDelay *= retryBackoff;
+          try { Thread.sleep(sleepMillis); }
+          catch (InterruptedException e2) {
+            LOG.info(name +" thread interrupted while waiting for retry", e2);
+          }
+          --n;
+        }
+        else {
+          if (!inError) {
+            LOG.error("Got sink exception and over retry limit!", e);
+          }
+          queue.clear();
+          inError = true; // Don't keep complaining ad infinitum
+        }
+      }
+    }
+  }
+
+  /**
+   * Push the records of a buffer that pass the source, context and record
+   * filters to the sink, then flush the sink and record the latency relative
+   * to the first published record's timestamp.
+   * @param buffer the metrics buffer to publish
+   */
+  void publishMetrics(MetricsBuffer buffer) {
+    long ts = 0;
+    for (MetricsBuffer.Entry entry : buffer) {
+      if (sourceFilter == null || sourceFilter.accepts(entry.name())) {
+        for (MetricsRecordImpl record : entry.records()) {
+          if ((context == null || context.equals(record.context())) &&
+              (recordFilter == null || recordFilter.accepts(record))) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Pushing record "+ entry.name() +"."+ record.context() +
+                        "."+ record.name() +" to "+ name);
+            }
+            sink.putMetrics(metricFilter == null
+                ? record
+                : new MetricsRecordFiltered(record, metricFilter));
+            if (ts == 0) ts = record.timestamp();
+          }
+        }
+      }
+    }
+    // Only flush when at least one record was actually pushed
+    if (ts > 0) {
+      sink.flush();
+      latency.add(System.currentTimeMillis() - ts);
+    }
+    LOG.debug("Done");
+  }
+
+  /** Start the sink thread. */
+  void start() {
+    sinkThread.start();
+    LOG.info("Sink "+ name +" started");
+  }
+
+  /** Signal the sink thread to stop, interrupt it and wait for it to exit. */
+  void stop() {
+    stopping = true;
+    sinkThread.interrupt();
+    try {
+      sinkThread.join();
+    }
+    catch (InterruptedException e) {
+      LOG.warn("Stop interrupted", e);
+    }
+  }
+
+  String name() {
+    return name;
+  }
+
+  String description() {
+    return description;
+  }
+
+  /**
+   * Snapshot this adapter's own metrics registry into a record builder.
+   * @param rb the record builder
+   * @param all passed through to the registry snapshot
+   */
+  void sample(MetricsRecordBuilder rb, boolean all) {
+    registry.snapshot(rb, all);
+  }
+
+  MetricsSink sink() {
+    return sink;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.HashMap;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsTag;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.metrics2.util.Contracts;
+
+/**
+ * An adapter class for metrics source and associated filter and jmx impl.
+ *
+ * Exposes the tags and metrics of the most recent snapshot of a source as
+ * read-only JMX attributes. Attribute values are cached and refreshed when
+ * the cache is older than the configured TTL.
+ */
+class MetricsSourceAdapter implements DynamicMBean {
+
+  private static final Log LOG = LogFactory.getLog(MetricsSourceAdapter.class);
+
+  private final String prefix, name;
+  private final MetricsSource source;
+  private final MetricsFilter recordFilter, metricFilter;
+  private final HashMap<String, Attribute> attrCache;
+  private final MBeanInfoBuilder infoBuilder;
+  private final Iterable<MetricsTag> injectedTags;
+
+  private Iterable<MetricsRecordImpl> lastRecs;
+  private long jmxCacheTS;
+  private int jmxCacheTTL;
+  private MBeanInfo infoCache;
+  private ObjectName mbeanName;
+
+  /**
+   * @param prefix used when registering the MBean; must not be null
+   * @param name of the source; must not be null
+   * @param description of the source
+   * @param source the metrics source; must not be null
+   * @param injectedTags tags added to every record built from the source
+   * @param recordFilter passed to the metrics builder, may be null
+   * @param metricFilter passed to the metrics builder, may be null
+   * @param jmxCacheTTL minimum age of the JMX cache before it is refreshed,
+   *        compared against System.currentTimeMillis() differences; must be
+   *        positive
+   */
+  MetricsSourceAdapter(String prefix, String name, String description,
+                       MetricsSource source, Iterable<MetricsTag> injectedTags,
+                       MetricsFilter recordFilter, MetricsFilter metricFilter,
+                       int jmxCacheTTL) {
+    this.prefix = Contracts.checkNotNull(prefix, "prefix");
+    this.name = Contracts.checkNotNull(name, "name");
+    this.source = Contracts.checkNotNull(source, "source");
+    attrCache = new HashMap<String, Attribute>();
+    infoBuilder = new MBeanInfoBuilder(name, description);
+    this.injectedTags = injectedTags;
+    this.recordFilter = recordFilter;
+    this.metricFilter = metricFilter;
+    this.jmxCacheTTL = Contracts.checkArg(jmxCacheTTL, jmxCacheTTL > 0,
+                                          "jmxCacheTTL");
+  }
+
+  /**
+   * Convenience constructor that loads the record and metric filters from
+   * the configuration.
+   * NOTE(review): period is passed straight through as the JMX cache TTL,
+   * which is compared against millisecond differences -- confirm the unit
+   * of period at the call site.
+   */
+  MetricsSourceAdapter(String prefix, String name, String description,
+                       MetricsSource source, Iterable<MetricsTag> injectedTags,
+                       int period, MetricsConfig conf) {
+    this(prefix, name, description, source, injectedTags,
+         (MetricsFilter) conf.getPlugin(RECORD_FILTER_KEY),
+         (MetricsFilter) conf.getPlugin(METRIC_FILTER_KEY), period);
+  }
+
+  /** Register this adapter as an MBean. */
+  void start() {
+    if (mbeanName != null) {
+      LOG.warn("MBean Source "+ name +" already initialized!");
+    }
+    mbeanName = MBeans.register(prefix, name, this);
+    // The Throwable only serves to log the caller's stack; don't pay for it
+    // unless debug logging is on.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MBean for source "+ name +" registered.", new Throwable());
+    }
+  }
+
+  /**
+   * @param attribute name of the attribute
+   * @return the cached value of the attribute
+   * @throws AttributeNotFoundException if the attribute is not in the cache
+   */
+  @Override
+  public synchronized Object getAttribute(String attribute)
+      throws AttributeNotFoundException, MBeanException, ReflectionException {
+    updateJmxCache();
+    Attribute a = attrCache.get(attribute);
+    if (a == null) {
+      throw new AttributeNotFoundException(attribute +" not found");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(attribute +": "+ a.getName() +"="+ a.getValue());
+    }
+    // DynamicMBean.getAttribute returns the attribute's value, not the
+    // Attribute name/value pair.
+    return a.getValue();
+  }
+
+  @Override
+  public void setAttribute(Attribute attribute)
+      throws AttributeNotFoundException, InvalidAttributeValueException,
+             MBeanException, ReflectionException {
+    throw new UnsupportedOperationException("Metrics are read-only.");
+  }
+
+  /**
+   * @param attributes names of the attributes to retrieve
+   * @return the cached attributes that were found; unknown names are omitted
+   */
+  @Override
+  public synchronized AttributeList getAttributes(String[] attributes) {
+    updateJmxCache();
+    AttributeList ret = new AttributeList();
+    for (String key : attributes) {
+      Attribute attr = attrCache.get(key);
+      if (attr == null) {
+        // Attributes that cannot be retrieved are left out of the result
+        // rather than added as null (or dereferenced by the debug log).
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(key +" not found");
+        }
+        continue;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(key +": "+ attr.getName() +"="+ attr.getValue());
+      }
+      ret.add(attr);
+    }
+    return ret;
+  }
+
+  @Override
+  public AttributeList setAttributes(AttributeList attributes) {
+    throw new UnsupportedOperationException("Metrics are read-only.");
+  }
+
+  @Override
+  public Object invoke(String actionName, Object[] params, String[] signature)
+      throws MBeanException, ReflectionException {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public synchronized MBeanInfo getMBeanInfo() {
+    updateJmxCache();
+    return infoCache;
+  }
+
+  /**
+   * Refresh the attribute cache (and the MBean info, if the number of
+   * attributes grew) when the cache is at least jmxCacheTTL old. Callers in
+   * this class invoke it from synchronized methods.
+   */
+  private void updateJmxCache() {
+    if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
+      if (lastRecs == null) {
+        // No snapshot available: take one, asking for all metrics
+        MetricsBuilderImpl builder = new MetricsBuilderImpl();
+        getMetrics(builder, true);
+      }
+      int cacheSize = attrCache.size(); // because updateAttrCache changes it!
+      int numMetrics = updateAttrCache();
+      if (cacheSize < numMetrics) {
+        updateInfoCache();
+      }
+      jmxCacheTS = System.currentTimeMillis();
+      lastRecs = null;
+    }
+  }
+
+  /**
+   * Snapshot the source into the builder, add the injected tags to every
+   * record builder and remember the resulting records for the JMX cache.
+   * @param builder the metrics builder to fill
+   * @param all whether to get all metrics; forced to true when no previous
+   *        snapshot is held
+   * @return the records built
+   */
+  Iterable<MetricsRecordImpl> getMetrics(MetricsBuilderImpl builder,
+                                         boolean all) {
+    builder.setRecordFilter(recordFilter).setMetricFilter(metricFilter);
+    synchronized(this) {
+      if (lastRecs == null) {
+        all = true; // Get all the metrics to populate the sink caches
+      }
+    }
+    // The source is called outside the lock
+    source.getMetrics(builder, all);
+    for (MetricsRecordBuilderImpl rb : builder) {
+      for (MetricsTag t : injectedTags) {
+        rb.add(t);
+      }
+    }
+    synchronized(this) {
+      lastRecs = builder.getRecords();
+      return lastRecs;
+    }
+  }
+
+  /** Unregister the MBean. */
+  synchronized void stop() {
+    MBeans.unregister(mbeanName);
+    mbeanName = null;
+  }
+
+  /** Unregister and re-register the MBean. */
+  synchronized void refreshMBean() {
+    MBeans.unregister(mbeanName);
+    mbeanName = MBeans.register(prefix, name, this);
+  }
+
+  private void updateInfoCache() {
+    LOG.debug("Updating info cache...");
+    infoCache = infoBuilder.reset(lastRecs).get();
+    LOG.debug("Done");
+  }
+
+  /**
+   * Put every tag and metric of the last snapshot into the attribute cache.
+   * @return the number of tags plus metrics processed
+   */
+  private int updateAttrCache() {
+    LOG.debug("Updating attr cache...");
+    int recNo = 0;
+    int numMetrics = 0;
+    for (MetricsRecordImpl record : lastRecs) {
+      for (MetricsTag t : record.tags()) {
+        setAttrCacheTag(t, recNo);
+        ++numMetrics;
+      }
+      for (Metric m : record.metrics()) {
+        setAttrCacheMetric(m, recNo);
+        ++numMetrics;
+      }
+      ++recNo;
+    }
+    LOG.debug("Done. numMetrics="+ numMetrics);
+    return numMetrics;
+  }
+
+  // Attribute key for a tag: "tag.<name>", with ".<recNo>" appended for
+  // records after the first.
+  private static String tagName(String name, int recNo) {
+    StringBuilder sb = new StringBuilder(name.length() + 16);
+    sb.append("tag.").append(name);
+    if (recNo > 0) {
+      sb.append('.').append(recNo);
+    }
+    return sb.toString();
+  }
+
+  private void setAttrCacheTag(MetricsTag tag, int recNo) {
+    String key = tagName(tag.name(), recNo);
+    attrCache.put(key, new Attribute(key, tag.value()));
+  }
+
+  // Attribute key for a metric: the bare name for the first record, with
+  // ".<recNo>" appended for later records.
+  private static String metricName(String name, int recNo) {
+    if (recNo == 0) {
+      return name;
+    }
+    StringBuilder sb = new StringBuilder(name.length() + 12);
+    sb.append(name);
+    if (recNo > 0) {
+      sb.append('.').append(recNo);
+    }
+    return sb.toString();
+  }
+
+  private void setAttrCacheMetric(Metric metric, int recNo) {
+    String key = metricName(metric.name(), recNo);
+    attrCache.put(key, new Attribute(key, metric.value()));
+  }
+
+  String name() {
+    return name;
+  }
+
+  MetricsSource source() {
+    return source;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.io.StringWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import javax.management.ObjectName;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.util.MathUtils;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsFilter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import static org.apache.hadoop.metrics2.impl.MetricsConfig.*;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.metrics2.util.Contracts;
+
+/**
+ * A base class for metrics system singletons
+ */
+public class MetricsSystemImpl implements MetricsSystem {
+
+ private static final Log LOG = LogFactory.getLog(MetricsSystemImpl.class);
+ static final String MS_CONTEXT = "metricssystem";
+ static final String NUM_SOURCES_KEY = "num_sources";
+ static final String NUM_SOURCES_DESC = "Number of metrics sources";
+ static final String NUM_SINKS_KEY = "num_sinks";
+ static final String NUM_SINKS_DESC = "Number of metrics sinks";
+ static final String MS_NAME = "MetricsSystem";
+ static final String MS_STATS_NAME = MS_NAME +",sub=Stats";
+ static final String MS_STATS_DESC = "Metrics system metrics";
+ static final String MS_CONTROL_NAME = MS_NAME +",sub=Control";
+
+ private final Map<String, MetricsSourceAdapter> sources;
+ private final Map<String, MetricsSinkAdapter> sinks;
+ private final List<Callback> callbacks;
+ private final MetricsBuilderImpl metricsBuilder;
+ private final MetricMutableStat sampleStat =
+ new MetricMutableStat("sample", "sampling stats", "ops", "time", true);
+ private final MetricMutableStat publishStat =
+ new MetricMutableStat("publish", "publishing stats", "ops", "time", true);
+ private final MetricMutableCounterLong dropStat =
+ new MetricMutableCounterLong("dropped_pub_all",
+ "number of dropped updates by all sinks", 0L);
+ private final List<MetricsTag> injectedTags;
+
+ // Things that are changed by init()/start()/stop()
+ private String prefix;
+ private MetricsFilter sourceFilter;
+ private MetricsConfig config;
+ private Map<String, MetricsConfig> sourceConfigs, sinkConfigs;
+ private boolean monitoring = false;
+ private Timer timer;
+ private int period; // seconds
+ private long logicalTime; // number of timer invocations * period
+ private ObjectName mbeanName;
+ private boolean publishSelfMetrics = true;
+ private MetricsSourceAdapter sysSource;
+
+ /**
+ * Construct the metrics system
+ * @param prefix for the system
+ */
+ public MetricsSystemImpl(String prefix) {
+ this.prefix = prefix;
+ sources = new LinkedHashMap<String, MetricsSourceAdapter>();
+ sinks = new LinkedHashMap<String, MetricsSinkAdapter>();
+ sourceConfigs = new HashMap<String, MetricsConfig>();
+ sinkConfigs = new HashMap<String, MetricsConfig>();
+ callbacks = new ArrayList<Callback>();
+ injectedTags = new ArrayList<MetricsTag>();
+ metricsBuilder = new MetricsBuilderImpl();
+ if (prefix != null) {
+ // prefix could be null for default ctor, which requires init later
+ initSystemMBean();
+ }
+ }
+
+ /**
+ * Construct the system but not initializing (read config etc.) it.
+ */
+ public MetricsSystemImpl() {
+ this(null);
+ }
+
+ /**
+ * Initialized the metrics system with a prefix.
+ * @param prefix the system will look for configs with the prefix
+ */
+ public synchronized void init(String prefix) {
+ if (monitoring) {
+ LOG.warn(this.prefix +" metrics system already initialized!");
+ return;
+ }
+ Contracts.checkState(this.prefix == null, "prefix should be null so far.");
+ this.prefix = Contracts.checkNotNull(prefix, "prefix");
+ try { start(); }
+ catch (MetricsConfigException e) {
+ // Usually because hadoop-metrics2.properties is missing
+ // We can always start the metrics system later via JMX.
+ LOG.warn("Metrics system not started!", e);
+ }
+ initSystemMBean();
+ }
+
+ @Override
+ public synchronized void start() {
+ Contracts.checkNotNull(prefix, "prefix");
+ if (monitoring) {
+ LOG.warn(prefix +" metrics system already started!",
+ new MetricsException("Illegal start"));
+ return;
+ }
+ for (Callback cb : callbacks) cb.preStart();
+ configure(prefix);
+ startTimer();
+ monitoring = true;
+ LOG.info(prefix +" metrics system started");
+ for (Callback cb : callbacks) cb.postStart();
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (!monitoring) {
+ LOG.warn(prefix +" metrics system not yet started!",
+ new MetricsException("Illegal stop"));
+ return;
+ }
+ for (Callback cb : callbacks) cb.preStop();
+ LOG.info("Stopping "+ prefix +" metrics system...");
+ stopTimer();
+ stopSources();
+ stopSinks();
+ clearConfigs();
+ monitoring = false;
+ LOG.info(prefix +" metrics system stopped.");
+ for (Callback cb : callbacks) cb.postStop();
+ }
+
+ @Override
+ public synchronized <T extends MetricsSource> T register(final String name,
+ final String desc, final T source) {
+ if (monitoring) {
+ registerSource(name, desc, source);
+ }
+ // We want to re-register the source to pick up new config when the
+ // metrics system restarts.
+ register(new AbstractCallback() {
+
+ @Override public void postStart() {
+ registerSource(name, desc, source);
+ }
+
+ });
+ LOG.debug("Registered source "+ name);
+ return source;
+ }
+
+ synchronized void registerSource(String name, String desc,
+ MetricsSource source) {
+ Contracts.checkNotNull(config, "config");
+ MetricsSourceAdapter sa = sources.get(name);
+ if (sa != null) {
+ LOG.warn("Source name "+name+" already exists!");
+ return;
+ }
+ MetricsConfig conf = sourceConfigs.get(name);
+ sa = conf != null
+ ? new MetricsSourceAdapter(prefix, name, desc, source,
+ injectedTags, period, conf)
+ : new MetricsSourceAdapter(prefix, name, desc, source,
+ injectedTags, period, config.subset(SOURCE_KEY));
+ sources.put(name, sa);
+ sa.start();
+ }
+
+ @Override
+ public synchronized <T extends MetricsSink> T register(final String name,
+ final String description, final T sink) {
+ if (config != null) {
+ registerSink(name, description, sink);
+ }
+ // We want to re-register the sink to pick up new config
+ // when the metrics system restarts.
+ register(new AbstractCallback() {
+
+ @Override public void postStart() {
+ registerSink(name, description, sink);
+ }
+
+ });
+ LOG.debug("Registered sink "+ name);
+ return sink;
+ }
+
+ synchronized void registerSink(String name, String desc, MetricsSink sink) {
+ Contracts.checkNotNull(config, "config");
+ MetricsSinkAdapter sa = sinks.get(name);
+ if (sa != null) {
+ LOG.warn("Sink name "+name+" already exists!");
+ return;
+ }
+ MetricsConfig conf = sinkConfigs.get(name);
+ sa = conf != null
+ ? newSink(name, desc, sink, conf)
+ : newSink(name, desc, sink, config.subset(SINK_KEY));
+ sinks.put(name, sa);
+ sa.start();
+ }
+
+ @Override
+ public synchronized void register(final Callback callback) {
+ callbacks.add((Callback) Proxy.newProxyInstance(
+ callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
+ new InvocationHandler() {
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ try {
+ return method.invoke(callback, args);
+ }
+ catch (Exception e) {
+ LOG.warn("Caught exception in callback "+ method.getName(), e);
+ }
+ return null;
+ }
+ }));
+ }
+
+ @Override
+ public synchronized void refreshMBeans() {
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+ entry.getValue().refreshMBean();
+ }
+ }
+
+ @Override
+ public synchronized String currentConfig() {
+ PropertiesConfiguration saver = new PropertiesConfiguration();
+ StringWriter writer = new StringWriter();
+ saver.copy(config);
+ try { saver.save(writer); }
+ catch (Exception e) {
+ throw new MetricsConfigException("Error stringify config", e);
+ }
+ return writer.toString();
+ }
+
+ private synchronized void startTimer() {
+ if (timer != null) {
+ LOG.warn(prefix +" metrics system timer already started!");
+ return;
+ }
+ logicalTime = 0;
+ long millis = period * 1000;
+ timer = new Timer("Timer for '"+ prefix +"' metrics system", true);
+ timer.scheduleAtFixedRate(new TimerTask() {
+ public void run() {
+ try {
+ onTimerEvent();
+ }
+ catch (Exception e) {
+ LOG.warn(e);
+ }
+ }
+ }, millis, millis);
+ LOG.info("Scheduled sampling period at "+ period +" second(s).");
+ }
+
+ synchronized void onTimerEvent() {
+ logicalTime += period;
+ if (sinks.size() > 0) {
+ publishMetrics(sampleMetrics());
+ }
+ }
+
+ /**
+ * Sample all the sources for a snapshot of metrics/tags
+ * @return the metrics buffer containing the snapshot
+ */
+ synchronized MetricsBuffer sampleMetrics() {
+ metricsBuilder.clear();
+ MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
+
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+ if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
+ sampleMetrics(entry.getValue(), bufferBuilder);
+ }
+ }
+ if (publishSelfMetrics) {
+ sampleMetrics(sysSource, bufferBuilder);
+ }
+ MetricsBuffer buffer = bufferBuilder.get();
+ return buffer;
+ }
+
+ private void sampleMetrics(MetricsSourceAdapter sa,
+ MetricsBufferBuilder bufferBuilder) {
+ long startTime = System.currentTimeMillis();
+ bufferBuilder.add(sa.name(), sa.getMetrics(metricsBuilder, false));
+ metricsBuilder.clear();
+ sampleStat.add(System.currentTimeMillis() - startTime);
+ LOG.debug("Sampled source "+ sa.name());
+ }
+
+ /**
+ * Publish a metrics snapshot to all the sinks
+ * @param buffer the metrics snapshot to publish
+ */
+ synchronized void publishMetrics(MetricsBuffer buffer) {
+ int dropped = 0;
+ for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+ long startTime = System.currentTimeMillis();
+ dropped += entry.getValue().putMetrics(buffer, logicalTime) ? 0 : 1;
+ publishStat.add(System.currentTimeMillis() - startTime);
+ }
+ dropStat.incr(dropped);
+ }
+
+ private synchronized void stopTimer() {
+ if (timer == null) {
+ LOG.warn(prefix +" metrics system timer already stopped!");
+ return;
+ }
+ timer.cancel();
+ timer = null;
+ }
+
+ private synchronized void stopSources() {
+ for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
+ MetricsSourceAdapter sa = entry.getValue();
+ LOG.info("Stopping metrics source "+ entry.getKey() +"("+
+ sa.source().getClass().getName() +")");
+ sa.stop();
+ }
+ sources.clear();
+ }
+
+ private synchronized void stopSinks() {
+ for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+ MetricsSinkAdapter sa = entry.getValue();
+ LOG.info("Stopping metrics sink "+ entry.getKey() +"("+
+ sa.sink().getClass().getName() +")");
+ sa.stop();
+ }
+ sinks.clear();
+ }
+
+ private synchronized void configure(String prefix) {
+ config = MetricsConfig.create(prefix);
+ configureSinks();
+ configureSources();
+ configureSystem();
+ }
+
+ private synchronized void configureSystem() {
+ injectedTags.add(new MetricsTag("hostName", "Local hostname",
+ getHostname()));
+ }
+
+ private synchronized void configureSinks() {
+ Map<String, MetricsConfig> confs = config.getInstanceConfigs(SINK_KEY);
+ int confPeriod = 0;
+ for (Entry<String, MetricsConfig> entry : confs.entrySet()) {
+ MetricsConfig conf = entry.getValue();
+ int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+ confPeriod = confPeriod == 0 ? sinkPeriod
+ : MathUtils.gcd(confPeriod, sinkPeriod);
+ String sinkName = conf.getString(NAME_KEY);
+ if (sinkName != null && !sinkName.isEmpty()) {
+ // named config is for internally registered sinks
+ sinkConfigs.put(sinkName, conf);
+ }
+ else {
+ sinkName = "sink"+ entry.getKey();
+ }
+ try {
+ MetricsSinkAdapter sa = newSink(sinkName,
+ conf.getString(DESC_KEY, sinkName), conf);
+ sa.start();
+ sinks.put(sinkName, sa);
+ }
+ catch (Exception e) {
+ LOG.warn("Error creating "+ sinkName, e);
+ }
+ }
+ period = confPeriod > 0 ? confPeriod
+ : config.getInt(PERIOD_KEY, PERIOD_DEFAULT);
+ }
+
+ static MetricsSinkAdapter newSink(String name, String desc, MetricsSink sink,
+ MetricsConfig conf) {
+ return new MetricsSinkAdapter(name, desc, sink, conf.getString(CONTEXT_KEY),
+ (MetricsFilter) conf.getPlugin(SOURCE_FILTER_KEY),
+ (MetricsFilter) conf.getPlugin(RECORD_FILTER_KEY),
+ (MetricsFilter) conf.getPlugin(METRIC_FILTER_KEY),
+ conf.getInt(PERIOD_KEY, PERIOD_DEFAULT),
+ conf.getInt(QUEUE_CAPACITY_KEY, QUEUE_CAPACITY_DEFAULT),
+ conf.getInt(RETRY_DELAY_KEY, RETRY_DELAY_DEFAULT),
+ conf.getFloat(RETRY_BACKOFF_KEY, RETRY_BACKOFF_DEFAULT),
+ conf.getInt(RETRY_COUNT_KEY, RETRY_COUNT_DEFAULT));
+ }
+
+ static MetricsSinkAdapter newSink(String name, String desc,
+ MetricsConfig conf) {
+ return newSink(name, desc, (MetricsSink) conf.getPlugin(""), conf);
+ }
+
+ private void configureSources() {
+ sourceFilter =
+ (MetricsFilter) config.getPlugin(PREFIX_DEFAULT + SOURCE_FILTER_KEY);
+ Map<String, MetricsConfig> confs = config.getInstanceConfigs(SOURCE_KEY);
+ for (Entry<String, MetricsConfig> entry : confs.entrySet()) {
+ sourceConfigs.put(entry.getKey(), entry.getValue());
+ }
+ registerSystemSource();
+ }
+
+ private void clearConfigs() {
+ sinkConfigs.clear();
+ sourceConfigs.clear();
+ injectedTags.clear();
+ config = null;
+ }
+
+ static String getHostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ }
+ catch (Exception e) {
+ LOG.error("Error getting localhost name. Using 'localhost'...", e);
+ }
+ return "localhost";
+ }
+
+ private void registerSystemSource() {
+ sysSource = new MetricsSourceAdapter(prefix, MS_STATS_NAME, MS_STATS_DESC,
+ new MetricsSource() {
+ @Override
+ public void getMetrics(MetricsBuilder builder, boolean all) {
+ int numSources, numSinks;
+ synchronized(MetricsSystemImpl.this) {
+ numSources = sources.size();
+ numSinks = sinks.size();
+ }
+ MetricsRecordBuilder rb = builder.addRecord(MS_NAME)
+ .setContext(MS_CONTEXT)
+ .addGauge(NUM_SOURCES_KEY, NUM_SOURCES_DESC, numSources)
+ .addGauge(NUM_SINKS_KEY, NUM_SINKS_DESC, numSinks);
+ synchronized(MetricsSystemImpl.this) {
+ for (Entry<String, MetricsSinkAdapter> entry : sinks.entrySet()) {
+ entry.getValue().sample(rb, all);
+ }
+ }
+ sampleStat.snapshot(rb, all);
+ publishStat.snapshot(rb, all);
+ dropStat.snapshot(rb, all);
+ }
+ }, injectedTags, null, null, period);
+ sysSource.start();
+ }
+
+ private void initSystemMBean() {
+ mbeanName = MBeans.register(prefix, MS_CONTROL_NAME, this);
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (monitoring) {
+ try { stop(); }
+ catch (Exception e) {
+ LOG.warn("Error stopping the metrics system", e);
+ }
+ }
+ MBeans.unregister(mbeanName);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/impl/SinkQueue.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import java.util.ConcurrentModificationException;
+
+/**
+ * A half-blocking (nonblocking for producers, blocking for consumers) queue
+ * for metrics sinks.
+ *
+ * New elements are dropped when the queue is full to preserve "interesting"
+ * elements at the onset of queue filling events
+ */
+class SinkQueue<T> {
+  // A fixed size circular buffer to minimize garbage
+  private final T[] data;
+  private int head; // head position: the slot just before the first element
+  private int tail; // tail position: the slot of the last element
+  private int size; // number of elements
+  private Thread currentConsumer = null;
+
+  /**
+   * Construct the queue
+   * @param capacity of the queue; values below 1 are raised to 1
+   */
+  @SuppressWarnings("unchecked")
+  SinkQueue(int capacity) {
+    this.data = (T[]) new Object[Math.max(1, capacity)];
+    head = tail = size = 0;
+  }
+
+  /**
+   * Add an element at the tail of the queue without blocking
+   * @param e the element to add
+   * @return true if the element was added, false if the queue was full and
+   *         the element was dropped
+   */
+  synchronized boolean enqueue(T e) {
+    if (data.length == size) {
+      return false;
+    }
+    ++size;
+    tail = (tail + 1) % data.length;
+    data[tail] = e;
+    notify();
+    return true;
+  }
+
+  /**
+   * Consume one element, will block if queue is empty
+   * Only one consumer at a time is allowed
+   * The element is removed only after the consumer returns normally.
+   * @param consumer the consumer callback object
+   * @throws InterruptedException if interrupted while waiting for data
+   */
+  void consume(Consumer<T> consumer) throws InterruptedException {
+    T e = waitForData();
+
+    try {
+      consumer.consume(e);  // can take forever
+      _dequeue();
+    }
+    finally {
+      clearConsumer();
+    }
+  }
+
+  /**
+   * Consume all the elements, will block if queue is empty
+   * Only the elements present when consumption starts are consumed; each is
+   * removed only after the consumer returns normally for it.
+   * @param consumer the consumer callback object
+   * @throws InterruptedException if interrupted while waiting for data
+   */
+  void consumeAll(Consumer<T> consumer) throws InterruptedException {
+    waitForData();
+
+    try {
+      for (int i = size(); i-- > 0; ) {
+        consumer.consume(front()); // can take forever
+        _dequeue();
+      }
+    }
+    finally {
+      clearConsumer();
+    }
+  }
+
+  /**
+   * Dequeue one element from head of the queue, will block if queue is empty
+   * @return the first element
+   * @throws InterruptedException if interrupted while waiting for data
+   */
+  synchronized T dequeue() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    return _dequeue();
+  }
+
+  /**
+   * Block until the queue is not empty, then mark the calling thread as the
+   * current consumer.
+   * @return the first element, which stays in the queue
+   * @throws InterruptedException if interrupted while waiting for data
+   */
+  private synchronized T waitForData() throws InterruptedException {
+    checkConsumer();
+
+    while (0 == size) {
+      wait();
+    }
+    currentConsumer = Thread.currentThread();
+    return front();
+  }
+
+  /**
+   * @throws ConcurrentModificationException if a thread is marked as the
+   *         current consumer of the queue
+   */
+  private synchronized void checkConsumer() {
+    if (currentConsumer != null) {
+      throw new ConcurrentModificationException("The "+
+          currentConsumer.getName() +" thread is consuming the queue.");
+    }
+  }
+
+  /**
+   * Unmark the current consumer
+   */
+  private synchronized void clearConsumer() {
+    currentConsumer = null;
+  }
+
+  /**
+   * Remove the first element without blocking
+   * @return the removed element
+   * @throws IllegalStateException if the queue is empty
+   */
+  private synchronized T _dequeue() {
+    if (0 == size) {
+      throw new IllegalStateException("Size must > 0 here.");
+    }
+    --size;
+    head = (head + 1) % data.length;
+    T ret = data[head];
+    data[head] = null;  // hint to gc
+    return ret;
+  }
+
+  /**
+   * @return the content of the first slot without removing it
+   */
+  synchronized T front() {
+    return data[(head + 1) % data.length];
+  }
+
+  /**
+   * @return the content of the last slot without removing it
+   */
+  synchronized T back() {
+    return data[tail];
+  }
+
+  /**
+   * Remove all the elements
+   * @throws ConcurrentModificationException if a thread is marked as the
+   *         current consumer of the queue
+   */
+  synchronized void clear() {
+    checkConsumer();
+
+    for (int i = data.length; i-- > 0; ) {
+      data[i] = null;
+    }
+    // Reset the positions along with the size. Otherwise head and tail stay
+    // apart and the next enqueue writes to a slot that front() and
+    // _dequeue() do not read.
+    head = tail = size = 0;
+  }
+
+  /**
+   * @return the number of elements in the queue
+   */
+  synchronized int size() {
+    return size;
+  }
+
+  /**
+   * @return the maximum number of elements the queue can hold
+   */
+  int capacity() {
+    return data.length;
+  }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/lib/AbstractMetricsSource.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+
+/**
+ * A convenience base class for metrics sources that keep their mutable
+ * metrics in a single registry
+ */
+public abstract class AbstractMetricsSource implements MetricsSource {
+
+  protected final MetricsRegistry registry;
+
+  /**
+   * Construct the source with a name and the default mutable metrics factory
+   * @param name of the default record
+   */
+  public AbstractMetricsSource(String name) {
+    this(name, new MetricMutableFactory());
+  }
+
+  /**
+   * Construct the source with a name and a mutable metrics factory
+   * @param name of the default record
+   * @param mf the factory to create mutable metrics
+   */
+  public AbstractMetricsSource(String name, MetricMutableFactory mf) {
+    this.registry = new MetricsRegistry(name, mf);
+  }
+
+  /**
+   * @return the registry holding the mutable metrics of this source
+   */
+  public MetricsRegistry registry() {
+    return this.registry;
+  }
+
+  /**
+   * Add a record named after the registry to the builder and snapshot the
+   * registry into it
+   */
+  @Override
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    final String recordName = registry.name();
+    registry.snapshot(builder.addRecord(recordName), all);
+  }
+
+}
\ No newline at end of file