You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/07/24 19:40:23 UTC
svn commit: r1506639 - in /hbase/branches/0.95/hbase-server/src:
main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
Author: stack
Date: Wed Jul 24 17:40:23 2013
New Revision: 1506639
URL: http://svn.apache.org/r1506639
Log:
HBASE-9033 Add tracing to ReplicationSource and enable it in tests
Modified:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1506639&r1=1506638&r2=1506639&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Jul 24 17:40:23 2013
@@ -35,7 +35,6 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -81,7 +80,7 @@ import org.apache.zookeeper.KeeperExcept
public class ReplicationSource extends Thread
implements ReplicationSourceInterface {
- private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+ public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
// container of entries to replicate
@@ -121,6 +120,8 @@ public class ReplicationSource extends T
private UUID peerClusterId;
// total number of edits we replicated
private long totalReplicatedEdits = 0;
+ // total number of operations we replicated
+ private long totalReplicatedOperations = 0;
// The znode we currently play with
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
@@ -206,7 +207,7 @@ public class ReplicationSource extends T
List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
- LOG.info("Getting " + nbPeers +
+ LOG.debug("Getting " + nbPeers +
" rs from peer cluster # " + this.peerId);
for (int i = 0; i < nbPeers; i++) {
ServerName sn;
@@ -255,6 +256,10 @@ public class ReplicationSource extends T
try {
this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName()));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Recovered queue started with log " + this.queue.peek() +
+ " at position " + this.repLogReader.getPosition());
+ }
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
this.peerClusterZnode, e);
@@ -282,10 +287,6 @@ public class ReplicationSource extends T
sleepMultiplier++;
}
continue;
- } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
- this.manager.cleanOldLogs(getCurrentPath().getName(),
- this.peerId,
- this.replicationQueueInfo.isQueueRecovered());
}
boolean currentWALisBeingWrittenTo = false;
//For WAL files we own (rather than recovered), take a snapshot of whether the
@@ -402,6 +403,10 @@ public class ReplicationSource extends T
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Seeking in " + this.currentPath + " at position "
+ + this.repLogReader.getPosition());
+ }
this.repLogReader.seek();
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
@@ -475,6 +480,14 @@ public class ReplicationSource extends T
if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
this.metrics.setSizeOfLogQueue(queue.size());
+ if (this.currentPath != null) {
+ this.manager.cleanOldLogs(this.currentPath.getName(),
+ this.peerId,
+ this.replicationQueueInfo.isQueueRecovered());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("New log: " + this.currentPath);
+ }
+ }
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
@@ -491,6 +504,9 @@ public class ReplicationSource extends T
protected boolean openReader(int sleepMultiplier) {
try {
try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Opening log " + this.currentPath);
+ }
this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.replicationQueueInfo.isQueueRecovered()) {
@@ -643,6 +659,9 @@ public class ReplicationSource extends T
}
try {
AdminService.BlockingInterface rrs = getRS();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replicating " + this.currentNbEntries + " entries");
+ }
ReplicationProtbufUtil.replicateWALEntry(rrs,
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
@@ -652,9 +671,14 @@ public class ReplicationSource extends T
this.lastLoggedPosition = this.repLogReader.getPosition();
}
this.totalReplicatedEdits += currentNbEntries;
+ this.totalReplicatedOperations += currentNbOperations;
this.metrics.shipBatch(this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp(
this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
+ + this.totalReplicatedOperations + " operations");
+ }
break;
} catch (IOException ioe) {
@@ -724,6 +748,15 @@ public class ReplicationSource extends T
*/
protected boolean processEndOfFile() {
if (this.queue.size() != 0) {
+ if (LOG.isTraceEnabled()) {
+ String filesize = "N/A";
+ try {
+ FileStatus stat = this.fs.getFileStatus(this.currentPath);
+ filesize = stat.getLen()+"";
+ } catch (IOException ex) {}
+ LOG.trace("Reached the end of a log, stats: " + getStats() +
+ ", and the length of the file is " + filesize);
+ }
this.currentPath = null;
this.repLogReader.finishCurrentFile();
this.reader = null;
@@ -851,13 +884,7 @@ public class ReplicationSource extends T
@Override
public String getStats() {
- String position = "N/A";
- try {
- if (this.reader != null) {
- position = this.reader.getPosition()+"";
- }
- } catch (IOException ioe) {
- }
+ long position = this.repLogReader.getPosition();
return "Total replicated edits: " + totalReplicatedEdits +
", currently replicating from: " + this.currentPath +
" at position: " + position;
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java?rev=1506639&r1=1506638&r2=1506639&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java Wed Jul 24 17:40:23 2013
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.replicat
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -35,6 +38,10 @@ import static org.junit.Assert.fail;
@Category(LargeTests.class)
public class TestReplicationKillRS extends TestReplicationBase {
+ {
+ ((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
/**