You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this rendering.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/05 06:30:51 UTC

svn commit: r1380980 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/server/ src/test/java/org/apache/hadoop/hdfs/qjournal/server/

Author: todd
Date: Wed Sep  5 04:30:51 2012
New Revision: 1380980

URL: http://svn.apache.org/viewvc?rev=1380980&view=rev
Log:
HDFS-3870. Add metrics to JournalNode. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1380980&r1=1380979&r2=1380980&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Sep  5 04:30:51 2012
@@ -42,3 +42,5 @@ HDFS-3863. Track last "committed" txid i
 HDFS-3869. Expose non-file journal manager details in web UI (todd)
 
 HDFS-3884. Journal format() should reset cached values (todd)
+
+HDFS-3870. Add metrics to JournalNode (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1380980&r1=1380979&r2=1380980&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Sep  5 04:30:51 2012
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,7 +51,9 @@ import org.apache.hadoop.hdfs.util.BestE
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Range;
 import com.google.common.collect.Ranges;
 import com.google.protobuf.ByteString;
@@ -70,6 +73,8 @@ class Journal implements Closeable {
   private long curSegmentTxId = HdfsConstants.INVALID_TXID;
   private long nextTxId = HdfsConstants.INVALID_TXID;
   
+  private final String journalId;
+  
   private final JNStorage storage;
 
   /**
@@ -102,12 +107,19 @@ class Journal implements Closeable {
   
   private final FileJournalManager fjm;
 
-  Journal(File logDir, StorageErrorReporter errorReporter) throws IOException {
+  private final JournalMetrics metrics;
+
+
+  Journal(File logDir, String journalId,
+      StorageErrorReporter errorReporter) throws IOException {
     storage = new JNStorage(logDir, errorReporter);
+    this.journalId = journalId;
 
     refreshCachedData();
     
     this.fjm = storage.getJournalManager();
+    
+    this.metrics = JournalMetrics.create(this);
   }
 
   /**
@@ -183,6 +195,10 @@ class Journal implements Closeable {
   JNStorage getStorage() {
     return storage;
   }
+  
+  String getJournalId() {
+    return journalId;
+  }
 
   /**
    * @return the last epoch which this node has promised not to accept
@@ -201,6 +217,11 @@ class Journal implements Closeable {
   synchronized long getCommittedTxnIdForTests() throws IOException {
     return committedTxnId.get();
   }
+  
+  @VisibleForTesting
+  JournalMetrics getMetricsForTests() {
+    return metrics;
+  }
 
   /**
    * Try to create a new epoch for this journal.
@@ -279,13 +300,34 @@ class Journal implements Closeable {
     Preconditions.checkState(nextTxId == firstTxnId,
         "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
     
+    long lastTxnId = firstTxnId + numTxns - 1;
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Writing txid " + firstTxnId + "-" + (firstTxnId + numTxns - 1));
+      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
     }
     
     curSegment.writeRaw(records, 0, records.length);
     curSegment.setReadyToFlush();
+    Stopwatch sw = new Stopwatch();
+    sw.start();
     curSegment.flush();
+    sw.stop();
+    
+    metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS));
+    
+    if (committedTxnId.get() > lastTxnId) {
+      // This batch of edits has already been committed on a quorum of other
+      // nodes. So, we are in "catch up" mode. This gets its own metric.
+      metrics.batchesWrittenWhileLagging.incr(1);
+      metrics.currentLagTxns.set(committedTxnId.get() - lastTxnId);
+    } else {
+      metrics.currentLagTxns.set(0L);
+    }
+    
+    metrics.batchesWritten.incr(1);
+    metrics.bytesWritten.incr(records.length);
+    metrics.txnsWritten.incr(numTxns);
+    metrics.lastWrittenTxId.set(lastTxnId);
+    
     nextTxId += numTxns;
   }
 

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1380980&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Wed Sep  5 04:30:51 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+
+/**
+ * Metrics source for a single {@link Journal} on a JournalNode.
+ * <p>
+ * {@link #create(Journal)} builds an instance and registers it with the
+ * {@link DefaultMetricsSystem} under the name returned by {@link #getName()}.
+ * The {@code @Metric}-annotated fields are not initialized here; the metrics
+ * system instantiates them during registration, after which
+ * {@link Journal} updates them directly as it writes batches of edits.
+ * The annotated getters query the journal each time the source is sampled.
+ */
+@Metrics(about="Journal metrics", context="dfs")
+class JournalMetrics {
+  /** Registry for this source; also holds the sync-latency quantiles. */
+  final MetricsRegistry registry = new MetricsRegistry("JournalNode");
+  
+  @Metric("Number of batches written since startup")
+  MutableCounterLong batchesWritten;
+  
+  @Metric("Number of txns written since startup")
+  MutableCounterLong txnsWritten;
+  
+  @Metric("Number of bytes written since startup")
+  MutableCounterLong bytesWritten;
+  
+  @Metric("Number of batches written where this node was lagging")
+  MutableCounterLong batchesWrittenWhileLagging;
+  
+  /**
+   * Rollover windows, in seconds, of the sync-latency quantiles.
+   * Shared by every instance and never modified.
+   */
+  private static final int[] QUANTILE_INTERVALS = new int[] {
+      1*60, // 1m
+      5*60, // 5m
+      60*60 // 1h
+  };
+  
+  /** Sync latency estimators, one per entry of QUANTILE_INTERVALS. */
+  final MutableQuantiles[] syncsQuantiles;
+  
+  @Metric("Transaction lag behind the most recent commit")
+  MutableGaugeLong currentLagTxns;
+  
+  @Metric("Last written txid")
+  MutableGaugeLong lastWrittenTxId;
+  
+  /** The journal whose epochs are reported and whose id names this source. */
+  private final Journal journal;
+
+  /**
+   * Creates an unregistered instance and allocates one sync-latency
+   * quantile estimator per window in QUANTILE_INTERVALS. Use
+   * {@link #create(Journal)} to also register it.
+   *
+   * @param journal the journal this source reports on
+   */
+  JournalMetrics(Journal journal) {
+    this.journal = journal;
+    
+    syncsQuantiles = new MutableQuantiles[QUANTILE_INTERVALS.length];
+    for (int i = 0; i < syncsQuantiles.length; i++) {
+      int interval = QUANTILE_INTERVALS[i];
+      syncsQuantiles[i] = registry.newQuantiles(
+          "syncs" + interval + "s",
+          "Journal sync time", "ops", "latencyMicros", interval);
+    }
+  }
+  
+  /**
+   * Creates the metrics for a journal and registers them with the default
+   * metrics system under {@link #getName()}.
+   * <p>
+   * NOTE(review): the metrics system is expected to reject a second source
+   * with the same name, so re-creating a Journal with the same id in one
+   * JVM may fail here unless the earlier source was unregistered first;
+   * confirm against the metrics system configuration in use.
+   *
+   * @param j the journal to report on
+   * @return the registered metrics source
+   */
+  public static JournalMetrics create(Journal j) {
+    JournalMetrics m = new JournalMetrics(j);
+    return DefaultMetricsSystem.instance().register(
+        m.getName(), null, m);
+  }
+
+  /** @return the source name: "Journal-" followed by the journal id */
+  String getName() {
+    return "Journal-" + journal.getJournalId();
+  }
+
+  /**
+   * @return the journal's last writer epoch, or -1 if reading it throws an
+   *         IOException; the exception is not propagated, so the gauge
+   *         reports the -1 sentinel instead.
+   */
+  @Metric("Current writer's epoch")
+  public long getLastWriterEpoch() {
+    try {
+      return journal.getLastWriterEpoch();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+  
+  /**
+   * @return the last epoch this node has promised, or -1 if reading it
+   *         throws an IOException; the exception is not propagated, so the
+   *         gauge reports the -1 sentinel instead.
+   */
+  @Metric("Last promised epoch")
+  public long getLastPromisedEpoch() {
+    try {
+      return journal.getLastPromisedEpoch();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+  
+  /**
+   * Records the duration of one journal sync in every quantile window.
+   *
+   * @param us elapsed sync time, in microseconds
+   */
+  void addSync(long us) {
+    for (MutableQuantiles q : syncsQuantiles) {
+      q.add(us);
+    }
+  }
+}

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java?rev=1380980&r1=1380979&r2=1380980&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java Wed Sep  5 04:30:51 2012
@@ -73,7 +73,7 @@ public class JournalNode implements Tool
     if (journal == null) {
       File logDir = getLogDir(jid);
       LOG.info("Initializing journal in directory " + logDir);      
-      journal = new Journal(logDir, new ErrorReporter());
+      journal = new Journal(logDir, jid, new ErrorReporter());
       journalsById.put(jid, journal);
     }
     

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1380980&r1=1380979&r2=1380980&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Sep  5 04:30:51 2012
@@ -61,7 +61,7 @@ public class TestJournal {
   @Before
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
-    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     journal.format(FAKE_NSINFO);
   }
   
@@ -137,7 +137,7 @@ public class TestJournal {
     journal.close(); // close to unlock the storage dir
     
     // Now re-instantiate, make sure history is still there
-    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     
     // The storage info should be read, even if no writer has taken over.
     assertEquals(storageString,
@@ -189,7 +189,7 @@ public class TestJournal {
 
     journal.newEpoch(FAKE_NSINFO,  1);
     try {
-      new Journal(TEST_LOG_DIR, mockErrorReporter);
+      new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
       fail("Did not fail to create another journal in same dir");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
@@ -200,7 +200,7 @@ public class TestJournal {
     
     // Journal should no longer be locked after the close() call.
     // Hence, should be able to create a new Journal in the same dir.
-    Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    Journal journal2 = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     journal2.newEpoch(FAKE_NSINFO, 2);
   }
   
@@ -227,7 +227,7 @@ public class TestJournal {
     // Check that, even if we re-construct the journal by scanning the
     // disk, we don't allow finalizing incorrectly.
     journal.close();
-    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
     
     try {
       journal.finalizeLogSegment(makeRI(4), 1, 6);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1380980&r1=1380979&r2=1380980&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Sep  5 04:30:51 2012
@@ -40,8 +40,10 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.server.Journal;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -86,12 +88,22 @@ public class TestJournalNode {
   
   @Test
   public void testJournal() throws Exception {
+    MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+
     IPCLoggerChannel ch = new IPCLoggerChannel(
         conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
     ch.newEpoch(1).get();
     ch.setEpoch(1);
     ch.startLogSegment(1).get();
     ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
+    
+    metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
   }