Posted to commits@hbase.apache.org by ap...@apache.org on 2009/05/16 21:29:35 UTC

svn commit: r775515 - /hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Author: apurtell
Date: Sat May 16 19:29:34 2009
New Revision: 775515

URL: http://svn.apache.org/viewvc?rev=775515&view=rev
Log:
add missing hunk from last commit

Modified:
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775515&r1=775514&r2=775515&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat May 16 19:29:34 2009
@@ -104,7 +104,6 @@
   private final Path dir;
   private final Configuration conf;
   private final LogRollListener listener;
-  private final int maxlogentries;
   private final long optionalFlushInterval;
   private final long blocksize;
   private final int flushlogentries;
@@ -132,11 +131,17 @@
 
   private final AtomicLong logSeqNum = new AtomicLong(0);
 
-  private volatile long filenum = 0;
+  private volatile long filenum = -1;
   private volatile long old_filenum = -1;
   
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
+  // Size of edits written so far. Used in figuring when to rotate logs.
+  private final AtomicLong editsSize = new AtomicLong(0);
+
+  // If the log grows beyond this size, roll it.
+  private final long logrollsize;
+
   // This lock prevents starting a log roll during a cache flush.
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
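
This hunk carries the heart of the change: log rolling is now driven by the accumulated size of written edits (editsSize vs. logrollsize) rather than the old fixed entry count (maxlogentries). A minimal, self-contained sketch of that decision, assuming a 64 MB HDFS block and the 0.95 multiplier this commit defaults to (the class here is illustrative, not patch code):

import java.util.concurrent.atomic.AtomicLong;

public class RollCheckSketch {
  // Running total of bytes appended since the last roll.
  private final AtomicLong editsSize = new AtomicLong(0);
  // Threshold: ~95% of an assumed 64 MB block, per the defaults added below.
  private final long logrollsize = (long) (64L * 1024 * 1024 * 0.95f);

  boolean shouldRoll(long editBytes) {
    // Add this edit's size; roll once the file nears a block boundary.
    return editsSize.addAndGet(editBytes) > logrollsize;
  }

  public static void main(String[] args) {
    RollCheckSketch s = new RollCheckSketch();
    System.out.println(s.shouldRoll(1024));               // false: far under threshold
    System.out.println(s.shouldRoll(64L * 1024 * 1024));  // true: past ~95% of a block
  }
}

Rolling by size instead of count keeps each hlog inside a single HDFS block regardless of how large individual edits are.
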
@@ -144,7 +149,9 @@
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   private final Object updateLock = new Object();
-  
+
+  private final boolean enabled;
+
   /*
    * If more than this many logs, force flush of oldest region to oldest edit
    * goes to disk.  If too many and we crash, then will take forever replaying.
@@ -182,12 +189,13 @@
     this.dir = dir;
     this.conf = conf;
     this.listener = listener;
-    this.maxlogentries =
-      conf.getInt("hbase.regionserver.maxlogentries", 100000);
     this.flushlogentries =
       conf.getInt("hbase.regionserver.flushlogentries", 100);
     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
       this.fs.getDefaultBlockSize());
+    // Roll at 95% of block size.
+    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+    this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
     this.lastLogFlushTime = System.currentTimeMillis();
@@ -196,15 +204,16 @@
     }
     fs.mkdirs(dir);
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
+    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" + this.blocksize +
-      ", maxlogentries=" + this.maxlogentries + ", flushlogentries=" +
-      this.flushlogentries + ", optionallogflushinternal=" +
-      this.optionalFlushInterval + "ms");
+      ", rollsize=" + this.logrollsize +
+      ", enabled=" + this.enabled +
+      ", flushlogentries=" + this.flushlogentries +
+      ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
   }
 
   /**
-   * Accessor for tests. Not a part of the public API.
    * @return Current state of the monotonically increasing file id.
    */
   public long getFilenum() {
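
The constructor now derives the roll threshold from the hlog block size: logrollsize = blocksize * multiplier, with the multiplier defaulting to 0.95 so a file rolls just before it would spill into a second HDFS block. A runnable sketch of the two knobs involved (the 128 MB override is an example value, not a recommendation):

import org.apache.hadoop.conf.Configuration;

public class HlogRollConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Example override of the hlog block size; default is fs.getDefaultBlockSize().
    conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
    long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize", 0);
    // Same arithmetic as the constructor above: roll at 95% of the block.
    System.out.println((long) (blocksize * multi));  // ~127.5e6 bytes for a 128 MB block
  }
}
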
@@ -212,17 +221,13 @@
   }
 
   /**
-   * Get the compression type for the hlog files.
-   * Commit logs SHOULD NOT be compressed.  You'll lose edits if the compression
-   * record is not complete.  In gzip, record is 32k so you could lose up to
-   * 32k of edits (All of this is moot till we have sync/flush in hdfs but
-   * still...).
+   * Get the compression type for the hlog files
    * @param c Configuration to use.
    * @return the kind of compression to use
    */
   private static CompressionType getCompressionType(final Configuration c) {
-    String name = c.get("hbase.io.seqfile.compression.type");
-    return name == null? CompressionType.NONE: CompressionType.valueOf(name);
+    // Compression makes no sense for commit log.  Always return NONE.
+    return CompressionType.NONE;
   }
 
   /**
@@ -277,23 +282,24 @@
       }
       synchronized (updateLock) {
         // Clean up current writer.
-        Path oldFile = cleanupCurrentWriter();
-        // Create a new one.
-        this.old_filenum = this.filenum;
+        Path oldFile = cleanupCurrentWriter(this.filenum);
+        if (this.filenum >= 0) {
+          this.old_filenum = this.filenum;
+        }
         this.filenum = System.currentTimeMillis();
         Path newPath = computeFilename(this.filenum);
-
         this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
           HLogKey.class, KeyValue.class,
           fs.getConf().getInt("io.file.buffer.size", 4096),
           fs.getDefaultReplication(), this.blocksize,
           SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
           new Metadata());
-
         LOG.info((oldFile != null?
-          "Closed " + oldFile + ", entries=" + this.numEntries.get() + ". ": "") +
-          "New log writer: " + FSUtils.getPath(newPath));
-
+            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+            this.numEntries.get() +
+            ", calcsize=" + this.editsSize.get() + ", filesize=" +
+            this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+          "New hlog " + FSUtils.getPath(newPath));
         // Can we delete any of the old log files?
         if (this.outputfiles.size() > 0) {
           if (this.lastSeqWritten.size() <= 0) {
@@ -310,6 +316,7 @@
           }
         }
         this.numEntries.set(0);
+        this.editsSize.set(0);
         updateLock.notifyAll();
       }
     } finally {
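
Two related fixes ride along in rollWriter: filenum now starts at -1, so the very first roll no longer records a phantom predecessor, and cleanupCurrentWriter is handed the filenum that was actually current so the closed file is named correctly. A toy sketch of the sentinel bookkeeping (illustrative class, not patch code):

public class FilenumSketch {
  long filenum = -1;      // sentinel: no hlog created yet
  long old_filenum = -1;

  void roll() {
    if (filenum >= 0) {   // only remember a predecessor that actually existed
      old_filenum = filenum;
    }
    filenum = System.currentTimeMillis();  // hlogs are named by creation time
  }

  public static void main(String[] args) {
    FilenumSketch s = new FilenumSketch();
    s.roll();  // first roll: old_filenum stays -1, nothing to close or report
    s.roll();  // later rolls: old_filenum tracks the file just closed
    System.out.println(s.old_filenum + " -> " + s.filenum);
  }
}
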
@@ -337,7 +344,7 @@
     if (LOG.isDebugEnabled()) {
       // Find region associated with oldest key -- helps debugging.
       oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-      LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+      LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
         " out of total " + this.outputfiles.size() + "; " +
         "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
         " from region " + Bytes.toString(oldestRegion));
@@ -351,7 +358,7 @@
     if (countOfLogs > this.maxLogs) {
       regionToFlush = oldestRegion != null?
         oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
-      LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
+      LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
         this.maxLogs + "; forcing flush of region with oldest edits: " +
         Bytes.toString(regionToFlush));
     }
@@ -382,7 +389,8 @@
    * @return Path to current writer or null if none.
    * @throws IOException
    */
-  private Path cleanupCurrentWriter() throws IOException {
+  private Path cleanupCurrentWriter(final long currentfilenum)
+  throws IOException {
     Path oldFile = null;
     if (this.writer != null) {
       // Close the current writer, get a new one.
@@ -393,12 +401,12 @@
         // shut ourselves down to minimize loss.  Alternative is to try and
         // keep going.  See HBASE-930.
         FailedLogCloseException flce =
-          new FailedLogCloseException("#" + this.filenum);
+          new FailedLogCloseException("#" + currentfilenum);
         flce.initCause(e);
         throw e; 
       }
-      oldFile = computeFilename(old_filenum);
-      if (filenum > 0) {
+      if (currentfilenum >= 0) {
+        oldFile = computeFilename(currentfilenum);
         this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
       }
     }
@@ -406,7 +414,7 @@
   }
 
   private void deleteLogFile(final Path p, final Long seqno) throws IOException {
-    LOG.info("removing old log file " + FSUtils.getPath(p) +
+    LOG.info("removing old hlog file " + FSUtils.getPath(p) +
       " whose highest sequence/edit id is " + seqno);
     this.fs.delete(p, true);
   }
@@ -418,6 +426,7 @@
    * @return Path
    */
   public Path computeFilename(final long fn) {
+    if (fn < 0) return null;
     return new Path(dir, HLOG_DATFILE + fn);
   }
 
@@ -442,7 +451,7 @@
       synchronized (updateLock) {
         this.closed = true;
         if (LOG.isDebugEnabled()) {
-          LOG.debug("closing log writer in " + this.dir.toString());
+          LOG.debug("closing hlog writer in " + this.dir.toString());
         }
         this.writer.close();
         updateLock.notifyAll();
@@ -457,11 +466,12 @@
    * 
    * @param regionInfo
    * @param logEdit
+   * @param now
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, KeyValue logEdit, final long now)
   throws IOException {
-    this.append(regionInfo, new byte[0], logEdit);
+    this.append(regionInfo, new byte[0], logEdit, now);
   }
 
   /** Append an entry to the log.
@@ -469,9 +479,11 @@
    * @param regionInfo
    * @param row
    * @param logEdit
+   * @param now Time of this edit write.
    * @throws IOException
    */
-  public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit,
+    final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -485,14 +497,13 @@
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
-      HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
+      HLogKey logKey = new HLogKey(regionName, tableName, seqNum, now);
       boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
-      doWrite(logKey, logEdit, sync);
+      doWrite(logKey, logEdit, sync, now);
       this.numEntries.incrementAndGet();
       updateLock.notifyAll();
     }
-
-    if (this.numEntries.get() > this.maxlogentries) {
+    if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
         listener.logRollRequested();
       }
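
The append variants also grow a "now" parameter: the timestamp is sampled once per operation by the caller and threaded through HLogKey and doWrite, instead of each write calling System.currentTimeMillis() itself. A hedged caller fragment (hlog, regionInfo, and kv are assumed from the caller's context, so this does not compile standalone):

final long now = System.currentTimeMillis();  // sampled once per operation
hlog.append(regionInfo, kv, now);             // the same 'now' lands in the HLogKey
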
@@ -520,10 +531,11 @@
    * @param tableName
    * @param edits
    * @param sync
+   * @param now
    * @throws IOException
    */
   void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
-    boolean sync)
+    boolean sync, final long now)
   throws IOException {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
@@ -537,13 +549,14 @@
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
       int counter = 0;
       for (KeyValue kv: edits) {
-        HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
-        doWrite(logKey, kv, sync);
+        HLogKey logKey =
+          new HLogKey(regionName, tableName, seqNum[counter++], now);
+        doWrite(logKey, kv, sync, now);
         this.numEntries.incrementAndGet();
       }
       updateLock.notifyAll();
     }
-    if (this.numEntries.get() > this.maxlogentries) {
+    if (this.editsSize.get() > this.logrollsize) {
         requestLogRoll();
     }
   }
@@ -558,19 +571,19 @@
     if (!this.closed) {
       long now = System.currentTimeMillis();
       synchronized (updateLock) {
-        if (((now - this.optionalFlushInterval) >
-            this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
+        if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
+            this.unflushedEntries.get() > 0) {
           try {
             sync();
           } catch (IOException e) {
-            LOG.error("Error flushing HLog", e);
+            LOG.error("Error flushing hlog", e);
           }
         }
       }
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms optional sync'ing HLog; editcount=" + this.numEntries.get());
+          "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
       }
     }
   }
@@ -581,10 +594,14 @@
     }
   }
   
-  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
+  private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync,
+      final long now)
   throws IOException {
+    if (!this.enabled) {
+      return;
+    }
     try {
-      long now = System.currentTimeMillis();
+      this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
       this.writer.append(logKey, logEdit);
       if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
         sync();
@@ -592,10 +609,10 @@
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms appending an edit to HLog; editcount=" + this.numEntries.get());
+          "ms appending an edit to hlog; editcount=" + this.numEntries.get());
       }
     } catch (IOException e) {
-      LOG.fatal("Could not append. Requesting close of log", e);
+      LOG.fatal("Could not append. Requesting close of hlog", e);
       requestLogRoll();
       throw e;
     }
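
doWrite picks up two new behaviors: a kill-switch (hbase.regionserver.hlog.enabled) that turns the WAL into a no-op, presumably for testing and benchmarking, and the editsSize accounting via heapSize(). Note the counter is an in-memory estimate rather than on-disk bytes, which is why rollWriter above logs both "calcsize" and the real "filesize". A minimal sketch of the accounting (HeapSized is a stand-in interface; HLogKey and KeyValue expose heapSize() in the real code):

import java.util.concurrent.atomic.AtomicLong;

interface HeapSized { long heapSize(); }  // stand-in for HLogKey/KeyValue

class DoWriteSketch {
  private final boolean enabled = true;   // hbase.regionserver.hlog.enabled
  private final AtomicLong editsSize = new AtomicLong(0);

  void doWrite(HeapSized key, HeapSized edit) {
    if (!enabled) {
      return;  // WAL disabled: skip the write entirely
    }
    // Record an in-memory size estimate; rollWriter compares this
    // "calcsize" to the actual file length when it rolls the log.
    editsSize.addAndGet(key.heapSize() + edit.heapSize());
    // ... writer.append(key, edit) and sync/flush bookkeeping follow ...
  }
}
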
@@ -667,8 +684,8 @@
         return;
       }
       synchronized (updateLock) {
-        this.writer.append(new HLogKey(regionName, tableName, logSeqId),
-          completeCacheFlushLogEdit());
+        this.writer.append(new HLogKey(regionName, tableName, logSeqId,
+          System.currentTimeMillis()), completeCacheFlushLogEdit());
         this.numEntries.incrementAndGet();
         Long seq = this.lastSeqWritten.get(regionName);
         if (seq != null && logSeqId >= seq.longValue()) {
@@ -729,7 +746,7 @@
       // Nothing to do
       return;
     }
-    LOG.info("Splitting " + logfiles.length + " log(s) in " +
+    LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
       srcDir.toString());
     splitLog(rootDir, logfiles, fs, conf);
     try {
@@ -741,7 +758,7 @@
       throw io;
     }
     long endMillis = System.currentTimeMillis();
-    LOG.info("log file splitting completed in " + (endMillis - millis) +
+    LOG.info("hlog file splitting completed in " + (endMillis - millis) +
         " millis for " + srcDir.toString());
   }
   
@@ -762,8 +779,8 @@
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
-            logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+            ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
         }
         // Check for possibly empty file. With appends, currently Hadoop reports
         // a zero length even if the file has been sync'd. Revisit if 
@@ -777,7 +794,7 @@
           try {
             int count = 0;
             while (in.next(key, val)) {
-              byte[] regionName = key.getRegionName();
+              byte [] regionName = key.getRegionName();
               LinkedList<HLogEntry> queue = logEntries.get(regionName);
               if (queue == null) {
                 queue = new LinkedList<HLogEntry>();
@@ -787,7 +804,8 @@
               queue.push(new HLogEntry(val, key));
               count++;
             }
-            LOG.debug("Pushed " + count + " entries");
+            LOG.debug("Pushed " + count + " entries from " +
+              logfiles[i].getPath());
           } catch (IOException e) {
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
@@ -797,7 +815,7 @@
           }
         } catch (IOException e) {
           if (length <= 0) {
-            LOG.warn("Empty log, continuing: " + logfiles[i]);
+            LOG.warn("Empty hlog, continuing: " + logfiles[i]);
             continue;
           }
           throw e;
@@ -838,7 +856,7 @@
                   Path oldlogfile = null;
                   SequenceFile.Reader old = null;
                   if (fs.exists(logfile)) {
-                    LOG.warn("Old log file " + logfile
+                    LOG.warn("Old hlog file " + logfile
                         + " already exists. Copying existing file to new file");
                     oldlogfile = new Path(logfile.toString() + ".old");
                     fs.rename(logfile, oldlogfile);
@@ -852,7 +870,7 @@
                   // iterate.
                   logWriters.put(key, w);
                   if (LOG.isDebugEnabled()) {
-                    LOG.debug("Creating new log file writer for path "
+                    LOG.debug("Creating new hlog file writer for path "
                         + logfile + " and region " + Bytes.toString(key));
                   }
 
@@ -893,10 +911,10 @@
       // Wait for all threads to terminate
       try {
         for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
-          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+          LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
         }
       }catch(InterruptedException ex) {
-        LOG.warn("Log writers were interrupted, possible data loss!");
+        LOG.warn("Hlog writers were interrupted, possible data loss!");
       }
     } finally {
       for (SequenceFile.Writer w : logWriters.values()) {