Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:34:00 UTC

svn commit: r1077597 [4/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ conf/ ivy/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/ipc/metrics/ src/core/org/apache/hadoop/log/ src/core/org/apache/hadoop/metrics/ src/core/org...

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/metrics2/util/TryIterator.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.util;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A base class for unmodifiable iterators (remove() always throws).
+ *
+ * This class also makes it easier to write filtering iterators, where the
+ * only way to discover the end of the data is to try to read it. The same
+ * applies to iterator wrappers around stream read calls.
+ *
+ * Implementations need only override the {@link #tryNext()} method and call
+ * {@link #done()} when the iteration is exhausted.
+ *
+ * @param <T> the type of elements returned by this iterator
+ */
+public abstract class TryIterator<T> implements Iterator<T> {
+
+  enum State {
+    PENDING,  // Ready to tryNext().
+    GOT_NEXT, // Got the next element from tryNext() and yet to return it.
+    DONE,     // Done/finished.
+    FAILED,   // An exception occurred in the last op.
+  }
+
+  private State state = State.PENDING;
+  private T next;
+
+  /**
+   * Return the next element. Implementations must call {@link #done()} when
+   * there are no more elements; otherwise an infinite loop could occur. If
+   * this method throws an exception, any further attempt to use the iterator
+   * will result in an {@link IllegalStateException}.
+   *
+   * @return the next element, or the result of {@link #done()} if the
+   *         iteration is exhausted
+   */
+  protected abstract T tryNext();
+
+  /**
+   * Implementations of {@link #tryNext} <b>must</b> call this method
+   * when there are no more elements left in the iteration.
+   *
+   * @return  null as a convenience to implement {@link #tryNext()}
+   */
+  protected final T done() {
+    state = State.DONE;
+    return null;
+  }
+
+  /**
+   * @return  true if there is a next element, false otherwise.
+   */
+  public final boolean hasNext() {
+    if (state == State.FAILED)
+      throw new IllegalStateException();
+
+    switch (state) {
+      case DONE:      return false;
+      case GOT_NEXT:  return true;
+      default:
+    }
+
+    // state is PENDING: try to fetch the next element
+    state = State.FAILED; // so a tryNext() exception leaves the iterator failed
+    next = tryNext();
+
+    if (state != State.DONE) {
+      state = State.GOT_NEXT;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @return  the next element
+   * @throws NoSuchElementException if the iteration is exhausted
+   */
+  public final T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    state = State.PENDING;
+    return next;
+  }
+
+  /**
+   * @return the current element without advancing the iterator
+   * @throws NoSuchElementException if the iteration is exhausted
+   */
+  public final T current() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return next;
+  }
+
+  /**
+   * Guaranteed to throw UnsupportedOperationException
+   */
+  public final void remove() {
+    throw new UnsupportedOperationException("Not allowed.");
+  }
+
+}
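
For illustration, a minimal subclass sketch of the tryNext()/done() contract
described in the javadoc above. The LineIterator name and the wrapped
BufferedReader are hypothetical, not part of this commit:

    import java.io.BufferedReader;
    import java.io.IOException;
    import org.apache.hadoop.metrics2.util.TryIterator;

    /**
     * Iterates over the lines of a reader; the end of data is discovered
     * only by attempting a read.
     */
    class LineIterator extends TryIterator<String> {
      private final BufferedReader reader;

      LineIterator(BufferedReader reader) { this.reader = reader; }

      @Override protected String tryNext() {
        try {
          // readLine() returns null at end of stream; done() marks the
          // iterator DONE and returns null as a convenience
          String line = reader.readLine();
          return line != null ? line : done();
        } catch (IOException e) {
          // throwing here leaves the iterator unusable (FAILED state)
          throw new RuntimeException(e);
        }
      }
    }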

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UgiInstrumentation.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+class UgiInstrumentation implements MetricsSource {
+
+  final MetricsRegistry registry = new MetricsRegistry("ugi");
+  final MetricMutableStat loginSuccess = registry.newStat("loginSuccess");
+  final MetricMutableStat loginFailure = registry.newStat("loginFailure");
+
+  @Override
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+  void addLoginSuccess(long elapsed) {
+    loginSuccess.add(elapsed);
+  }
+
+  void addLoginFailure(long elapsed) {
+    loginFailure.add(elapsed);
+  }
+
+  static UgiInstrumentation create(Configuration conf) {
+    return create(conf, DefaultMetricsSystem.INSTANCE);
+  }
+
+  static UgiInstrumentation create(Configuration conf, MetricsSystem ms) {
+    return ms.register("ugi", "User/group metrics", new UgiInstrumentation());
+  }
+
+}
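
The intended call pattern mirrors the UserGroupInformation changes below:
time the login and record success or failure. A condensed sketch with
exception handling abbreviated (login is the javax.security.auth.login
LoginContext used in the diff below):

    UgiInstrumentation metrics = UgiInstrumentation.create(conf);
    long start = System.currentTimeMillis();
    try {
      login.login();
      metrics.addLoginSuccess(System.currentTimeMillis() - start);
    } catch (LoginException le) {
      metrics.addLoginFailure(System.currentTimeMillis() - start);
      throw new IOException("Login failure", le);
    }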

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/UserGroupInformation.java Fri Mar  4 04:33:55 2011
@@ -50,13 +50,6 @@ import javax.security.auth.spi.LoginModu
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.util.MetricsBase;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -79,41 +72,6 @@ public class UserGroupInformation {
    */
   private static final float TICKET_RENEW_WINDOW = 0.80f;
   
-  /** 
-   * UgiMetrics maintains UGI activity statistics
-   * and publishes them through the metrics interfaces.
-   */
-  static class UgiMetrics implements Updater {
-    final MetricsTimeVaryingRate loginSuccess;
-    final MetricsTimeVaryingRate loginFailure;
-    private final MetricsRecord metricsRecord;
-    private final MetricsRegistry registry;
-
-    UgiMetrics() {
-      registry = new MetricsRegistry();
-      loginSuccess = new MetricsTimeVaryingRate("loginSuccess", registry,
-          "Rate of successful kerberos logins and time taken in milliseconds");
-      loginFailure = new MetricsTimeVaryingRate("loginFailure", registry,
-          "Rate of failed kerberos logins and time taken in milliseconds");
-      final MetricsContext metricsContext = MetricsUtil.getContext("ugi");
-      metricsRecord = MetricsUtil.createRecord(metricsContext, "ugi");
-      metricsContext.registerUpdater(this);
-    }
-
-    /**
-     * Push the metrics to the monitoring subsystem on doUpdate() call.
-     */
-    @Override
-    public void doUpdates(final MetricsContext context) {
-      synchronized (this) {
-        for (MetricsBase m : registry.getMetricsList()) {
-          m.pushMetric(metricsRecord);
-        }
-      }
-      metricsRecord.update();
-    }
-  }
-  
   /**
    * A login module that looks at the Kerberos, Unix, or Windows principal and
    * adds the corresponding UserName.
@@ -175,7 +133,7 @@ public class UserGroupInformation {
   }
 
   /** Metrics to track UGI activity */
-  static UgiMetrics metrics = new UgiMetrics();
+  static UgiInstrumentation metrics;
   /** Are the static variables that depend on configuration initialized? */
   private static boolean isInitialized = false;
   /** Should we use Kerberos configuration? */
@@ -235,6 +193,7 @@ public class UserGroupInformation {
     }
     isInitialized = true;
     UserGroupInformation.conf = conf;
+    metrics = UgiInstrumentation.create(conf);
   }
 
   /**
@@ -583,13 +542,13 @@ public class UserGroupInformation {
         new LoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
       start = System.currentTimeMillis();
       login.login();
-      metrics.loginSuccess.inc(System.currentTimeMillis() - start);
+      metrics.addLoginSuccess(System.currentTimeMillis() - start);
       loginUser = new UserGroupInformation(subject);
       loginUser.setLogin(login);
       loginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
     } catch (LoginException le) {
       if (start > 0) {
-        metrics.loginFailure.inc(System.currentTimeMillis() - start);
+        metrics.addLoginFailure(System.currentTimeMillis() - start);
       }
       throw new IOException("Login failure for " + user + " from keytab " + 
                             path, le);
@@ -667,7 +626,7 @@ public class UserGroupInformation {
        
       start = System.currentTimeMillis();
       login.login();
-      metrics.loginSuccess.inc(System.currentTimeMillis() - start);
+      metrics.addLoginSuccess(System.currentTimeMillis() - start);
       UserGroupInformation newLoginUser = new UserGroupInformation(subject);
       newLoginUser.setLogin(login);
       newLoginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
@@ -675,7 +634,7 @@ public class UserGroupInformation {
       return newLoginUser;
     } catch (LoginException le) {
       if (start > 0) {
-        metrics.loginFailure.inc(System.currentTimeMillis() - start);
+        metrics.addLoginFailure(System.currentTimeMillis() - start);
       }
       throw new IOException("Login failure for " + user + " from keytab " + 
                             path, le);
@@ -722,11 +681,11 @@ public class UserGroupInformation {
       LOG.info("Initiating re-login for " + keytabPrincipal);
       start = System.currentTimeMillis();
       login.login();
-      metrics.loginSuccess.inc(System.currentTimeMillis() - start);
+      metrics.addLoginSuccess(System.currentTimeMillis() - start);
       setLogin(login);
     } catch (LoginException le) {
       if (start > 0) {
-        metrics.loginFailure.inc(System.currentTimeMillis() - start);
+        metrics.addLoginFailure(System.currentTimeMillis() - start);
       }
       throw new IOException("Login failure for " + keytabPrincipal + 
           " from keytab " + keytabFile, le);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/StringUtils.java Fri Mar  4 04:33:55 2011
@@ -695,9 +695,32 @@ public class StringUtils {
    *
    * @param separator Separator to join with.
    * @param strings Strings to join.
+   * @return  the joined string
    */
   public static String join(CharSequence separator, Iterable<String> strings) {
-    StringBuffer sb = new StringBuffer();
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String s : strings) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(separator);
+      }
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Concatenates strings, using a separator.
+   *
+   * @param separator to join with
+   * @param strings to join
+   * @return  the joined string
+   */
+  public static String join(CharSequence separator, String[] strings) {
+    // Ideally this would share the Iterable overload's code, but arrays
+    // are not Iterable.
+    StringBuilder sb = new StringBuilder();
     boolean first = true;
     for (String s : strings) {
       if (first) {

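Both join overloads produce the same result for equivalent input; expected
behavior, following the loop shown above:

    StringUtils.join(",", java.util.Arrays.asList("a", "b", "c")); // "a,b,c"
    StringUtils.join(",", new String[] {"a", "b", "c"});           // "a,b,c"
    StringUtils.join(",", new String[0]);                          // ""
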
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Mar  4 04:33:55 2011
@@ -469,7 +469,7 @@ class BlockReceiver implements java.io.C
           } else {
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
-          datanode.myMetrics.bytesWritten.inc(len);
+          datanode.myMetrics.incrBytesWritten(len);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -553,7 +553,7 @@ class BlockReceiver implements java.io.C
         // Finalize the block. Does this fsync()?
         block.setNumBytes(offsetInBlock);
         datanode.data.finalizeBlock(block);
-        datanode.myMetrics.blocksWritten.inc();
+        datanode.myMetrics.incrBlocksWritten();
       }
 
     } catch (IOException ioe) {
@@ -810,7 +810,7 @@ class BlockReceiver implements java.io.C
                 final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
                 block.setNumBytes(receiver.offsetInBlock);
                 datanode.data.finalizeBlock(block);
-                datanode.myMetrics.blocksWritten.inc();
+                datanode.myMetrics.incrBlocksWritten();
                 datanode.notifyNamenodeReceivedBlock(block, 
                     DataNode.EMPTY_DEL_HINT);
                 if (ClientTraceLog.isInfoEnabled() &&
@@ -944,7 +944,7 @@ class BlockReceiver implements java.io.C
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(receiver.offsetInBlock);
               datanode.data.finalizeBlock(block);
-              datanode.myMetrics.blocksWritten.inc();
+              datanode.myMetrics.incrBlocksWritten();
               datanode.notifyNamenodeReceivedBlock(block, 
                   DataNode.EMPTY_DEL_HINT);
               if (ClientTraceLog.isInfoEnabled() &&

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Mar  4 04:33:55 2011
@@ -459,13 +459,13 @@ class DataBlockScanner implements Runnab
                  StringUtils.stringifyException(e));
         
         if (second) {
-          datanode.getMetrics().blockVerificationFailures.inc(); 
+          datanode.getMetrics().incrBlockVerificationFailures();
           handleScanFailure(block);
           return;
         } 
       } finally {
         IOUtils.closeStream(blockSender);
-        datanode.getMetrics().blocksVerified.inc();
+        datanode.getMetrics().incrBlocksVerified();
         totalScans++;
         totalVerifications++;
       }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar  4 04:33:55 2011
@@ -73,7 +73,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
-import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeInstrumentation;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.JspHelper;
@@ -95,6 +95,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -205,7 +206,7 @@ public class DataNode extends Configured
   long heartBeatInterval;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
-  DataNodeMetrics myMetrics;
+  DataNodeInstrumentation myMetrics;
   private static InetSocketAddress nameNodeAddr;
   private InetSocketAddress selfAddr;
   private static DataNode datanodeObject = null;
@@ -260,7 +261,6 @@ public class DataNode extends Configured
   DataNode(final Configuration conf,
            final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
     super(conf);
-    UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, 
         DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
 
@@ -440,7 +440,8 @@ public class DataNode extends Configured
     this.infoServer.start();
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
-    myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+    myMetrics = DataNodeInstrumentation.create(conf,
+                                               dnRegistration.getStorageID());
     
     // set service-level authorization security policy
     if (conf.getBoolean(
@@ -566,7 +567,7 @@ public class DataNode extends Configured
     return selfAddr;
   }
     
-  DataNodeMetrics getMetrics() {
+  DataNodeInstrumentation getMetrics() {
     return myMetrics;
   }
   
@@ -848,7 +849,7 @@ public class DataNode extends Configured
                                                        data.getRemaining(),
                                                        xmitsInProgress.get(),
                                                        getXceiverCount());
-          myMetrics.heartbeats.inc(now() - startTime);
+          myMetrics.addHeartBeat(now() - startTime);
           //LOG.info("Just sent heartbeat, with name " + localName);
           if (!processCommand(cmds))
             continue;
@@ -899,7 +900,7 @@ public class DataNode extends Configured
           DatanodeCommand cmd = namenode.blockReport(dnRegistration,
                   BlockListAsLongs.convertToArrayLongs(bReport));
           long brTime = now() - brStartTime;
-          myMetrics.blockReports.inc(brTime);
+          myMetrics.addBlockReport(brTime);
           LOG.info("BlockReport of " + bReport.length +
               " blocks got processed in " + brTime + " msecs");
           //
@@ -996,7 +997,7 @@ public class DataNode extends Configured
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
       transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
-      myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
+      myMetrics.incrBlocksReplicated(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
       //
@@ -1013,7 +1014,7 @@ public class DataNode extends Configured
         checkDiskError();
         throw e;
       }
-      myMetrics.blocksRemoved.inc(toDelete.length);
+      myMetrics.incrBlocksRemoved(toDelete.length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // shut down the data node
@@ -1404,6 +1405,7 @@ public class DataNode extends Configured
     String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
     dnThreadName = "DataNode: [" +
                         StringUtils.arrayToString(dataDirs) + "]";
+    DefaultMetricsSystem.initialize("DataNode");
     return makeInstance(dataDirs, conf, resources);
   }
 
@@ -1449,6 +1451,7 @@ public class DataNode extends Configured
    */
   public static DataNode makeInstance(String[] dataDirs, Configuration conf, 
       SecureResources resources) throws IOException {
+    UserGroupInformation.setConfiguration(conf);
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     ArrayList<File> dirs = new ArrayList<File>();
     FsPermission dataDirPermission = 
@@ -1612,7 +1615,7 @@ public class DataNode extends Configured
     data.updateBlock(oldblock, newblock);
     if (finalize) {
       data.finalizeBlock(newblock);
-      myMetrics.blocksWritten.inc(); 
+      myMetrics.incrBlocksWritten();
       notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
       LOG.info("Received block " + newblock +
                 " of size " + newblock.getNumBytes() +

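Taken together, the DataNode changes follow the metrics2 bootstrap order:
initialize the default metrics system once at startup, then create and
register the instrumentation source, then record through its typed methods.
A condensed sketch of the sequence as it appears in the hunks above:

    DefaultMetricsSystem.initialize("DataNode");   // in instantiateDataNode()
    DataNodeInstrumentation myMetrics =
        DataNodeInstrumentation.create(conf, dnRegistration.getStorageID());
    myMetrics.addHeartBeat(now() - startTime);     // was heartbeats.inc(...)
    myMetrics.incrBlocksWritten();                 // was blocksWritten.inc()
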
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Mar  4 04:33:55 2011
@@ -97,32 +97,32 @@ class DataXceiver implements Runnable, F
       switch ( op ) {
       case DataTransferProtocol.OP_READ_BLOCK:
         readBlock( in );
-        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
+        datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
         if (local)
-          datanode.myMetrics.readsFromLocalClient.inc();
+          datanode.myMetrics.incrReadsFromLocalClient();
         else
-          datanode.myMetrics.readsFromRemoteClient.inc();
+          datanode.myMetrics.incrReadsFromRemoteClient();
         break;
       case DataTransferProtocol.OP_WRITE_BLOCK:
         writeBlock( in );
-        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
+        datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
         if (local)
-          datanode.myMetrics.writesFromLocalClient.inc();
+          datanode.myMetrics.incrWritesFromLocalClient();
         else
-          datanode.myMetrics.writesFromRemoteClient.inc();
+          datanode.myMetrics.incrWritesFromRemoteClient();
         break;
       case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
         replaceBlock(in);
-        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
+        datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);
         break;
       case DataTransferProtocol.OP_COPY_BLOCK:
             // for balancing purpose; send to a proxy source
         copyBlock(in);
-        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
+        datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
         break;
       case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
         getBlockChecksum(in);
-        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
+        datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);
         break;
       default:
         throw new IOException("Unknown opcode " + op + " in data stream");
@@ -207,11 +207,11 @@ class DataXceiver implements Runnable, F
         } catch (IOException ignored) {}
       }
       
-      datanode.myMetrics.bytesRead.inc((int) read);
-      datanode.myMetrics.blocksRead.inc();
+      datanode.myMetrics.incrBytesRead((int) read);
+      datanode.myMetrics.incrBlocksRead();
     } catch ( SocketException ignored ) {
       // Its ok for remote side to close the connection anytime.
-      datanode.myMetrics.blocksRead.inc();
+      datanode.myMetrics.incrBlocksRead();
     } catch ( IOException ioe ) {
       /* What exactly should we do here?
        * Earlier version shutdown() datanode if there is disk error.
@@ -540,8 +540,8 @@ class DataXceiver implements Runnable, F
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
 
-      datanode.myMetrics.bytesRead.inc((int) read);
-      datanode.myMetrics.blocksRead.inc();
+      datanode.myMetrics.incrBytesRead((int) read);
+      datanode.myMetrics.incrBlocksRead();
       
       LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
     } catch (IOException ioe) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Mar  4 04:33:55 2011
@@ -43,9 +43,9 @@ import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1553,7 +1553,7 @@ public class FSDataset implements FSCons
     }
     try {
       bean = new StandardMBean(this,FSDatasetMBean.class);
-      mbeanName = MBeanUtil.registerMBean("DataNode", "FSDatasetState-" + storageName, bean);
+      mbeanName = MBeans.register("DataNode", "FSDatasetState-" + storageName, bean);
     } catch (NotCompliantMBeanException e) {
       e.printStackTrace();
     }
@@ -1563,7 +1563,7 @@ public class FSDataset implements FSCons
 
   public void shutdown() {
     if (mbeanName != null)
-      MBeanUtil.unregisterMBean(mbeanName);
+      MBeans.unregister(mbeanName);
     
     if(volumes != null) {
       for (FSVolume volume : volumes.volumes) {

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeInstrumentation.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+
+public class DataNodeInstrumentation implements MetricsSource {
+
+  final MetricsRegistry registry = new MetricsRegistry("datanode");
+
+  final MetricMutableCounterLong bytesWritten =
+      registry.newCounter("bytes_written", "", 0L);
+  final MetricMutableCounterLong bytesRead =
+      registry.newCounter("bytes_read", "", 0L);
+  final MetricMutableCounterInt blocksWritten =
+      registry.newCounter("blocks_written", "", 0);
+  final MetricMutableCounterInt blocksRead =
+      registry.newCounter("blocks_read", "", 0);
+  final MetricMutableCounterInt blocksReplicated =
+      registry.newCounter("blocks_replicated", "", 0);
+  final MetricMutableCounterInt blocksRemoved =
+      registry.newCounter("blocks_removed", "", 0);
+  final MetricMutableCounterInt blocksVerified =
+      registry.newCounter("blocks_verified", "", 0);
+  final MetricMutableCounterInt blockVerificationFailures =
+      registry.newCounter("block_verification_failures", "", 0);
+
+  final MetricMutableCounterInt readsFromLocalClient =
+      registry.newCounter("reads_from_local_client", "", 0);
+  final MetricMutableCounterInt readsFromRemoteClient =
+      registry.newCounter("reads_from_remote_client", "", 0);
+  final MetricMutableCounterInt writesFromLocalClient =
+      registry.newCounter("writes_from_local_client", "", 0);
+  final MetricMutableCounterInt writesFromRemoteClient =
+      registry.newCounter("writes_from_remote_client", "", 0);
+
+  final MetricMutableStat readBlockOp = registry.newStat("readBlockOp");
+  final MetricMutableStat writeBlockOp = registry.newStat("writeBlockOp");
+  final MetricMutableStat blockChecksumOp = registry.newStat("blockChecksumOp");
+  final MetricMutableStat copyBlockOp = registry.newStat("copyBlockOp");
+  final MetricMutableStat replaceBlockOp = registry.newStat("replaceBlockOp");
+  final MetricMutableStat heartbeats = registry.newStat("heartBeats");
+  final MetricMutableStat blockReports = registry.newStat("blockReports");
+
+  public DataNodeInstrumentation(Configuration conf, String storageId) {
+    String sessionId = conf.get("session.id");
+    JvmMetricsSource.create("DataNode", sessionId);
+    registry.setContext("dfs").tag("sessionId", "", sessionId);
+  }
+
+  public void shutdown() {
+    // metrics system shutdown would suffice
+  }
+
+  public void resetAllMinMax() {
+    readBlockOp.resetMinMax();
+    writeBlockOp.resetMinMax();
+    blockChecksumOp.resetMinMax();
+    copyBlockOp.resetMinMax();
+    replaceBlockOp.resetMinMax();
+    heartbeats.resetMinMax();
+    blockReports.resetMinMax();
+  }
+
+  public void addHeartBeat(long latency) {
+    heartbeats.add(latency);
+  }
+
+  public void addBlockReport(long latency) {
+    blockReports.add(latency);
+  }
+
+  public void incrBlocksReplicated(int delta) {
+    blocksReplicated.incr(delta);
+  }
+
+  public void incrBlocksWritten() {
+    blocksWritten.incr();
+  }
+
+  public void incrBlocksRemoved(int delta) {
+    blocksRemoved.incr(delta);
+  }
+
+  public void incrBytesWritten(int delta) {
+    bytesWritten.incr(delta);
+  }
+
+  public void incrBlockVerificationFailures() {
+    blockVerificationFailures.incr();
+  }
+
+  public void incrBlocksVerified() {
+    blocksVerified.incr();
+  }
+
+  public void addReadBlockOp(long latency) {
+    readBlockOp.add(latency);
+  }
+
+  public void incrReadsFromLocalClient() {
+    readsFromLocalClient.incr();
+  }
+
+  public void incrReadsFromRemoteClient() {
+    readsFromRemoteClient.incr();
+  }
+
+  public void addWriteBlockOp(long latency) {
+    writeBlockOp.add(latency);
+  }
+
+  public void incrWritesFromLocalClient() {
+    writesFromLocalClient.incr();
+  }
+
+  public void incrWritesFromRemoteClient() {
+    writesFromRemoteClient.incr();
+  }
+
+  public void addReplaceBlockOp(long latency) {
+    replaceBlockOp.add(latency);
+  }
+
+  public void addCopyBlockOp(long latency) {
+    copyBlockOp.add(latency);
+  }
+
+  public void addBlockChecksumOp(long latency) {
+    blockChecksumOp.add(latency);
+  }
+
+  public void incrBytesRead(int delta) {
+    bytesRead.incr(delta);
+  }
+
+  public void incrBlocksRead() {
+    blocksRead.incr();
+  }
+
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+  public static DataNodeInstrumentation create(Configuration conf,
+                                               String storageID) {
+    return create(conf, storageID, DefaultMetricsSystem.INSTANCE);
+  }
+
+  public static DataNodeInstrumentation create(Configuration conf,
+                                               String storageID,
+                                               MetricsSystem ms) {
+    return ms.register("DataNode", "DataNode metrics",
+                       new DataNodeInstrumentation(conf, storageID));
+  }
+
+}
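
The three-argument create overload makes it possible to register against an
explicit MetricsSystem rather than DefaultMetricsSystem.INSTANCE, e.g. from a
test. A sketch; the storage ID and recorded values are placeholders:

    MetricsSystem ms = DefaultMetricsSystem.INSTANCE; // or a test instance
    DataNodeInstrumentation dnMetrics =
        DataNodeInstrumentation.create(conf, "test-storage-id", ms);
    dnMetrics.incrBytesRead(1024);
    dnMetrics.addReadBlockOp(5);                      // latency in ms
    // the metrics system invokes getMetrics(builder, all) on snapshot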

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Mar  4 04:33:55 2011
@@ -115,7 +115,7 @@ class FSDirectory implements FSConstants
 
   private void incrDeletedFileCount(int count) {
     if (namesystem != null)
-      NameNode.getNameNodeMetrics().numFilesDeleted.inc(count);
+      NameNode.getNameNodeMetrics().incrFilesDeleted(count);
   }
     
   /**
@@ -977,7 +977,7 @@ class FSDirectory implements FSConstants
         // Directory creation also counts towards FilesCreated
         // to match the count of the FilesDeleted metric.
         if (namesystem != null)
-          NameNode.getNameNodeMetrics().numFilesCreated.inc();
+          NameNode.getNameNodeMetrics().incrNumFilesCreated();
         fsImage.getEditLog().logMkDir(cur, inodes[i]);
         NameNode.stateChangeLog.debug(
             "DIR* FSDirectory.mkdirs: created directory " + cur);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Mar  4 04:33:55 2011
@@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeInstrumentation;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -96,7 +96,7 @@ public class FSEditLog {
   private long numTransactions;        // number of transactions
   private long numTransactionsBatchedInSync;
   private long totalTimeTransactions;  // total time for all transactions
-  private NameNodeMetrics metrics;
+  private NameNodeInstrumentation metrics;
 
   private static class TransactionId {
     public long txid;
@@ -915,7 +915,7 @@ public class FSEditLog {
     numTransactions++;
     totalTimeTransactions += (end-start);
     if (metrics != null) // Metrics is non-null only when used inside name node
-      metrics.transactions.inc((end-start));
+      metrics.addTransaction(end-start);
   }
 
   //
@@ -948,7 +948,7 @@ public class FSEditLog {
       if (mytxid <= synctxid) {
         numTransactionsBatchedInSync++;
         if (metrics != null) // Metrics is non-null only when used inside name node
-          metrics.transactionsBatchedInSync.inc();
+          metrics.incrTransactionsBatchedInSync();
         return;
       }
    
@@ -990,7 +990,7 @@ public class FSEditLog {
     }
 
     if (metrics != null) // Metrics is non-null only when used inside name node
-      metrics.syncs.inc(elapsed);
+      metrics.addSync(elapsed);
   }
 
   //

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 04:33:55 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -42,7 +41,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.util.*;
-import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
@@ -143,7 +142,6 @@ public class FSNamesystem implements FSC
   private String supergroup;
   private PermissionStatus defaultPermission;
   // FSNamesystemMetrics counter variables
-  private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
   boolean isAccessTokenEnabled;
@@ -345,8 +343,7 @@ public class FSNamesystem implements FSC
                          getNamespaceEditsDirs(conf), startOpt);
     long timeTakenToLoadFSImage = now() - systemStart;
     LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
-    NameNode.getNameNodeMetrics().fsImageLoadTime.set(
-                              (int) timeTakenToLoadFSImage);
+    NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);
     this.safeMode = new SafeModeInfo(conf);
     setBlockTotal();
     pendingReplications = new PendingReplicationBlocks(
@@ -3080,7 +3077,7 @@ public class FSNamesystem implements FSC
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
-    NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+    NameNode.getNameNodeMetrics().addBlockReport(now() - startTime);
   }
 
   /**
@@ -4283,7 +4280,7 @@ public class FSNamesystem implements FSC
       long timeInSafemode = now() - systemStart;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
                                     + timeInSafemode/1000 + " secs.");
-      NameNode.getNameNodeMetrics().safeModeTime.set((int) timeInSafemode);
+      NameNode.getNameNodeMetrics().setSafeModeTime(timeInSafemode);
       
       if (reached >= 0) {
         NameNode.stateChangeLog.info("STATE* Safe mode is OFF."); 
@@ -4777,21 +4774,13 @@ public class FSNamesystem implements FSC
     // package naming for mbeans and their impl.
     StandardMBean bean;
     try {
-      myFSMetrics = new FSNamesystemMetrics(conf);
       bean = new StandardMBean(this,FSNamesystemMBean.class);
-      mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemState", bean);
+      mbeanName = MBeans.register("NameNode", "FSNamesystemState", bean);
     } catch (NotCompliantMBeanException e) {
       e.printStackTrace();
     }
 
-    LOG.info("Registered FSNamesystemStatusMBean");
-  }
-
-  /**
-   * get FSNamesystemMetrics
-   */
-  public FSNamesystemMetrics getFSNamesystemMetrics() {
-    return myFSMetrics;
+    LOG.info("Registered FSNamesystemStateMBean");
   }
 
   /**
@@ -4799,7 +4788,7 @@ public class FSNamesystem implements FSC
    */
   public void shutdown() {
     if (mbeanName != null)
-      MBeanUtil.unregisterMBean(mbeanName);
+      MBeans.unregister(mbeanName);
   }
   
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar  4 04:33:55 2011
@@ -32,7 +32,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.CompleteFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeInstrumentation;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -66,6 +66,7 @@ import java.net.*;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Iterator;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 
 /**********************************************************
  * NameNode serves as both directory namespace manager and
@@ -161,13 +162,13 @@ public class NameNode implements ClientP
     format(conf, false);
   }
 
-  static NameNodeMetrics myMetrics;
+  static NameNodeInstrumentation myMetrics;
 
   public FSNamesystem getNamesystem() {
     return namesystem;
   }
 
-  public static NameNodeMetrics getNameNodeMetrics() {
+  public static NameNodeInstrumentation getNameNodeMetrics() {
     return myMetrics;
   }
   
@@ -249,7 +250,7 @@ public class NameNode implements ClientP
       ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
     }
     
-    myMetrics = new NameNodeMetrics(conf, this);
+    myMetrics = NameNodeInstrumentation.create(conf);
     this.namesystem = new FSNamesystem(this, conf);
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -517,7 +518,7 @@ public class NameNode implements ClientP
   public LocatedBlocks   getBlockLocations(String src, 
                                           long offset, 
                                           long length) throws IOException {
-    myMetrics.numGetBlockLocations.inc();
+    myMetrics.incrNumGetBlockLocations();
     return namesystem.getBlockLocations(getClientMachine(), 
                                         src, offset, length);
   }
@@ -531,7 +532,7 @@ public class NameNode implements ClientP
                                          long offset, 
                                          long length)
       throws IOException {
-    myMetrics.numGetBlockLocations.inc();
+    myMetrics.incrNumGetBlockLocations();
     return namesystem.getBlockLocations(src, offset, length, false);
   }
   
@@ -564,8 +565,8 @@ public class NameNode implements ClientP
         new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(),
             null, masked),
         clientName, clientMachine, overwrite, replication, blockSize);
-    myMetrics.numFilesCreated.inc();
-    myMetrics.numCreateFileOps.inc();
+    myMetrics.incrNumFilesCreated();
+    myMetrics.incrNumCreateFileOps();
   }
 
   /** {@inheritDoc} */
@@ -576,7 +577,7 @@ public class NameNode implements ClientP
           +src+" for "+clientName+" at "+clientMachine);
     }
     LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
-    myMetrics.numFilesAppended.inc();
+    myMetrics.incrNumFilesAppended();
     return info;
   }
 
@@ -607,7 +608,7 @@ public class NameNode implements ClientP
                          +src+" for "+clientName);
     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
     if (locatedBlock != null)
-      myMetrics.numAddBlockOps.inc();
+      myMetrics.incrNumAddBlockOps();
     return locatedBlock;
   }
 
@@ -682,7 +683,7 @@ public class NameNode implements ClientP
     }
     boolean ret = namesystem.renameTo(src, dst);
     if (ret) {
-      myMetrics.numFilesRenamed.inc();
+      myMetrics.incrNumFilesRenamed();
     }
     return ret;
   }
@@ -702,7 +703,7 @@ public class NameNode implements ClientP
     }
     boolean ret = namesystem.delete(src, recursive);
     if (ret) 
-      myMetrics.numDeleteFileOps.inc();
+      myMetrics.incrNumDeleteFileOps();
     return ret;
   }
 
@@ -740,9 +741,9 @@ public class NameNode implements ClientP
   public DirectoryListing getListing(String src, byte[] startAfter)
   throws IOException {
     DirectoryListing files = namesystem.getListing(src, startAfter);
-    myMetrics.numGetListingOps.inc();
+    myMetrics.incrNumGetListingOps();
     if (files != null) {
-      myMetrics.numFilesInGetListingOps.inc(files.getPartialListing().length);
+      myMetrics.incrNumFilesInGetListingOps(files.getPartialListing().length);
     }
     return files;
   }
@@ -755,7 +756,7 @@ public class NameNode implements ClientP
    *         or null if file not found
    */
   public HdfsFileStatus getFileInfo(String src)  throws IOException {
-    myMetrics.numFileInfoOps.inc();
+    myMetrics.incrNumFileInfoOps();
     return namesystem.getFileInfo(src);
   }
 
@@ -1161,7 +1162,7 @@ public class NameNode implements ClientP
         System.exit(aborted ? 1 : 0);
       default:
     }
-
+    DefaultMetricsSystem.initialize("NameNode");
     NameNode namenode = new NameNode(conf);
     return namenode;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Mar  4 04:33:55 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
 import org.apache.hadoop.security.SecurityUtil;
@@ -154,7 +154,7 @@ public class SecondaryNameNode implement
           infoBindAddress);
     }
     // initiate Java VM metrics
-    JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
+    JvmMetricsSource.create("SecondaryNameNode", conf.get("session.id"));
     
     // Create connection to the namenode.
     shouldRun = true;

Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeInstrumentation.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.metrics;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+
+public class NameNodeInstrumentation implements MetricsSource {
+  static final Log LOG = LogFactory.getLog(NameNodeInstrumentation.class);
+
+  static final String FSNAMESYSTEM_RECORD_NAME = "FSNamesystem";
+  final String sessionId;
+  final MetricsRegistry registry = new MetricsRegistry("namenode");
+  final MetricMutableCounterInt numFilesCreated =
+      registry.newCounter("FilesCreated", "", 0);
+  final MetricMutableCounterInt numFilesAppended =
+      registry.newCounter("FilesAppended", "", 0);
+  final MetricMutableCounterInt numGetBlockLocations =
+      registry.newCounter("GetBlockLocations", "", 0);
+  final MetricMutableCounterInt numFilesRenamed =
+      registry.newCounter("FilesRenamed", "", 0);
+  final MetricMutableCounterInt numGetListingOps =
+      registry.newCounter("GetListingOps", "", 0);
+  final MetricMutableCounterInt numCreateFileOps =
+      registry.newCounter("CreateFileOps", "", 0);
+  final MetricMutableCounterInt numFilesDeleted =
+      registry.newCounter("FilesDeleted", "Files deleted (inc. rename)", 0);
+  final MetricMutableCounterInt numDeleteFileOps =
+      registry.newCounter("DeleteFileOps", "", 0);
+  final MetricMutableCounterInt numFileInfoOps =
+      registry.newCounter("FileInfoOps", "", 0);
+  final MetricMutableCounterInt numAddBlockOps =
+      registry.newCounter("AddBlockOps", "", 0);
+  final MetricMutableStat transactions = registry.newStat("Transactions");
+  final MetricMutableStat syncs = registry.newStat("Syncs");
+  final MetricMutableCounterInt transactionsBatchedInSync =
+      registry.newCounter("JournalTransactionsBatchedInSync", "", 0);
+  final MetricMutableStat blockReport = registry.newStat("blockReport");
+  final MetricMutableGaugeInt safeModeTime =
+      registry.newGauge("SafemodeTime", "Time spent in safe mode", 0);
+  final MetricMutableGaugeInt fsImageLoadTime =
+      registry.newGauge("fsImageLoadTime", "", 0);
+  final MetricMutableCounterInt numFilesInGetListingOps =
+      registry.newCounter("FilesInGetListingOps", "", 0);
+
+  final MetricsSource fsNamesystemMetrics;
+
+  NameNodeInstrumentation(Configuration conf) {
+    sessionId = conf.get("session.id");
+    fsNamesystemMetrics = new FSNamesystemMetrics();
+    JvmMetricsSource.create("NameNode", sessionId);
+    registry.setContext("dfs").tag("sessionId", "", sessionId);
+  }
+
+  public static NameNodeInstrumentation create(Configuration conf) {
+    return create(conf, DefaultMetricsSystem.INSTANCE);
+  }
+
+  /**
+   * Create a v2 metrics instrumentation
+   * @param conf  the configuration object
+   * @param ms  the metrics system instance
+   * @return a metrics instrumentation object registered with the metrics system
+   */
+  public static NameNodeInstrumentation create(Configuration conf,
+                                               MetricsSystem ms) {
+    NameNodeInstrumentation v2 = new NameNodeInstrumentation(conf);
+    ms.register("FSNamesystemState", "FS name system state",
+                v2.fsNamesystemMetrics());
+    return ms.register("NameNode", "NameNode metrics", v2);
+  }
+
+  public MetricsSource fsNamesystemMetrics() {
+    return fsNamesystemMetrics;
+  }
+
+  public void shutdown() {
+    // metrics system shutdown would suffice
+  }
+
+
+  public final void incrNumGetBlockLocations() {
+    numGetBlockLocations.incr();
+  }
+
+  
+  public final void incrNumFilesCreated() {
+    numFilesCreated.incr();
+  }
+
+  
+  public final void incrNumCreateFileOps() {
+    numCreateFileOps.incr();
+  }
+
+  
+  public final void incrNumFilesAppended() {
+    numFilesAppended.incr();
+  }
+
+  
+  public final void incrNumAddBlockOps() {
+    numAddBlockOps.incr();
+  }
+
+  
+  public final void incrNumFilesRenamed() {
+    numFilesRenamed.incr();
+  }
+
+  
+  public void incrFilesDeleted(int delta) {
+    numFilesDeleted.incr(delta);
+  }
+
+  
+  public final void incrNumDeleteFileOps() {
+    numDeleteFileOps.incr();
+  }
+
+  
+  public final void incrNumGetListingOps() {
+    numGetListingOps.incr();
+  }
+
+  
+  public final void incrNumFilesInGetListingOps(int delta) {
+    numFilesInGetListingOps.incr(delta);
+  }
+
+  
+  public final void incrNumFileInfoOps() {
+    numFileInfoOps.incr();
+  }
+
+  
+  public final void addTransaction(long latency) {
+    transactions.add(latency);
+  }
+
+  
+  public final void incrTransactionsBatchedInSync() {
+    transactionsBatchedInSync.incr();
+  }
+
+  
+  public final void addSync(long elapsed) {
+    syncs.add(elapsed);
+  }
+
+  
+  public final void setFsImageLoadTime(long elapsed) {
+    fsImageLoadTime.set((int) elapsed);
+  }
+
+  
+  public final void addBlockReport(long latency) {
+    blockReport.add(latency);
+  }
+
+  
+  public final void setSafeModeTime(long elapsed) {
+    safeModeTime.set((int) elapsed);
+  }
+
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+  private static int roundBytesToGBytes(long bytes) {
+    return Math.round(((float)bytes/(1024 * 1024 * 1024)));
+  }
+
+  private class FSNamesystemMetrics implements MetricsSource {
+
+    public void getMetrics(MetricsBuilder builder, boolean all) {
+      // Since FSNamesystem metrics are poll-based, we build the record here
+      // on demand to avoid an extra copy per metric.
+      FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+      if (fsNamesystem == null) {
+        LOG.debug("FSNamesystem not ready yet!");
+        return;
+      }
+      builder.addRecord(FSNAMESYSTEM_RECORD_NAME).setContext("dfs")
+        .tag("sessionId", "", sessionId)
+        .addGauge("FilesTotal", "", fsNamesystem.getFilesTotal())
+        .addGauge("BlocksTotal", "", fsNamesystem.getBlocksTotal())
+        .addGauge("CapacityTotalGB", "",
+                  roundBytesToGBytes(fsNamesystem.getCapacityTotal()))
+        .addGauge("CapacityUsedGB", "",
+                  roundBytesToGBytes(fsNamesystem.getCapacityUsed()))
+        .addGauge("CapacityRemainingGB", "",
+                  roundBytesToGBytes(fsNamesystem.getCapacityRemaining()))
+        .addGauge("TotalLoad", "", fsNamesystem.getTotalLoad())
+        .addGauge("CorruptBlocks", "", fsNamesystem.getCorruptReplicaBlocks())
+        .addGauge("ExcessBlocks", "", fsNamesystem.getExcessBlocks())
+        .addGauge("PendingDeletionBlocks", "",
+                  fsNamesystem.getPendingDeletionBlocks())
+        .addGauge("PendingReplicationBlocks", "",
+                  fsNamesystem.getPendingReplicationBlocks())
+        .addGauge("UnderReplicatedBlocks", "",
+                  fsNamesystem.getUnderReplicatedBlocks())
+        .addGauge("ScheduledReplicationBlocks", "",
+                  fsNamesystem.getScheduledReplicationBlocks())
+        .addGauge("MissingBlocks", "", fsNamesystem.getMissingBlocksCount())
+        .addGauge("BlockCapacity", "", fsNamesystem.getBlockCapacity());
+    }
+
+  }
+
+}

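The file above shows the standard metrics2 source pattern: a MetricsRegistry
of mutable counters, gauges and stats that the instrumented code updates, plus
a getMetrics() that snapshots the registry into a record. A minimal sketch of
the same pattern, using only the API visible in this patch (the class, record
and metric names here are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.metrics2.MetricsBuilder;
    import org.apache.hadoop.metrics2.MetricsSource;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
    import org.apache.hadoop.metrics2.lib.MetricsRegistry;

    public class ExampleSource implements MetricsSource {
      final MetricsRegistry registry = new MetricsRegistry("example");
      final MetricMutableCounterInt requests =
          registry.newCounter("Requests", "number of requests", 0);

      ExampleSource(Configuration conf) {
        registry.setContext("example").tag("sessionId", "", conf.get("session.id"));
      }

      public void incrRequests() { requests.incr(); }

      // Snapshot every mutable metric in the registry into one record,
      // exactly as NameNodeInstrumentation.getMetrics() does above.
      public void getMetrics(MetricsBuilder builder, boolean all) {
        registry.snapshot(builder.addRecord(registry.name()), all);
      }

      public static ExampleSource create(Configuration conf) {
        return DefaultMetricsSystem.INSTANCE.register(
            "Example", "An example source", new ExampleSource(conf));
      }
    }

The FSNamesystemMetrics inner class illustrates the other style: a source that
builds its record at poll time instead of keeping mutable metrics, which, as
the comment in it notes, avoids an extra copy for values maintained elsewhere.
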
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 04:33:55 2011
@@ -176,12 +176,14 @@
   </description>
 </property>
 
+<!--
 <property>
   <name>mapred.tasktracker.instrumentation</name>
   <value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value>
   <description>Expert: The instrumentation class to associate with each TaskTracker.
   </description>
 </property>
+-->
 
 <property>
   <name>mapred.tasktracker.memory_calculator_plugin</name>
@@ -387,12 +389,14 @@
   </description>
 </property>
 
+<!--
 <property>
   <name>mapred.jobtracker.instrumentation</name>
   <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
   <description>Expert: The instrumentation class to associate with each JobTracker.
   </description>
 </property>
+-->
 
 <property>
   <name>mapred.child.java.opts</name>

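The defaults are commented out rather than deleted because the code now treats
an unset property as "use the metrics2 implementation": getInstrumentationClass()
in JobTracker.java (below) returns null unless a site overrides it. A site that
still wants its own instrumentation can set the property to a custom subclass.
A hedged sketch of what such a subclass looks like -- MyJobTrackerInst is
hypothetical, and it assumes the base class keeps its (JobTracker, JobConf)
constructor, which is what createInstrumentation() looks up reflectively:

    package org.apache.hadoop.mapred; // the instrumentation types are package-private

    import org.apache.hadoop.conf.Configuration;

    class MyJobTrackerInst extends JobTrackerInstrumentation {
      MyJobTrackerInst(JobTracker jt, JobConf conf) {
        super(jt, conf);
      }

      @Override
      public void heartbeat() {
        // custom per-heartbeat handling goes here
      }

      // Same effect as re-enabling the commented-out property above.
      static void enable(Configuration conf) {
        conf.setClass("mapred.jobtracker.instrumentation",
                      MyJobTrackerInst.class, JobTrackerInstrumentation.class);
      }
    }
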
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 04:33:55 2011
@@ -34,9 +34,9 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -68,6 +68,7 @@ class Child {
     final int SLEEP_LONGER_COUNT = 5;
     int jvmIdInt = Integer.parseInt(args[4]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
+    String prefix = firstTaskid.isMap() ? "MapTask" : "ReduceTask";
 
     // file name is passed thru env
     String jobTokenFile = 
@@ -144,7 +145,7 @@ class Child {
     Task task = null;
     
     UserGroupInformation childUGI = null;
-    
+
     try {
       while (true) {
         taskid = null;
@@ -198,7 +199,8 @@ class Child {
         task.setConf(job);
 
         // Initiate Java VM metrics
-        JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
+        initMetrics(prefix, jvmId.toString(), job.getSessionId());
+
         LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
         childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
         // Add tokens to new user so that it may execute its task correctly.
@@ -268,12 +270,22 @@ class Child {
       }
     } finally {
       RPC.stopProxy(umbilical);
-      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-      metricsContext.close();
+      shutdownMetrics();
       // Shutting down log4j of the child-vm... 
       // This assumes that on return from Task.run() 
       // there is no more logging done.
       LogManager.shutdown();
     }
   }
+
+  private static void initMetrics(String prefix, String procName,
+                                  String sessionId) {
+    DefaultMetricsSystem.initialize(prefix);  
+    JvmMetricsSource.create(procName, sessionId);
+  }
+
+  private static void shutdownMetrics() {
+    DefaultMetricsSystem.INSTANCE.shutdown();
+  }
+
 }

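The child JVM now follows the plain metrics2 lifecycle: initialize a prefixed
metrics system at startup, add the JVM source, and shut the whole system down
on exit instead of closing the old "mapred" context. A standalone sketch of
that lifecycle (the prefix, process name and session id are illustrative):

    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.source.JvmMetricsSource;

    public class MetricsLifecycle {
      public static void main(String[] args) {
        // The prefix selects the configuration section for this process
        // in hadoop-metrics2.properties.
        DefaultMetricsSystem.initialize("MapTask");
        JvmMetricsSource.create("jvm_example", "session_example");
        try {
          // ... do the actual work; registered sources are polled ...
        } finally {
          // One shutdown call replaces the per-context close() of metrics v1.
          DefaultMetricsSystem.INSTANCE.shutdown();
        }
      }
    }
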
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InfoMap.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.LinkedHashMap;
+import org.mortbay.util.ajax.JSON;
+
+class InfoMap extends LinkedHashMap<String, Object> {
+
+  private static final long serialVersionUID = 1L;
+
+  String toJson() {
+    return JSON.toString(this);
+  }
+}

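InfoMap is just a LinkedHashMap with a JSON serializer attached; JobTracker
(below) builds nested status documents with it using anonymous subclasses
whose instance-initializer blocks populate the map. A small usage sketch with
made-up values:

    package org.apache.hadoop.mapred; // InfoMap is package-private

    class InfoMapDemo {
      public static void main(String[] args) {
        InfoMap map = new InfoMap();
        map.put("alive", 42);
        map.put("slots", new InfoMap() {{ // double-brace: anonymous subclass
          put("map_slots", 100);          // plus an instance initializer
          put("map_slots_used", 37);
        }});
        // prints {"alive":42,"slots":{"map_slots":100,"map_slots_used":37}}
        System.out.println(map.toJson());
      }
    }

The double-brace form keeps nested-document construction readable at the cost
of one extra anonymous class per literal.
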
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:33:55 2011
@@ -57,9 +57,6 @@ import org.apache.hadoop.mapreduce.serve
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -275,8 +272,6 @@ public class JobInProgress {
   }
   private Counters jobCounters = new Counters();
   
-  private MetricsRecord jobMetrics;
-  
   // Maximum no. of fetch-failure notifications after which
   // the map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -441,12 +436,6 @@ public class JobInProgress {
 
       this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
 
-      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-      this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
-      this.jobMetrics.setTag("user", conf.getUser());
-      this.jobMetrics.setTag("sessionId", conf.getSessionId());
-      this.jobMetrics.setTag("jobName", conf.getJobName());
-      this.jobMetrics.setTag("jobId", jobId.toString());
       hasSpeculativeMaps = conf.getMapSpeculativeExecution();
       hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
       // a limit on the input size of the reduce.
@@ -477,34 +466,12 @@ public class JobInProgress {
       FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     }
   }
-
-  /**
-   * Called periodically by JobTrackerMetrics to update the metrics for
-   * this job.
-   */
-  public void updateMetrics() {
-    Counters counters = getCounters();
-    for (Counters.Group group : counters) {
-      jobMetrics.setTag("group", group.getDisplayName());
-      for (Counters.Counter counter : group) {
-        jobMetrics.setTag("counter", counter.getDisplayName());
-        jobMetrics.setMetric("value", (float) counter.getCounter());
-        jobMetrics.update();
-      }
-    }
-  }
     
   /**
    * Called when the job is complete
    */
   public void cleanUpMetrics() {
-    // Deletes all metric data for this job (in internal table in metrics package).
-    // This frees up RAM and possibly saves network bandwidth, since otherwise
-    // the metrics package implementation might continue to send these job metrics
-    // after the job has finished.
-    jobMetrics.removeTag("group");
-    jobMetrics.removeTag("counter");
-    jobMetrics.remove();
+    // Per-job metrics are disabled for now.
   }
     
   private void printCache (Map<Node, List<TaskInProgress>> cache) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:33:55 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.InputStreamReader;
 import java.io.Writer;
+import java.lang.management.ManagementFactory;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -109,7 +110,10 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.security.Credentials;
+import org.mortbay.util.ajax.JSON;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -118,7 +122,8 @@ import org.apache.hadoop.security.Creden
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
-    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
+    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
+    JobTrackerMXBean {
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -262,6 +267,9 @@ public class JobTracker implements MRCon
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
+
+  static final String CONF_VERSION_KEY = "mapreduce.jobtracker.conf.version";
+  static final String CONF_VERSION_DEFAULT = "default";
   
   public Clock getClock() {
     return clock;
@@ -285,6 +293,7 @@ public class JobTracker implements MRCon
   
   public static JobTracker startTracker(JobConf conf, String identifier) 
   throws IOException, InterruptedException {
+    DefaultMetricsSystem.initialize("JobTracker");
     JobTracker result = null;
     while (true) {
       try {
@@ -309,6 +318,7 @@ public class JobTracker implements MRCon
     }
     if (result != null) {
       JobEndNotifier.startNotifier();
+      MBeans.register("JobTracker", "JobTrackerInfo", result);
     }
     return result;
   }
@@ -1960,7 +1970,31 @@ public class JobTracker implements MRCon
     }
   }
 
-  private final JobTrackerInstrumentation myInstrumentation;
+  private JobTrackerInstrumentation myInstrumentation;
+
+  private void createInstrumentation() {
+    // Initialize instrumentation
+    JobTrackerInstrumentation tmp;
+    Class<? extends JobTrackerInstrumentation> metricsInst =
+        getInstrumentationClass(conf);
+    LOG.debug("instrumentation class="+ metricsInst);
+    if (metricsInst == null) {
+      myInstrumentation = JobTrackerInstrumentation.create(this, conf);
+      return;
+    }
+    try {
+      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+          metricsInst.getConstructor(new Class<?>[]{JobTracker.class,
+          JobConf.class});
+      tmp = c.newInstance(this, conf);
+    } catch (Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by
+      //falling back on the default.
+      LOG.error("failed to initialize job tracker metrics", e);
+      tmp = JobTrackerInstrumentation.create(this, conf);
+    }
+    myInstrumentation = tmp;
+  }
     
   /////////////////////////////////////////////////////////////////
   // The real JobTracker
@@ -2295,21 +2329,7 @@ public class JobTracker implements MRCon
     
     this.trackerIdentifier = identifier;
 
-    // Initialize instrumentation
-    JobTrackerInstrumentation tmp;
-    Class<? extends JobTrackerInstrumentation> metricsInst =
-      getInstrumentationClass(jobConf);
-    try {
-      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
-        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
-      tmp = c.newInstance(this, jobConf);
-    } catch(Exception e) {
-      //Reflection can throw lots of exceptions -- handle them all by 
-      //falling back on the default.
-      LOG.error("failed to initialize job tracker metrics", e);
-      tmp = new JobTrackerMetricsInst(this, jobConf);
-    }
-    myInstrumentation = tmp;
+    createInstrumentation();
     
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
@@ -2507,12 +2527,12 @@ public class JobTracker implements MRCon
     return localFs;
   }
 
-  public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
-    return conf.getClass("mapred.jobtracker.instrumentation",
-        JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
+  static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+    return conf.getClass("mapred.jobtracker.instrumentation", null,
+                         JobTrackerInstrumentation.class);
   }
   
-  public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
+  static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
     conf.setClass("mapred.jobtracker.instrumentation",
         t, JobTrackerInstrumentation.class);
   }
@@ -5208,4 +5228,117 @@ public class JobTracker implements MRCon
     return aclsManager;
   }
 
+  // Begin MXBean implementation
+  @Override
+  public String getHostname() {
+    return StringUtils.simpleHostname(getJobTrackerMachine());
+  }
+
+  @Override
+  public String getVersion() {
+    return VersionInfo.getVersion() +", r"+ VersionInfo.getRevision();
+  }
+
+  @Override
+  public String getConfigVersion() {
+    return conf.get(CONF_VERSION_KEY, CONF_VERSION_DEFAULT);
+  }
+
+  @Override
+  public int getThreadCount() {
+    return ManagementFactory.getThreadMXBean().getThreadCount();
+  }
+
+  @Override
+  public String getSummaryJson() {
+    return getSummary().toJson();
+  }
+
+  InfoMap getSummary() {
+    final ClusterMetrics metrics = getClusterMetrics();
+    InfoMap map = new InfoMap();
+    map.put("nodes", metrics.getTaskTrackerCount()
+            + getBlacklistedTrackerCount());
+    map.put("alive", metrics.getTaskTrackerCount());
+    map.put("blacklisted", getBlacklistedTrackerCount());
+    map.put("graylisted", getGraylistedTrackerCount());
+    map.put("slots", new InfoMap() {{
+      put("map_slots", metrics.getMapSlotCapacity());
+      put("map_slots_used", metrics.getOccupiedMapSlots());
+      put("reduce_slots", metrics.getReduceSlotCapacity());
+      put("reduce_slots_used", metrics.getOccupiedReduceSlots());
+    }});
+    map.put("jobs", metrics.getTotalJobSubmissions());
+    return map;
+  }
+
+  @Override
+  public String getAliveNodesInfoJson() {
+    return JSON.toString(getAliveNodesInfo());
+  }
+
+  List<InfoMap> getAliveNodesInfo() {
+    List<InfoMap> info = new ArrayList<InfoMap>();
+    for (final TaskTrackerStatus  tts : activeTaskTrackers()) {
+      final int mapSlots = tts.getMaxMapSlots();
+      final int redSlots = tts.getMaxReduceSlots();
+      info.add(new InfoMap() {{
+        put("hostname", tts.getHost());
+        put("last_seen", tts.getLastSeen());
+        put("health", tts.getHealthStatus().isNodeHealthy() ? "OK" : "");
+        put("slots", new InfoMap() {{
+          put("map_slots", mapSlots);
+          put("map_slots_used", mapSlots - tts.getAvailableMapSlots());
+          put("reduce_slots", redSlots);
+          put("reduce_slots_used", redSlots - tts.getAvailableReduceSlots());
+        }});
+        put("failures", tts.getFailures());
+      }});
+    }
+    return info;
+  }
+
+  @Override
+  public String getBlacklistedNodesInfoJson() {
+    return JSON.toString(getUnhealthyNodesInfo(blacklistedTaskTrackers()));
+  }
+
+  @Override
+  public String getGraylistedNodesInfoJson() {
+    return JSON.toString(getUnhealthyNodesInfo(graylistedTaskTrackers()));
+  }
+
+  List<InfoMap> getUnhealthyNodesInfo(Collection<TaskTrackerStatus> list) {
+    List<InfoMap> info = new ArrayList<InfoMap>();
+    for (final TaskTrackerStatus tts : list) {
+      info.add(new InfoMap() {{
+        put("hostname", tts.getHost());
+        put("last_seen", tts.getLastSeen());
+        put("reason", tts.getHealthStatus().getHealthReport());
+      }});
+    }
+    return info;
+  }
+  
+  @Override
+  public String getQueueInfoJson() {
+    return getQueueInfo().toJson();
+  }
+
+  InfoMap getQueueInfo() {
+    InfoMap map = new InfoMap();
+    try {
+      for (final JobQueueInfo q : getQueues()) {
+        map.put(q.getQueueName(), new InfoMap() {{
+          put("state", q.getQueueState());
+          put("info", q.getSchedulingInfo());
+        }});
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Getting queue info", e);
+    }
+    return map;
+  }
+  // End MXBean implementation
 }

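startTracker() now publishes the JobTracker itself as a JMX MXBean via
MBeans.register("JobTracker", "JobTrackerInfo", result); the attributes it
exposes are defined by the JobTrackerMXBean interface added below. The same
helper can publish any object. A sketch, where the bean is illustrative and
the resulting ObjectName ("Hadoop:service=Demo,name=DemoInfo") is an
assumption about the helper's naming convention, not something stated in this
patch:

    // DemoMXBean.java -- JMX requires the interface name to end in "MXBean":
    public interface DemoMXBean {
      String getStatus();
    }

    // Demo.java -- published the same way startTracker() publishes the tracker:
    import org.apache.hadoop.metrics2.util.MBeans;

    public class Demo implements DemoMXBean {
      public String getStatus() { return "running"; }

      public static void main(String[] args) throws InterruptedException {
        MBeans.register("Demo", "DemoInfo", new Demo());
        Thread.sleep(60000); // keep the JVM alive to inspect it in jconsole
      }
    }
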
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1077597&r1=1077596&r2=1077597&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Mar  4 04:33:55 2011
@@ -17,7 +17,14 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
 class JobTrackerInstrumentation {
+  private static final Log LOG =
+      LogFactory.getLog(JobTrackerInstrumentation.class);
 
   protected final JobTracker tracker;
   
@@ -168,4 +175,15 @@ class JobTrackerInstrumentation {
 
   public void heartbeat() {
   }
+
+  static JobTrackerInstrumentation create(JobTracker jt, JobConf conf) {
+    return create(jt, conf, DefaultMetricsSystem.INSTANCE);
+  }
+
+  static JobTrackerInstrumentation create(JobTracker jt, JobConf conf,
+                                          MetricsSystem ms) {
+    return ms.register("JobTrackerMetrics", "JobTracker metrics",
+                       new JobTrackerMetricsSource(jt, conf));
+  }
+
 }

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java?rev=1077597&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMXBean.java Fri Mar  4 04:33:55 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * The MXBean interface for JobTrackerInfo
+ */
+public interface JobTrackerMXBean {
+
+  /**
+   * @return hostname of the jobtracker
+   */
+  String getHostname();
+
+  /**
+   * @return version of the code base
+   */
+  String getVersion();
+
+  /**
+   * @return the config version (from a config property)
+   */
+  String getConfigVersion();
+
+  /**
+   * @return the number of threads in the jobtracker jvm
+   */
+  int getThreadCount();
+
+  /**
+   * @return the summary info in json
+   */
+  String getSummaryJson();
+
+  /**
+   * @return the alive nodes info in json
+   */
+  String getAliveNodesInfoJson();
+
+  /**
+   * @return the blacklisted nodes info in json
+   */
+  String getBlacklistedNodesInfoJson();
+
+  /**
+   * @return the graylisted nodes info in json
+   */
+  String getGraylistedNodesInfoJson();
+
+  /**
+   * @return the queue info in json
+   */
+  String getQueueInfoJson();
+
+}
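
Each getter on this interface surfaces as a read-only JMX attribute named by
JavaBean convention (getSummaryJson becomes SummaryJson, and so on). A sketch
of a remote client reading a few of the attributes; the host, port and
ObjectName are assumptions (remote JMX has to be enabled on the JobTracker
JVM, and the name follows the MBeans.register convention noted above):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class JTInfoClient {
      public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://jthost:8999/jmxrmi");
        JMXConnector jmxc = JMXConnectorFactory.connect(url);
        try {
          MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
          ObjectName name =
              new ObjectName("Hadoop:service=JobTracker,name=JobTrackerInfo");
          System.out.println("host:    " + mbsc.getAttribute(name, "Hostname"));
          System.out.println("version: " + mbsc.getAttribute(name, "Version"));
          System.out.println("summary: " + mbsc.getAttribute(name, "SummaryJson"));
        } finally {
          jmxc.close();
        }
      }
    }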