You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/09/20 20:58:15 UTC
svn commit: r1388159 - in /hbase/branches/0.92: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Author: jdcryans
Date: Thu Sep 20 18:58:14 2012
New Revision: 1388159
URL: http://svn.apache.org/viewvc?rev=1388159&view=rev
Log:
HBASE-6847 HBASE-6649 broke replication (Devaraj Das via JD)
Modified:
hbase/branches/0.92/CHANGES.txt
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1388159&r1=1388158&r2=1388159&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Thu Sep 20 18:58:14 2012
@@ -3,6 +3,7 @@ Release 0.92.3 - Unreleased
BUG FIXES
HBASE-6649 [0.92 UNIT TESTS] TestReplication.queueFailover occasionally fails [Part-1]
(Devaraj Das via Stack)
+ HBASE-6847 HBASE-6649 broke replication (Devaraj Das via JD)
IMPROVEMENTS
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1388159&r1=1388158&r2=1388159&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Sep 20 18:58:14 2012
@@ -341,10 +341,6 @@ public class ReplicationSource extends T
}
} finally {
try {
- // if current path is null, it means we processEndOfFile hence
- if (this.currentPath != null && !gotIOE) {
- this.position = this.reader.getPosition();
- }
if (this.reader != null) {
this.reader.close();
}
@@ -394,7 +390,8 @@ public class ReplicationSource extends T
if (this.position != 0) {
this.reader.seek(this.position);
}
- HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+ long startPosition = this.position;
+ HLog.Entry entry = readNextAndSetPosition();
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.logEditsReadRate.inc(1);
@@ -423,13 +420,13 @@ public class ReplicationSource extends T
}
}
// Stop if too many entries or too big
- if ((this.reader.getPosition() - this.position)
+ if ((this.reader.getPosition() - startPosition)
>= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) {
break;
}
try {
- entry = this.reader.next(entriesArray[currentNbEntries]);
+ entry = readNextAndSetPosition();
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
break;
@@ -437,12 +434,22 @@ public class ReplicationSource extends T
}
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
- " and size: " + (this.reader.getPosition() - this.position));
+ " and size: " + (this.reader.getPosition() - startPosition));
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
}
+ private HLog.Entry readNextAndSetPosition() throws IOException {
+ HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+ // Store the position so that in the future the reader can start
+ // reading from here. If the above call to next() throws an
+ // exception, the position won't be changed and retry will happen
+ // from the last known good position
+ this.position = this.reader.getPosition();
+ return entry;
+ }
+
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) {