You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/07/24 19:40:23 UTC

svn commit: r1506639 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java

Author: stack
Date: Wed Jul 24 17:40:23 2013
New Revision: 1506639

URL: http://svn.apache.org/r1506639
Log:
HBASE-9033 Add tracing to ReplicationSource and enable it in tests

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1506639&r1=1506638&r2=1506639&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Jul 24 17:40:23 2013
@@ -35,7 +35,6 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -81,7 +80,7 @@ import org.apache.zookeeper.KeeperExcept
 public class ReplicationSource extends Thread
     implements ReplicationSourceInterface {
 
-  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+  public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queue of logs to process
   private PriorityBlockingQueue<Path> queue;
   // container of entries to replicate
@@ -121,6 +120,8 @@ public class ReplicationSource extends T
   private UUID peerClusterId;
   // total number of edits we replicated
   private long totalReplicatedEdits = 0;
+  // total number of edits we replicated
+  private long totalReplicatedOperations = 0;
   // The znode we currently play with
   private String peerClusterZnode;
   // Maximum number of retries before taking bold actions
@@ -206,7 +207,7 @@ public class ReplicationSource extends T
     List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
     Set<ServerName> setOfAddr = new HashSet<ServerName>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-    LOG.info("Getting " + nbPeers +
+    LOG.debug("Getting " + nbPeers +
         " rs from peer cluster # " + this.peerId);
     for (int i = 0; i < nbPeers; i++) {
       ServerName sn;
@@ -255,6 +256,10 @@ public class ReplicationSource extends T
       try {
         this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
             this.peerClusterZnode, this.queue.peek().getName()));
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Recovered queue started with log " + this.queue.peek() +
+              " at position " + this.repLogReader.getPosition());
+        }
       } catch (KeeperException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
             this.peerClusterZnode, e);
@@ -282,10 +287,6 @@ public class ReplicationSource extends T
           sleepMultiplier++;
         }
         continue;
-      } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
-        this.manager.cleanOldLogs(getCurrentPath().getName(),
-                                  this.peerId,
-                                  this.replicationQueueInfo.isQueueRecovered());
       }
       boolean currentWALisBeingWrittenTo = false;
       //For WAL files we own (rather than recovered), take a snapshot of whether the
@@ -402,6 +403,10 @@ public class ReplicationSource extends T
   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
       throws IOException{
     long seenEntries = 0;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Seeking in " + this.currentPath + " at position "
+          + this.repLogReader.getPosition());
+    }
     this.repLogReader.seek();
     HLog.Entry entry =
         this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
@@ -475,6 +480,14 @@ public class ReplicationSource extends T
       if (this.currentPath == null) {
         this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
         this.metrics.setSizeOfLogQueue(queue.size());
+        if (this.currentPath != null) {
+          this.manager.cleanOldLogs(this.currentPath.getName(),
+              this.peerId,
+              this.replicationQueueInfo.isQueueRecovered());
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("New log: " + this.currentPath);
+          }
+        }
       }
     } catch (InterruptedException e) {
       LOG.warn("Interrupted while reading edits", e);
@@ -491,6 +504,9 @@ public class ReplicationSource extends T
   protected boolean openReader(int sleepMultiplier) {
     try {
       try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Opening log " + this.currentPath);
+        }
         this.reader = repLogReader.openReader(this.currentPath);
       } catch (FileNotFoundException fnfe) {
         if (this.replicationQueueInfo.isQueueRecovered()) {
@@ -643,6 +659,9 @@ public class ReplicationSource extends T
       }
       try {
         AdminService.BlockingInterface rrs = getRS();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicating " + this.currentNbEntries + " entries");
+        }
         ReplicationProtbufUtil.replicateWALEntry(rrs,
             Arrays.copyOf(this.entriesArray, currentNbEntries));
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
@@ -652,9 +671,14 @@ public class ReplicationSource extends T
           this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         this.totalReplicatedEdits += currentNbEntries;
+        this.totalReplicatedOperations += currentNbOperations;
         this.metrics.shipBatch(this.currentNbOperations);
         this.metrics.setAgeOfLastShippedOp(
             this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
+              + this.totalReplicatedOperations + " operations");
+        }
         break;
 
       } catch (IOException ioe) {
@@ -724,6 +748,15 @@ public class ReplicationSource extends T
    */
   protected boolean processEndOfFile() {
     if (this.queue.size() != 0) {
+      if (LOG.isTraceEnabled()) {
+        String filesize = "N/A";
+        try {
+          FileStatus stat = this.fs.getFileStatus(this.currentPath);
+          filesize = stat.getLen()+"";
+        } catch (IOException ex) {}
+        LOG.trace("Reached the end of a log, stats: " + getStats() +
+            ", and the length of the file is " + filesize);
+      }
       this.currentPath = null;
       this.repLogReader.finishCurrentFile();
       this.reader = null;
@@ -851,13 +884,7 @@ public class ReplicationSource extends T
 
   @Override
   public String getStats() {
-    String position = "N/A";
-    try {
-      if (this.reader != null) {
-        position = this.reader.getPosition()+"";
-      }
-    } catch (IOException ioe) {
-    }
+    long position = this.repLogReader.getPosition();
     return "Total replicated edits: " + totalReplicatedEdits +
       ", currently replicating from: " + this.currentPath +
       " at position: " + position;

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java?rev=1506639&r1=1506638&r2=1506639&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java Wed Jul 24 17:40:23 2013
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.replicat
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.log4j.Level;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -35,6 +38,10 @@ import static org.junit.Assert.fail;
 @Category(LargeTests.class)
 public class TestReplicationKillRS extends TestReplicationBase {
 
+  {
+    ((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
 
   /**