You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:34:00 UTC
svn commit: r1077597 [4/6] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ conf/ ivy/
src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/ipc/metrics/
src/core/org/apache/hadoop/log/ src/core/org/apache/hadoop/metrics/
src/core/org...
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.util;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A base class for unmodifiable iterators ({@link #remove()} always throws).
+ *
+ * This class also makes writing filtering iterators easier, where the only
+ * way to discover the end of data is by trying to read it. The same applies
+ * to writing iterator wrappers around stream read calls.
+ *
+ * One only needs to implement the tryNext() method and call done() when done.
+ *
+ * @param <T> the type of the iterator
+ */
+public abstract class TryIterator<T> implements Iterator<T> {
+
+ enum State {
+ PENDING, // Ready to tryNext().
+ GOT_NEXT, // Got the next element from tryNext() and yet to return it.
+ DONE, // Done/finished.
+ FAILED, // An exception occurred in the last op.
+ }
+
+ private State state = State.PENDING;
+ private T next;
+
+ /**
+ * Return the next element. Must call {@link #done()} when done, otherwise
+ * an infinite loop could occur. If this method throws an exception, any
+ * further attempt to use the iterator will result in an
+ * {@link IllegalStateException}.
+ *
+ * @return the next element if there is one, otherwise the result of {@link #done()}
+ */
+ protected abstract T tryNext();
+
+ /**
+ * Implementations of {@link #tryNext} <b>must</b> call this method
+ * when there are no more elements left in the iteration.
+ *
+ * @return null as a convenience to implement {@link #tryNext()}
+ */
+ protected final T done() {
+ state = State.DONE;
+ return null;
+ }
+
+ /**
+ * @return true if we have a next element or false otherwise.
+ */
+ public final boolean hasNext() {
+ if (state == State.FAILED)
+ throw new IllegalStateException();
+
+ switch (state) {
+ case DONE: return false;
+ case GOT_NEXT: return true;
+ default:
+ }
+
+ // handle tryNext
+ state = State.FAILED; // just in case
+ next = tryNext();
+
+ if (state != State.DONE) {
+ state = State.GOT_NEXT;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @return the next element if we have one.
+ */
+ public final T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ state = State.PENDING;
+ return next;
+ }
+
+ /**
+ * @return the current element without advancing the iterator
+ */
+ public final T current() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return next;
+ }
+
+ /**
+ * Guaranteed to throw UnsupportedOperationException
+ */
+ public final void remove() {
+ throw new UnsupportedOperationException("Not allowed.");
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+class UgiInstrumentation implements MetricsSource {
+
+ final MetricsRegistry registry = new MetricsRegistry("ugi");
+ final MetricMutableStat loginSuccess = registry.newStat("loginSuccess");
+ final MetricMutableStat loginFailure = registry.newStat("loginFailure");
+
+ @Override
+ public void getMetrics(MetricsBuilder builder, boolean all) {
+ registry.snapshot(builder.addRecord(registry.name()), all);
+ }
+
+ void addLoginSuccess(long elapsed) {
+ loginSuccess.add(elapsed);
+ }
+
+ void addLoginFailure(long elapsed) {
+ loginFailure.add(elapsed);
+ }
+
+ static UgiInstrumentation create(Configuration conf) {
+ return create(conf, DefaultMetricsSystem.INSTANCE);
+ }
+
+ static UgiInstrumentation create(Configuration conf, MetricsSystem ms) {
+ return ms.register("ugi", "User/group metrics", new UgiInstrumentation());
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar 4 04:33:55 2011
@@ -50,13 +50,6 @@ import javax.security.auth.spi.LoginModu
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.util.MetricsBase;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -79,41 +72,6 @@ public class UserGroupInformation {
*/
private static final float TICKET_RENEW_WINDOW = 0.80f;
- /**
- * UgiMetrics maintains UGI activity statistics
- * and publishes them through the metrics interfaces.
- */
- static class UgiMetrics implements Updater {
- final MetricsTimeVaryingRate loginSuccess;
- final MetricsTimeVaryingRate loginFailure;
- private final MetricsRecord metricsRecord;
- private final MetricsRegistry registry;
-
- UgiMetrics() {
- registry = new MetricsRegistry();
- loginSuccess = new MetricsTimeVaryingRate("loginSuccess", registry,
- "Rate of successful kerberos logins and time taken in milliseconds");
- loginFailure = new MetricsTimeVaryingRate("loginFailure", registry,
- "Rate of failed kerberos logins and time taken in milliseconds");
- final MetricsContext metricsContext = MetricsUtil.getContext("ugi");
- metricsRecord = MetricsUtil.createRecord(metricsContext, "ugi");
- metricsContext.registerUpdater(this);
- }
-
- /**
- * Push the metrics to the monitoring subsystem on doUpdate() call.
- */
- @Override
- public void doUpdates(final MetricsContext context) {
- synchronized (this) {
- for (MetricsBase m : registry.getMetricsList()) {
- m.pushMetric(metricsRecord);
- }
- }
- metricsRecord.update();
- }
- }
-
/**
* A login module that looks at the Kerberos, Unix, or Windows principal and
* adds the corresponding UserName.
@@ -175,7 +133,7 @@ public class UserGroupInformation {
}
/** Metrics to track UGI activity */
- static UgiMetrics metrics = new UgiMetrics();
+ static UgiInstrumentation metrics;
/** Are the static variables that depend on configuration initialized? */
private static boolean isInitialized = false;
/** Should we use Kerberos configuration? */
@@ -235,6 +193,7 @@ public class UserGroupInformation {
}
isInitialized = true;
UserGroupInformation.conf = conf;
+ metrics = UgiInstrumentation.create(conf);
}
/**
@@ -583,13 +542,13 @@ public class UserGroupInformation {
new LoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
start = System.currentTimeMillis();
login.login();
- metrics.loginSuccess.inc(System.currentTimeMillis() - start);
+ metrics.addLoginSuccess(System.currentTimeMillis() - start);
loginUser = new UserGroupInformation(subject);
loginUser.setLogin(login);
loginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
} catch (LoginException le) {
if (start > 0) {
- metrics.loginFailure.inc(System.currentTimeMillis() - start);
+ metrics.addLoginFailure(System.currentTimeMillis() - start);
}
throw new IOException("Login failure for " + user + " from keytab " +
path, le);
@@ -667,7 +626,7 @@ public class UserGroupInformation {
start = System.currentTimeMillis();
login.login();
- metrics.loginSuccess.inc(System.currentTimeMillis() - start);
+ metrics.addLoginSuccess(System.currentTimeMillis() - start);
UserGroupInformation newLoginUser = new UserGroupInformation(subject);
newLoginUser.setLogin(login);
newLoginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
@@ -675,7 +634,7 @@ public class UserGroupInformation {
return newLoginUser;
} catch (LoginException le) {
if (start > 0) {
- metrics.loginFailure.inc(System.currentTimeMillis() - start);
+ metrics.addLoginFailure(System.currentTimeMillis() - start);
}
throw new IOException("Login failure for " + user + " from keytab " +
path, le);
@@ -722,11 +681,11 @@ public class UserGroupInformation {
LOG.info("Initiating re-login for " + keytabPrincipal);
start = System.currentTimeMillis();
login.login();
- metrics.loginSuccess.inc(System.currentTimeMillis() - start);
+ metrics.addLoginSuccess(System.currentTimeMillis() - start);
setLogin(login);
} catch (LoginException le) {
if (start > 0) {
- metrics.loginFailure.inc(System.currentTimeMillis() - start);
+ metrics.addLoginFailure(System.currentTimeMillis() - start);
}
throw new IOException("Login failure for " + keytabPrincipal +
" from keytab " + keytabFile, le);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java Fri Mar 4 04:33:55 2011
@@ -695,9 +695,32 @@ public class StringUtils {
*
* @param separator Separator to join with.
* @param strings Strings to join.
+ * @return the joined string
*/
public static String join(CharSequence separator, Iterable<String> strings) {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (String s : strings) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(separator);
+ }
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Concatenates strings, using a separator.
+ *
+ * @param separator to join with
+ * @param strings to join
+ * @return the joined string
+ */
+ public static String join(CharSequence separator, String[] strings) {
+ // Ideally we would not have to duplicate this code, but arrays are not Iterable.
+ StringBuilder sb = new StringBuilder();
boolean first = true;
for (String s : strings) {
if (first) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Mar 4 04:33:55 2011
@@ -469,7 +469,7 @@ class BlockReceiver implements java.io.C
} else {
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
- datanode.myMetrics.bytesWritten.inc(len);
+ datanode.myMetrics.incrBytesWritten(len);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -553,7 +553,7 @@ class BlockReceiver implements java.io.C
// Finalize the block. Does this fsync()?
block.setNumBytes(offsetInBlock);
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
+ datanode.myMetrics.incrBlocksWritten();
}
} catch (IOException ioe) {
@@ -810,7 +810,7 @@ class BlockReceiver implements java.io.C
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(receiver.offsetInBlock);
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
+ datanode.myMetrics.incrBlocksWritten();
datanode.notifyNamenodeReceivedBlock(block,
DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
@@ -944,7 +944,7 @@ class BlockReceiver implements java.io.C
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(receiver.offsetInBlock);
datanode.data.finalizeBlock(block);
- datanode.myMetrics.blocksWritten.inc();
+ datanode.myMetrics.incrBlocksWritten();
datanode.notifyNamenodeReceivedBlock(block,
DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Mar 4 04:33:55 2011
@@ -459,13 +459,13 @@ class DataBlockScanner implements Runnab
StringUtils.stringifyException(e));
if (second) {
- datanode.getMetrics().blockVerificationFailures.inc();
+ datanode.getMetrics().incrBlockVerificationFailures();
handleScanFailure(block);
return;
}
} finally {
IOUtils.closeStream(blockSender);
- datanode.getMetrics().blocksVerified.inc();
+ datanode.getMetrics().incrBlocksVerified();
totalScans++;
totalVerifications++;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar 4 04:33:55 2011
@@ -73,7 +73,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
-import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeInstrumentation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.JspHelper;
@@ -95,6 +95,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -205,7 +206,7 @@ public class DataNode extends Configured
long heartBeatInterval;
private DataStorage storage = null;
private HttpServer infoServer = null;
- DataNodeMetrics myMetrics;
+ DataNodeInstrumentation myMetrics;
private static InetSocketAddress nameNodeAddr;
private InetSocketAddress selfAddr;
private static DataNode datanodeObject = null;
@@ -260,7 +261,6 @@ public class DataNode extends Configured
DataNode(final Configuration conf,
final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
super(conf);
- UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
@@ -440,7 +440,8 @@ public class DataNode extends Configured
this.infoServer.start();
// adjust info port
this.dnRegistration.setInfoPort(this.infoServer.getPort());
- myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+ myMetrics = DataNodeInstrumentation.create(conf,
+ dnRegistration.getStorageID());
// set service-level authorization security policy
if (conf.getBoolean(
@@ -566,7 +567,7 @@ public class DataNode extends Configured
return selfAddr;
}
- DataNodeMetrics getMetrics() {
+ DataNodeInstrumentation getMetrics() {
return myMetrics;
}
@@ -848,7 +849,7 @@ public class DataNode extends Configured
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
- myMetrics.heartbeats.inc(now() - startTime);
+ myMetrics.addHeartBeat(now() - startTime);
//LOG.info("Just sent heartbeat, with name " + localName);
if (!processCommand(cmds))
continue;
@@ -899,7 +900,7 @@ public class DataNode extends Configured
DatanodeCommand cmd = namenode.blockReport(dnRegistration,
BlockListAsLongs.convertToArrayLongs(bReport));
long brTime = now() - brStartTime;
- myMetrics.blockReports.inc(brTime);
+ myMetrics.addBlockReport(brTime);
LOG.info("BlockReport of " + bReport.length +
" blocks got processed in " + brTime + " msecs");
//
@@ -996,7 +997,7 @@ public class DataNode extends Configured
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
- myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
+ myMetrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
//
@@ -1013,7 +1014,7 @@ public class DataNode extends Configured
checkDiskError();
throw e;
}
- myMetrics.blocksRemoved.inc(toDelete.length);
+ myMetrics.incrBlocksRemoved(toDelete.length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// shut down the data node
@@ -1404,6 +1405,7 @@ public class DataNode extends Configured
String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
dnThreadName = "DataNode: [" +
StringUtils.arrayToString(dataDirs) + "]";
+ DefaultMetricsSystem.initialize("DataNode");
return makeInstance(dataDirs, conf, resources);
}
@@ -1449,6 +1451,7 @@ public class DataNode extends Configured
*/
public static DataNode makeInstance(String[] dataDirs, Configuration conf,
SecureResources resources) throws IOException {
+ UserGroupInformation.setConfiguration(conf);
LocalFileSystem localFS = FileSystem.getLocal(conf);
ArrayList<File> dirs = new ArrayList<File>();
FsPermission dataDirPermission =
@@ -1612,7 +1615,7 @@ public class DataNode extends Configured
data.updateBlock(oldblock, newblock);
if (finalize) {
data.finalizeBlock(newblock);
- myMetrics.blocksWritten.inc();
+ myMetrics.incrBlocksWritten();
notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
LOG.info("Received block " + newblock +
" of size " + newblock.getNumBytes() +
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar 4 04:33:55 2011
@@ -97,32 +97,32 @@ class DataXceiver implements Runnable, F
switch ( op ) {
case DataTransferProtocol.OP_READ_BLOCK:
readBlock( in );
- datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
+ datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
if (local)
- datanode.myMetrics.readsFromLocalClient.inc();
+ datanode.myMetrics.incrReadsFromLocalClient();
else
- datanode.myMetrics.readsFromRemoteClient.inc();
+ datanode.myMetrics.incrReadsFromRemoteClient();
break;
case DataTransferProtocol.OP_WRITE_BLOCK:
writeBlock( in );
- datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
+ datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
if (local)
- datanode.myMetrics.writesFromLocalClient.inc();
+ datanode.myMetrics.incrWritesFromLocalClient();
else
- datanode.myMetrics.writesFromRemoteClient.inc();
+ datanode.myMetrics.incrWritesFromRemoteClient();
break;
case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
replaceBlock(in);
- datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
+ datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_COPY_BLOCK:
// for balancing purpose; send to a proxy source
copyBlock(in);
- datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
+ datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
getBlockChecksum(in);
- datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
+ datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);
break;
default:
throw new IOException("Unknown opcode " + op + " in data stream");
@@ -207,11 +207,11 @@ class DataXceiver implements Runnable, F
} catch (IOException ignored) {}
}
- datanode.myMetrics.bytesRead.inc((int) read);
- datanode.myMetrics.blocksRead.inc();
+ datanode.myMetrics.incrBytesRead((int) read);
+ datanode.myMetrics.incrBlocksRead();
} catch ( SocketException ignored ) {
// Its ok for remote side to close the connection anytime.
- datanode.myMetrics.blocksRead.inc();
+ datanode.myMetrics.incrBlocksRead();
} catch ( IOException ioe ) {
/* What exactly should we do here?
* Earlier version shutdown() datanode if there is disk error.
@@ -540,8 +540,8 @@ class DataXceiver implements Runnable, F
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
- datanode.myMetrics.bytesRead.inc((int) read);
- datanode.myMetrics.blocksRead.inc();
+ datanode.myMetrics.incrBytesRead((int) read);
+ datanode.myMetrics.incrBlocksRead();
LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Mar 4 04:33:55 2011
@@ -43,9 +43,9 @@ import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1553,7 +1553,7 @@ public class FSDataset implements FSCons
}
try {
bean = new StandardMBean(this,FSDatasetMBean.class);
- mbeanName = MBeanUtil.registerMBean("DataNode", "FSDatasetState-" + storageName, bean);
+ mbeanName = MBeans.register("DataNode", "FSDatasetState-" + storageName, bean);
} catch (NotCompliantMBeanException e) {
e.printStackTrace();
}
@@ -1563,7 +1563,7 @@ public class FSDataset implements FSCons
public void shutdown() {
if (mbeanName != null)
- MBeanUtil.unregisterMBean(mbeanName);
+ MBeans.unregister(mbeanName);
if(volumes != null) {
for (FSVolume volume : volumes.volumes) {
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+
+/**
+ * Metrics source for the DataNode, built on the metrics2 library.
+ *
+ * All counters and latency stats are created through a single
+ * {@link MetricsRegistry} named "datanode"; {@link #getMetrics} publishes a
+ * snapshot of that registry. Callers update the metrics through the
+ * {@code incr*} / {@code add*} methods rather than touching the fields.
+ */
+public class DataNodeInstrumentation implements MetricsSource {
+
+ final MetricsRegistry registry = new MetricsRegistry("datanode");
+
+ // Byte and block counters (byte counters are long, block counters are int).
+ final MetricMutableCounterLong bytesWritten =
+ registry.newCounter("bytes_written", "", 0L);
+ final MetricMutableCounterLong bytesRead =
+ registry.newCounter("bytes_read", "", 0L);
+ final MetricMutableCounterInt blocksWritten =
+ registry.newCounter("blocks_written", "", 0);
+ final MetricMutableCounterInt blocksRead =
+ registry.newCounter("blocks_read", "", 0);
+ final MetricMutableCounterInt blocksReplicated =
+ registry.newCounter("blocks_replicated", "", 0);
+ final MetricMutableCounterInt blocksRemoved =
+ registry.newCounter("blocks_removed", "", 0);
+ final MetricMutableCounterInt blocksVerified =
+ registry.newCounter("blocks_verified", "", 0);
+ final MetricMutableCounterInt blockVerificationFailures =
+ registry.newCounter("block_verification_failures", "", 0);
+
+ // Read/write operation counters, split by local vs. remote client.
+ final MetricMutableCounterInt readsFromLocalClient =
+ registry.newCounter("reads_from_local_client", "", 0);
+ final MetricMutableCounterInt readsFromRemoteClient =
+ registry.newCounter("reads_from_remote_client", "", 0);
+ final MetricMutableCounterInt writesFromLocalClient =
+ registry.newCounter("writes_from_local_client", "", 0);
+ final MetricMutableCounterInt writesFromRemoteClient =
+ registry.newCounter("writes_from_remote_client", "", 0);
+
+ // Stats fed with latency values by the add*() methods below.
+ // Note: the heartbeats field is registered under the name "heartBeats".
+ final MetricMutableStat readBlockOp = registry.newStat("readBlockOp");
+ final MetricMutableStat writeBlockOp = registry.newStat("writeBlockOp");
+ final MetricMutableStat blockChecksumOp = registry.newStat("blockChecksumOp");
+ final MetricMutableStat copyBlockOp = registry.newStat("copyBlockOp");
+ final MetricMutableStat replaceBlockOp = registry.newStat("replaceBlockOp");
+ final MetricMutableStat heartbeats = registry.newStat("heartBeats");
+ final MetricMutableStat blockReports = registry.newStat("blockReports");
+
+
+ /**
+ * Reads "session.id" from the configuration, creates the JVM metrics
+ * source under the process name "DataNode", and puts the registry in the
+ * "dfs" context tagged with the session id.
+ *
+ * @param conf configuration supplying "session.id"
+ * @param storageId not used by this constructor
+ */
+ public DataNodeInstrumentation(Configuration conf, String storageId) {
+ String sessionId = conf.get("session.id");
+ JvmMetricsSource.create("DataNode", sessionId);
+ registry.setContext("dfs").tag("sessionId", "", sessionId);
+ }
+
+ /** Intentionally a no-op; see the comment in the body. */
+ public void shutdown() {
+ // metrics system shutdown would suffice
+ }
+
+ /** Calls resetMinMax() on each of the seven stats declared above. */
+ public void resetAllMinMax() {
+ readBlockOp.resetMinMax();
+ writeBlockOp.resetMinMax();
+ blockChecksumOp.resetMinMax();
+ copyBlockOp.resetMinMax();
+ replaceBlockOp.resetMinMax();
+ heartbeats.resetMinMax();
+ blockReports.resetMinMax();
+ }
+
+ // Update methods: each one forwards to exactly one metric, calling
+ // incr() / incr(delta) on a counter or add(latency) on a stat.
+
+ public void addHeartBeat(long latency) {
+ heartbeats.add(latency);
+ }
+
+ public void addBlockReport(long latency) {
+ blockReports.add(latency);
+ }
+
+ public void incrBlocksReplicated(int delta) {
+ blocksReplicated.incr(delta);
+ }
+
+ public void incrBlocksWritten() {
+ blocksWritten.incr();
+ }
+
+ public void incrBlocksRemoved(int delta) {
+ blocksRemoved.incr(delta);
+ }
+
+ // The counter is a long counter, but the delta accepted here is an int.
+ public void incrBytesWritten(int delta) {
+ bytesWritten.incr(delta);
+ }
+
+ public void incrBlockVerificationFailures() {
+ blockVerificationFailures.incr();
+ }
+
+ public void incrBlocksVerified() {
+ blocksVerified.incr();
+ }
+
+ public void addReadBlockOp(long latency) {
+ readBlockOp.add(latency);
+ }
+
+ public void incrReadsFromLocalClient() {
+ readsFromLocalClient.incr();
+ }
+
+ public void incrReadsFromRemoteClient() {
+ readsFromRemoteClient.incr();
+ }
+
+ public void addWriteBlockOp(long latency) {
+ writeBlockOp.add(latency);
+ }
+
+ public void incrWritesFromLocalClient() {
+ writesFromLocalClient.incr();
+ }
+
+ public void incrWritesFromRemoteClient() {
+ writesFromRemoteClient.incr();
+ }
+
+ public void addReplaceBlockOp(long latency) {
+ replaceBlockOp.add(latency);
+ }
+
+ public void addCopyBlockOp(long latency) {
+ copyBlockOp.add(latency);
+ }
+
+ public void addBlockChecksumOp(long latency) {
+ blockChecksumOp.add(latency);
+ }
+
+ // The counter is a long counter, but the delta accepted here is an int.
+ public void incrBytesRead(int delta) {
+ bytesRead.incr(delta);
+ }
+
+ public void incrBlocksRead() {
+ blocksRead.incr();
+ }
+
+ /**
+ * Adds a record named after the registry to the builder and snapshots the
+ * registry into it.
+ *
+ * @param builder receives the new record
+ * @param all passed through to the registry snapshot; presumably selects
+ * all metrics rather than only changed ones (TODO confirm
+ * against MetricsRegistry)
+ */
+ public void getMetrics(MetricsBuilder builder, boolean all) {
+ registry.snapshot(builder.addRecord(registry.name()), all);
+ }
+
+ /**
+ * Creates an instance and registers it with
+ * {@code DefaultMetricsSystem.INSTANCE}.
+ */
+ public static DataNodeInstrumentation create(Configuration conf,
+ String storageID) {
+ return create(conf, storageID, DefaultMetricsSystem.INSTANCE);
+ }
+
+ /**
+ * Creates an instance and registers it with the given metrics system
+ * under the name "DataNode".
+ *
+ * @param conf configuration handed to the constructor
+ * @param storageID handed to the constructor
+ * @param ms the metrics system to register with
+ * @return the value returned by {@code ms.register}
+ */
+ public static DataNodeInstrumentation create(Configuration conf,
+ String storageID,
+ MetricsSystem ms) {
+ return ms.register("DataNode", "DataNode metrics",
+ new DataNodeInstrumentation(conf, storageID));
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Mar 4 04:33:55 2011
@@ -115,7 +115,7 @@ class FSDirectory implements FSConstants
private void incrDeletedFileCount(int count) {
if (namesystem != null)
- NameNode.getNameNodeMetrics().numFilesDeleted.inc(count);
+ NameNode.getNameNodeMetrics().incrFilesDeleted(count);
}
/**
@@ -977,7 +977,7 @@ class FSDirectory implements FSConstants
// Directory creation also count towards FilesCreated
// to match count of FilesDeleted metric.
if (namesystem != null)
- NameNode.getNameNodeMetrics().numFilesCreated.inc();
+ NameNode.getNameNodeMetrics().incrNumFilesCreated();
fsImage.getEditLog().logMkDir(cur, inodes[i]);
NameNode.stateChangeLog.debug(
"DIR* FSDirectory.mkdirs: created directory " + cur);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Mar 4 04:33:55 2011
@@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeInstrumentation;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -96,7 +96,7 @@ public class FSEditLog {
private long numTransactions; // number of transactions
private long numTransactionsBatchedInSync;
private long totalTimeTransactions; // total time for all transactions
- private NameNodeMetrics metrics;
+ private NameNodeInstrumentation metrics;
private static class TransactionId {
public long txid;
@@ -915,7 +915,7 @@ public class FSEditLog {
numTransactions++;
totalTimeTransactions += (end-start);
if (metrics != null) // Metrics is non-null only when used inside name node
- metrics.transactions.inc((end-start));
+ metrics.addTransaction(end-start);
}
//
@@ -948,7 +948,7 @@ public class FSEditLog {
if (mytxid <= synctxid) {
numTransactionsBatchedInSync++;
if (metrics != null) // Metrics is non-null only when used inside name node
- metrics.transactionsBatchedInSync.inc();
+ metrics.incrTransactionsBatchedInSync();
return;
}
@@ -990,7 +990,7 @@ public class FSEditLog {
}
if (metrics != null) // Metrics is non-null only when used inside name node
- metrics.syncs.inc(elapsed);
+ metrics.addSync(elapsed);
}
//
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar 4 04:33:55 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -42,7 +41,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.util.*;
-import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
@@ -143,7 +142,6 @@ public class FSNamesystem implements FSC
private String supergroup;
private PermissionStatus defaultPermission;
// FSNamesystemMetrics counter variables
- private FSNamesystemMetrics myFSMetrics;
private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
private int totalLoad = 0;
boolean isAccessTokenEnabled;
@@ -345,8 +343,7 @@ public class FSNamesystem implements FSC
getNamespaceEditsDirs(conf), startOpt);
long timeTakenToLoadFSImage = now() - systemStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
- NameNode.getNameNodeMetrics().fsImageLoadTime.set(
- (int) timeTakenToLoadFSImage);
+ NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);
this.safeMode = new SafeModeInfo(conf);
setBlockTotal();
pendingReplications = new PendingReplicationBlocks(
@@ -3080,7 +3077,7 @@ public class FSNamesystem implements FSC
+ " does not belong to any file.");
addToInvalidates(b, node);
}
- NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+ NameNode.getNameNodeMetrics().addBlockReport(now() - startTime);
}
/**
@@ -4283,7 +4280,7 @@ public class FSNamesystem implements FSC
long timeInSafemode = now() - systemStart;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
+ timeInSafemode/1000 + " secs.");
- NameNode.getNameNodeMetrics().safeModeTime.set((int) timeInSafemode);
+ NameNode.getNameNodeMetrics().setSafeModeTime(timeInSafemode);
if (reached >= 0) {
NameNode.stateChangeLog.info("STATE* Safe mode is OFF.");
@@ -4777,21 +4774,13 @@ public class FSNamesystem implements FSC
// package naming for mbeans and their impl.
StandardMBean bean;
try {
- myFSMetrics = new FSNamesystemMetrics(conf);
bean = new StandardMBean(this,FSNamesystemMBean.class);
- mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemState", bean);
+ mbeanName = MBeans.register("NameNode", "FSNamesystemState", bean);
} catch (NotCompliantMBeanException e) {
e.printStackTrace();
}
- LOG.info("Registered FSNamesystemStatusMBean");
- }
-
- /**
- * get FSNamesystemMetrics
- */
- public FSNamesystemMetrics getFSNamesystemMetrics() {
- return myFSMetrics;
+ LOG.info("Registered FSNamesystemStateMBean");
}
/**
@@ -4799,7 +4788,7 @@ public class FSNamesystem implements FSC
*/
public void shutdown() {
if (mbeanName != null)
- MBeanUtil.unregisterMBean(mbeanName);
+ MBeans.unregister(mbeanName);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar 4 04:33:55 2011
@@ -32,7 +32,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.CompleteFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeInstrumentation;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -66,6 +66,7 @@ import java.net.*;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Iterator;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
/**********************************************************
* NameNode serves as both directory namespace manager and
@@ -161,13 +162,13 @@ public class NameNode implements ClientP
format(conf, false);
}
- static NameNodeMetrics myMetrics;
+ static NameNodeInstrumentation myMetrics;
public FSNamesystem getNamesystem() {
return namesystem;
}
- public static NameNodeMetrics getNameNodeMetrics() {
+ public static NameNodeInstrumentation getNameNodeMetrics() {
return myMetrics;
}
@@ -249,7 +250,7 @@ public class NameNode implements ClientP
ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
}
- myMetrics = new NameNodeMetrics(conf, this);
+ myMetrics = NameNodeInstrumentation.create(conf);
this.namesystem = new FSNamesystem(this, conf);
if (UserGroupInformation.isSecurityEnabled()) {
@@ -517,7 +518,7 @@ public class NameNode implements ClientP
public LocatedBlocks getBlockLocations(String src,
long offset,
long length) throws IOException {
- myMetrics.numGetBlockLocations.inc();
+ myMetrics.incrNumGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
}
@@ -531,7 +532,7 @@ public class NameNode implements ClientP
long offset,
long length)
throws IOException {
- myMetrics.numGetBlockLocations.inc();
+ myMetrics.incrNumGetBlockLocations();
return namesystem.getBlockLocations(src, offset, length, false);
}
@@ -564,8 +565,8 @@ public class NameNode implements ClientP
new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
null, masked),
clientName, clientMachine, overwrite, replication, blockSize);
- myMetrics.numFilesCreated.inc();
- myMetrics.numCreateFileOps.inc();
+ myMetrics.incrNumFilesCreated();
+ myMetrics.incrNumCreateFileOps();
}
/** {@inheritDoc} */
@@ -576,7 +577,7 @@ public class NameNode implements ClientP
+src+" for "+clientName+" at "+clientMachine);
}
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
- myMetrics.numFilesAppended.inc();
+ myMetrics.incrNumFilesAppended();
return info;
}
@@ -607,7 +608,7 @@ public class NameNode implements ClientP
+src+" for "+clientName);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
if (locatedBlock != null)
- myMetrics.numAddBlockOps.inc();
+ myMetrics.incrNumAddBlockOps();
return locatedBlock;
}
@@ -682,7 +683,7 @@ public class NameNode implements ClientP
}
boolean ret = namesystem.renameTo(src, dst);
if (ret) {
- myMetrics.numFilesRenamed.inc();
+ myMetrics.incrNumFilesRenamed();
}
return ret;
}
@@ -702,7 +703,7 @@ public class NameNode implements ClientP
}
boolean ret = namesystem.delete(src, recursive);
if (ret)
- myMetrics.numDeleteFileOps.inc();
+ myMetrics.incrNumDeleteFileOps();
return ret;
}
@@ -740,9 +741,9 @@ public class NameNode implements ClientP
public DirectoryListing getListing(String src, byte[] startAfter)
throws IOException {
DirectoryListing files = namesystem.getListing(src, startAfter);
- myMetrics.numGetListingOps.inc();
+ myMetrics.incrNumGetListingOps();
if (files != null) {
- myMetrics.numFilesInGetListingOps.inc(files.getPartialListing().length);
+ myMetrics.incrNumFilesInGetListingOps(files.getPartialListing().length);
}
return files;
}
@@ -755,7 +756,7 @@ public class NameNode implements ClientP
* or null if file not found
*/
public HdfsFileStatus getFileInfo(String src) throws IOException {
- myMetrics.numFileInfoOps.inc();
+ myMetrics.incrNumFileInfoOps();
return namesystem.getFileInfo(src);
}
@@ -1161,7 +1162,7 @@ public class NameNode implements ClientP
System.exit(aborted ? 1 : 0);
default:
}
-
+ DefaultMetricsSystem.initialize("NameNode");
NameNode namenode = new NameNode(conf);
return namenode;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Mar 4 04:33:55 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
import org.apache.hadoop.security.SecurityUtil;
@@ -154,7 +154,7 @@ public class SecondaryNameNode implement
infoBindAddress);
}
// initiate Java VM metrics
- JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
+ JvmMetricsSource.create("SecondaryNameNode", conf.get("session.id"));
// Create connection to the namenode.
shouldRun = true;
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.metrics;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+
+/**
+ * Metrics source for NameNode operations, built on the metrics2 library.
+ *
+ * Operation counters, latency stats and gauges are created through a
+ * {@link MetricsRegistry} named "namenode" and published by
+ * {@link #getMetrics}. A second source, the inner FSNamesystemMetrics
+ * class, reads its values from FSNamesystem each time it is asked for
+ * metrics; {@link #create(Configuration, MetricsSystem)} registers both.
+ */
+public class NameNodeInstrumentation implements MetricsSource {
+ static final Log LOG = LogFactory.getLog(NameNodeInstrumentation.class);
+
+ // Record name used by the inner FSNamesystemMetrics source.
+ static final String FSNAMESYSTEM_RECORD_NAME = "FSNamesystem";
+ final String sessionId;
+ final MetricsRegistry registry = new MetricsRegistry("namenode");
+ final MetricMutableCounterInt numFilesCreated =
+ registry.newCounter("FilesCreated", "", 0);
+ final MetricMutableCounterInt numFilesAppended =
+ registry.newCounter("FilesAppended", "", 0);
+ final MetricMutableCounterInt numGetBlockLocations =
+ registry.newCounter("GetBlockLocations", "", 0);
+ final MetricMutableCounterInt numFilesRenamed =
+ registry.newCounter("FilesRenamed", "", 0);
+ final MetricMutableCounterInt numGetListingOps =
+ registry.newCounter("GetListingOps", "", 0);
+ final MetricMutableCounterInt numCreateFileOps =
+ registry.newCounter("CreateFileOps", "", 0);
+ final MetricMutableCounterInt numFilesDeleted =
+ registry.newCounter("FilesDeleted", "Files deleted (inc. rename)", 0);
+ final MetricMutableCounterInt numDeleteFileOps =
+ registry.newCounter("DeleteFileOps", "", 0);
+ final MetricMutableCounterInt numFileInfoOps =
+ registry.newCounter("FileInfoOps", "", 0);
+ final MetricMutableCounterInt numAddBlockOps =
+ registry.newCounter("AddBlockOps", "", 0);
+ final MetricMutableStat transactions = registry.newStat("Transactions");
+ final MetricMutableStat syncs = registry.newStat("Syncs");
+ final MetricMutableCounterInt transactionsBatchedInSync =
+ registry.newCounter("JournalTransactionsBatchedInSync", "", 0);
+ final MetricMutableStat blockReport = registry.newStat("blockReport");
+ final MetricMutableGaugeInt safeModeTime =
+ registry.newGauge("SafemodeTime", "Time spent in safe mode", 0);
+ final MetricMutableGaugeInt fsImageLoadTime =
+ registry.newGauge("fsImageLoadTime", "", 0);
+ final MetricMutableCounterInt numFilesInGetListingOps =
+ registry.newCounter("FilesInGetListingOps", "", 0);
+
+ final MetricsSource fsNamesystemMetrics;
+
+ /**
+ * Reads "session.id" from the configuration, creates the inner
+ * FSNamesystemMetrics source and the JVM metrics source (process name
+ * "NameNode"), and puts the registry in the "dfs" context tagged with the
+ * session id. Package-private: instances are obtained through the
+ * static create methods.
+ */
+ NameNodeInstrumentation(Configuration conf) {
+ sessionId = conf.get("session.id");
+ fsNamesystemMetrics = new FSNamesystemMetrics();
+ JvmMetricsSource.create("NameNode", sessionId);
+ registry.setContext("dfs").tag("sessionId", "", sessionId);
+ }
+
+ /**
+ * Creates the instrumentation and registers it with
+ * {@code DefaultMetricsSystem.INSTANCE}.
+ */
+ public static NameNodeInstrumentation create(Configuration conf) {
+ return create(conf, DefaultMetricsSystem.INSTANCE);
+ }
+
+ /**
+ * Create a v2 metrics instrumentation and register both of its sources:
+ * the FSNamesystem source under "FSNamesystemState" and the
+ * instrumentation itself under "NameNode".
+ * @param conf the configuration object
+ * @param ms the metrics system instance
+ * @return the value returned by {@code ms.register} for the "NameNode"
+ * source
+ */
+ public static NameNodeInstrumentation create(Configuration conf,
+ MetricsSystem ms) {
+ NameNodeInstrumentation v2 = new NameNodeInstrumentation(conf);
+ ms.register("FSNamesystemState", "FS name system state",
+ v2.fsNamesystemMetrics());
+ return ms.register("NameNode", "NameNode metrics", v2);
+ }
+
+ /** Returns the source that reports FSNamesystem state. */
+ public MetricsSource fsNamesystemMetrics() {
+ return fsNamesystemMetrics;
+ }
+
+ /** Intentionally a no-op; see the comment in the body. */
+ public void shutdown() {
+ // metrics system shutdown would suffice
+ }
+
+ // Update methods: each one forwards to exactly one metric, calling
+ // incr() / incr(delta) on a counter, add(value) on a stat, or set(value)
+ // on a gauge.
+
+
+ public final void incrNumGetBlockLocations() {
+ numGetBlockLocations.incr();
+ }
+
+
+ public final void incrNumFilesCreated() {
+ numFilesCreated.incr();
+ }
+
+
+ public final void incrNumCreateFileOps() {
+ numCreateFileOps.incr();
+ }
+
+
+ public final void incrNumFilesAppended() {
+ numFilesAppended.incr();
+ }
+
+
+ public final void incrNumAddBlockOps() {
+ numAddBlockOps.incr();
+ }
+
+
+ public final void incrNumFilesRenamed() {
+ numFilesRenamed.incr();
+ }
+
+
+ public void incrFilesDeleted(int delta) {
+ numFilesDeleted.incr(delta);
+ }
+
+
+ public final void incrNumDeleteFileOps() {
+ numDeleteFileOps.incr();
+ }
+
+
+ public final void incrNumGetListingOps() {
+ numGetListingOps.incr();
+ }
+
+
+ public final void incrNumFilesInGetListingOps(int delta) {
+ numFilesInGetListingOps.incr(delta);
+ }
+
+
+ public final void incrNumFileInfoOps() {
+ numFileInfoOps.incr();
+ }
+
+
+ public final void addTransaction(long latency) {
+ transactions.add(latency);
+ }
+
+
+ public final void incrTransactionsBatchedInSync() {
+ transactionsBatchedInSync.incr();
+ }
+
+
+ public final void addSync(long elapsed) {
+ syncs.add(elapsed);
+ }
+
+
+ // The gauge is an int gauge: the long argument is narrowed with a cast,
+ // so values outside the int range do not survive intact.
+ public final void setFsImageLoadTime(long elapsed) {
+ fsImageLoadTime.set((int) elapsed);
+ }
+
+
+ public final void addBlockReport(long latency) {
+ blockReport.add(latency);
+ }
+
+
+ // Same narrowing cast as setFsImageLoadTime.
+ public final void setSafeModeTime(long elapsed) {
+ safeModeTime.set((int) elapsed);
+ }
+
+ /**
+ * Adds a record named after the registry to the builder and snapshots the
+ * registry into it.
+ *
+ * @param builder receives the new record
+ * @param all passed through to the registry snapshot; presumably selects
+ * all metrics rather than only changed ones (TODO confirm
+ * against MetricsRegistry)
+ */
+ public void getMetrics(MetricsBuilder builder, boolean all) {
+ registry.snapshot(builder.addRecord(registry.name()), all);
+ }
+
+ /**
+ * Converts a byte count to gibibytes (divides by 1024^3 in float
+ * arithmetic) and rounds to the nearest int.
+ */
+ private static int roundBytesToGBytes(long bytes) {
+ return Math.round(((float)bytes/(1024 * 1024 * 1024)));
+ }
+
+ /**
+ * Source that reports FSNamesystem state as gauges. It is a non-static
+ * inner class because it reads the enclosing instance's sessionId.
+ */
+ private class FSNamesystemMetrics implements MetricsSource {
+
+ /**
+ * Adds one record named FSNAMESYSTEM_RECORD_NAME in the "dfs" context,
+ * tagged with the session id, holding one gauge per FSNamesystem getter
+ * called below. Adds nothing (and logs at debug level) when
+ * FSNamesystem.getFSNamesystem() returns null. The {@code all} flag is
+ * not consulted.
+ */
+ public void getMetrics(MetricsBuilder builder, boolean all) {
+ // Since fsnamesystem metrics are poll based, we just put them here
+ // to avoid an extra copy per metric.
+ FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+ if (fsNamesystem == null) {
+ LOG.debug("FSNamesystem not ready yet!");
+ return;
+ }
+ builder.addRecord(FSNAMESYSTEM_RECORD_NAME).setContext("dfs")
+ .tag("sessionId", "", sessionId)
+ .addGauge("FilesTotal", "", fsNamesystem.getFilesTotal())
+ .addGauge("BlocksTotal", "", fsNamesystem.getBlocksTotal())
+ .addGauge("CapacityTotalGB", "",
+ roundBytesToGBytes(fsNamesystem.getCapacityTotal()))
+ .addGauge("CapacityUsedGB", "",
+ roundBytesToGBytes(fsNamesystem.getCapacityUsed()))
+ .addGauge("CapacityRemainingGB", "",
+ roundBytesToGBytes(fsNamesystem.getCapacityRemaining()))
+ .addGauge("TotalLoad", "", fsNamesystem.getTotalLoad())
+ .addGauge("CorruptBlocks", "", fsNamesystem.getCorruptReplicaBlocks())
+ .addGauge("ExcessBlocks", "", fsNamesystem.getExcessBlocks())
+ .addGauge("PendingDeletionBlocks", "",
+ fsNamesystem.getPendingDeletionBlocks())
+ .addGauge("PendingReplicationBlocks", "",
+ fsNamesystem.getPendingReplicationBlocks())
+ .addGauge("UnderReplicatedBlocks", "",
+ fsNamesystem.getUnderReplicatedBlocks())
+ .addGauge("ScheduledReplicationBlocks", "",
+ fsNamesystem.getScheduledReplicationBlocks())
+ .addGauge("MissingBlocks", "", fsNamesystem.getMissingBlocksCount())
+ .addGauge("BlockCapacity", "", fsNamesystem.getBlockCapacity());
+ }
+
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 04:33:55 2011
@@ -176,12 +176,14 @@
</description>
</property>
+<!--
<property>
<name>mapred.tasktracker.instrumentation</name>
- <value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value>
+ <value>com.example.hadoop.TaskTrackerInstrumentation</value>
<description>Expert: The instrumentation class to associate with each TaskTracker.
</description>
</property>
+-->
<property>
<name>mapred.tasktracker.memory_calculator_plugin</name>
@@ -387,12 +389,14 @@
</description>
</property>
+<!--
<property>
<name>mapred.jobtracker.instrumentation</name>
- <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
+ <value>com.example.hadoop.JobTrackerInstrumentation</value>
<description>Expert: The instrumentation class to associate with each JobTracker.
</description>
</property>
+-->
<property>
<name>mapred.child.java.opts</name>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar 4 04:33:55 2011
@@ -34,9 +34,9 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -68,6 +68,7 @@ class Child {
final int SLEEP_LONGER_COUNT = 5;
int jvmIdInt = Integer.parseInt(args[4]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
+ String prefix = firstTaskid.isMap() ? "MapTask" : "ReduceTask";
// file name is passed thru env
String jobTokenFile =
@@ -144,7 +145,7 @@ class Child {
Task task = null;
UserGroupInformation childUGI = null;
-
+
try {
while (true) {
taskid = null;
@@ -198,7 +199,8 @@ class Child {
task.setConf(job);
// Initiate Java VM metrics
- JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
+ initMetrics(prefix, jvmId.toString(), job.getSessionId());
+
LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
// Add tokens to new user so that it may execute its task correctly.
@@ -268,12 +270,22 @@ class Child {
}
} finally {
RPC.stopProxy(umbilical);
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- metricsContext.close();
+ shutdownMetrics();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
}
}
+
+ private static void initMetrics(String prefix, String procName,
+ String sessionId) {
+ DefaultMetricsSystem.initialize(prefix);
+ JvmMetricsSource.create(procName, sessionId);
+ }
+
+ private static void shutdownMetrics() {
+ DefaultMetricsSystem.INSTANCE.shutdown();
+ }
+
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.LinkedHashMap;
+import org.mortbay.util.ajax.JSON;
+
+class InfoMap extends LinkedHashMap<String, Object> {
+
+ private static final long serialVersionUID = 1L;
+
+ String toJson() {
+ return JSON.toString(this);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:33:55 2011
@@ -57,9 +57,6 @@ import org.apache.hadoop.mapreduce.serve
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -275,8 +272,6 @@ public class JobInProgress {
}
private Counters jobCounters = new Counters();
- private MetricsRecord jobMetrics;
-
// Maximum no. of fetch-failure notifications after which
// the map task is killed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -441,12 +436,6 @@ public class JobInProgress {
this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
- this.jobMetrics.setTag("user", conf.getUser());
- this.jobMetrics.setTag("sessionId", conf.getSessionId());
- this.jobMetrics.setTag("jobName", conf.getJobName());
- this.jobMetrics.setTag("jobId", jobId.toString());
hasSpeculativeMaps = conf.getMapSpeculativeExecution();
hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
// a limit on the input size of the reduce.
@@ -477,34 +466,12 @@ public class JobInProgress {
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
}
}
-
- /**
- * Called periodically by JobTrackerMetrics to update the metrics for
- * this job.
- */
- public void updateMetrics() {
- Counters counters = getCounters();
- for (Counters.Group group : counters) {
- jobMetrics.setTag("group", group.getDisplayName());
- for (Counters.Counter counter : group) {
- jobMetrics.setTag("counter", counter.getDisplayName());
- jobMetrics.setMetric("value", (float) counter.getCounter());
- jobMetrics.update();
- }
- }
- }
/**
* Called when the job is complete
*/
public void cleanUpMetrics() {
- // Deletes all metric data for this job (in internal table in metrics package).
- // This frees up RAM and possibly saves network bandwidth, since otherwise
- // the metrics package implementation might continue to send these job metrics
- // after the job has finished.
- jobMetrics.removeTag("group");
- jobMetrics.removeTag("counter");
- jobMetrics.remove();
+ // Per-job metrics are disabled for now.
}
private void printCache (Map<Node, List<TaskInProgress>> cache) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 04:33:55 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.InputStreamReader;
import java.io.Writer;
+import java.lang.management.ManagementFactory;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -109,7 +110,10 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.security.Credentials;
+import org.mortbay.util.ajax.JSON;
/*******************************************************
* JobTracker is the central location for submitting and
@@ -118,7 +122,8 @@ import org.apache.hadoop.security.Creden
*******************************************************/
public class JobTracker implements MRConstants, InterTrackerProtocol,
JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
- RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
+ RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
+ JobTrackerMXBean {
static{
Configuration.addDefaultResource("mapred-default.xml");
@@ -262,6 +267,9 @@ public class JobTracker implements MRCon
private int nextJobId = 1;
public static final Log LOG = LogFactory.getLog(JobTracker.class);
+
+ static final String CONF_VERSION_KEY = "mapreduce.jobtracker.conf.version";
+ static final String CONF_VERSION_DEFAULT = "default";
public Clock getClock() {
return clock;
@@ -285,6 +293,7 @@ public class JobTracker implements MRCon
public static JobTracker startTracker(JobConf conf, String identifier)
throws IOException, InterruptedException {
+ DefaultMetricsSystem.initialize("JobTracker");
JobTracker result = null;
while (true) {
try {
@@ -309,6 +318,7 @@ public class JobTracker implements MRCon
}
if (result != null) {
JobEndNotifier.startNotifier();
+ MBeans.register("JobTracker", "JobTrackerInfo", result);
}
return result;
}
@@ -1960,7 +1970,31 @@ public class JobTracker implements MRCon
}
}
- private final JobTrackerInstrumentation myInstrumentation;
+ private JobTrackerInstrumentation myInstrumentation;
+
+ private void createInstrumentation() {
+ // Initialize instrumentation
+ JobTrackerInstrumentation tmp;
+ Class<? extends JobTrackerInstrumentation> metricsInst =
+ getInstrumentationClass(conf);
+ LOG.debug("instrumentation class="+ metricsInst);
+ if (metricsInst == null) {
+ myInstrumentation = JobTrackerInstrumentation.create(this, conf);
+ return;
+ }
+ try {
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class<?>[]{JobTracker.class,
+ JobConf.class});
+ tmp = c.newInstance(this, conf);
+ } catch (Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize job tracker metrics", e);
+ tmp = JobTrackerInstrumentation.create(this, conf);
+ }
+ myInstrumentation = tmp;
+ }
/////////////////////////////////////////////////////////////////
// The real JobTracker
@@ -2295,21 +2329,7 @@ public class JobTracker implements MRCon
this.trackerIdentifier = identifier;
- // Initialize instrumentation
- JobTrackerInstrumentation tmp;
- Class<? extends JobTrackerInstrumentation> metricsInst =
- getInstrumentationClass(jobConf);
- try {
- java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
- metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
- tmp = c.newInstance(this, jobConf);
- } catch(Exception e) {
- //Reflection can throw lots of exceptions -- handle them all by
- //falling back on the default.
- LOG.error("failed to initialize job tracker metrics", e);
- tmp = new JobTrackerMetricsInst(this, jobConf);
- }
- myInstrumentation = tmp;
+ createInstrumentation();
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
@@ -2507,12 +2527,12 @@ public class JobTracker implements MRCon
return localFs;
}
- public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
- return conf.getClass("mapred.jobtracker.instrumentation",
- JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
+ static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+ return conf.getClass("mapred.jobtracker.instrumentation", null,
+ JobTrackerInstrumentation.class);
}
- public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
+ static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
conf.setClass("mapred.jobtracker.instrumentation",
t, JobTrackerInstrumentation.class);
}
@@ -5208,4 +5228,117 @@ public class JobTracker implements MRCon
return aclsManager;
}
+ // Begin MXBean implementation
+ @Override
+ public String getHostname() {
+ return StringUtils.simpleHostname(getJobTrackerMachine());
+ }
+
+ @Override
+ public String getVersion() {
+ return VersionInfo.getVersion() +", r"+ VersionInfo.getRevision();
+ }
+
+ @Override
+ public String getConfigVersion() {
+ return conf.get(CONF_VERSION_KEY, CONF_VERSION_DEFAULT);
+ }
+
+ @Override
+ public int getThreadCount() {
+ return ManagementFactory.getThreadMXBean().getThreadCount();
+ }
+
+ @Override
+ public String getSummaryJson() {
+ return getSummary().toJson();
+ }
+
+ InfoMap getSummary() {
+ final ClusterMetrics metrics = getClusterMetrics();
+ InfoMap map = new InfoMap();
+ map.put("nodes", metrics.getTaskTrackerCount()
+ + getBlacklistedTrackerCount());
+ map.put("alive", metrics.getTaskTrackerCount());
+ map.put("blacklisted", getBlacklistedTrackerCount());
+ map.put("graylisted", getGraylistedTrackerCount());
+ map.put("slots", new InfoMap() {{
+ put("map_slots", metrics.getMapSlotCapacity());
+ put("map_slots_used", metrics.getOccupiedMapSlots());
+ put("reduce_slots", metrics.getReduceSlotCapacity());
+ put("reduce_slots_used", metrics.getOccupiedReduceSlots());
+ }});
+ map.put("jobs", metrics.getTotalJobSubmissions());
+ return map;
+ }
+
+ @Override
+ public String getAliveNodesInfoJson() {
+ return JSON.toString(getAliveNodesInfo());
+ }
+
+ List<InfoMap> getAliveNodesInfo() {
+ List<InfoMap> info = new ArrayList<InfoMap>();
+ for (final TaskTrackerStatus tts : activeTaskTrackers()) {
+ final int mapSlots = tts.getMaxMapSlots();
+ final int redSlots = tts.getMaxReduceSlots();
+ info.add(new InfoMap() {{
+ put("hostname", tts.getHost());
+ put("last_seen", tts.getLastSeen());
+ put("health", tts.getHealthStatus().isNodeHealthy() ? "OK" : "");
+ put("slots", new InfoMap() {{
+ put("map_slots", mapSlots);
+ put("map_slots_used", mapSlots - tts.getAvailableMapSlots());
+ put("reduce_slots", redSlots);
+ put("reduce_slots_used", redSlots - tts.getAvailableReduceSlots());
+ }});
+ put("failures", tts.getFailures());
+ }});
+ }
+ return info;
+ }
+
+ @Override
+ public String getBlacklistedNodesInfoJson() {
+ return JSON.toString(getUnhealthyNodesInfo(blacklistedTaskTrackers()));
+ }
+
+ @Override
+ public String getGraylistedNodesInfoJson() {
+ return JSON.toString(getUnhealthyNodesInfo(graylistedTaskTrackers()));
+ }
+
+ List<InfoMap> getUnhealthyNodesInfo(Collection<TaskTrackerStatus> list) {
+ List<InfoMap> info = new ArrayList<InfoMap>();
+ for (final TaskTrackerStatus tts : list) {
+ info.add(new InfoMap() {{
+ put("hostname", tts.getHost());
+ put("last_seen", tts.getLastSeen());
+ put("reason", tts.getHealthStatus().getHealthReport());
+ }});
+ }
+ return info;
+ }
+
+ @Override
+ public String getQueueInfoJson() {
+ return getQueueInfo().toJson();
+ }
+
+ InfoMap getQueueInfo() {
+ InfoMap map = new InfoMap();
+ try {
+ for (final JobQueueInfo q : getQueues()) {
+ map.put(q.getQueueName(), new InfoMap() {{
+ put("state", q.getQueueState());
+ put("info", q.getSchedulingInfo());
+ }});
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Getting queue info", e);
+ }
+ return map;
+ }
+ // End MXBean implementation
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Mar 4 04:33:55 2011
@@ -17,7 +17,14 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
class JobTrackerInstrumentation {
+ private static final Log LOG =
+ LogFactory.getLog(JobTrackerInstrumentation.class);
protected final JobTracker tracker;
@@ -168,4 +175,15 @@ class JobTrackerInstrumentation {
public void heartbeat() {
}
+
+ static JobTrackerInstrumentation create(JobTracker jt, JobConf conf) {
+ return create(jt, conf, DefaultMetricsSystem.INSTANCE);
+ }
+
+ static JobTrackerInstrumentation create(JobTracker jt, JobConf conf,
+ MetricsSystem ms) {
+ return ms.register("JobTrackerMetrics", "JobTracker metrics",
+ new JobTrackerMetricsSource(jt, conf));
+ }
+
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java Fri Mar 4 04:33:55 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * The MXBean interface for JobTrackerInfo
+ */
+public interface JobTrackerMXBean {
+
+ /**
+ * @return hostname of the jobtracker
+ */
+ String getHostname();
+
+ /**
+ * @return version of the code base
+ */
+ String getVersion();
+
+ /**
+ * @return the config version (from a config property)
+ */
+ String getConfigVersion();
+
+ /**
+ * @return number of threads of the jobtracker jvm
+ */
+ int getThreadCount();
+
+ /**
+ * @return the summary info in json
+ */
+ String getSummaryJson();
+
+ /**
+ * @return the alive nodes info in json
+ */
+ String getAliveNodesInfoJson();
+
+ /**
+ * @return the blacklisted nodes info in json
+ */
+ String getBlacklistedNodesInfoJson();
+
+ /**
+ * @return the graylisted nodes info in json
+ */
+ String getGraylistedNodesInfoJson();
+
+ /**
+ * @return the queue info in json
+ */
+ String getQueueInfoJson();
+
+}