You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/09/20 20:59:05 UTC

svn commit: r1388160 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Author: jdcryans
Date: Thu Sep 20 18:59:05 2012
New Revision: 1388160

URL: http://svn.apache.org/viewvc?rev=1388160&view=rev
Log:
HBASE-6847  HBASE-6649 broke replication (Devaraj Das via JD)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1388160&r1=1388159&r2=1388160&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Sep 20 18:59:05 2012
@@ -338,10 +338,6 @@ public class ReplicationSource extends T
         }
       } finally {
         try {
-          // if current path is null, it means we processEndOfFile hence
-          if (this.currentPath != null && !gotIOE) {
-            this.position = this.reader.getPosition();
-          }
           if (this.reader != null) {
             this.reader.close();
           }
@@ -391,7 +387,8 @@ public class ReplicationSource extends T
     if (this.position != 0) {
       this.reader.seek(this.position);
     }
-    HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+    long startPosition = this.position;
+    HLog.Entry entry = readNextAndSetPosition();
     while (entry != null) {
       WALEdit edit = entry.getEdit();
       this.metrics.logEditsReadRate.inc(1);
@@ -420,13 +417,13 @@ public class ReplicationSource extends T
         }
       }
       // Stop if too many entries or too big
-      if ((this.reader.getPosition() - this.position)
+      if ((this.reader.getPosition() - startPosition)
           >= this.replicationQueueSizeCapacity ||
           currentNbEntries >= this.replicationQueueNbCapacity) {
         break;
       }
       try {
-        entry = this.reader.next(entriesArray[currentNbEntries]);
+        entry = readNextAndSetPosition();
       } catch (IOException ie) {
         LOG.debug("Break on IOE: " + ie.getMessage());
         break;
@@ -434,12 +431,22 @@ public class ReplicationSource extends T
     }
     LOG.debug("currentNbOperations:" + currentNbOperations +
         " and seenEntries:" + seenEntries +
-        " and size: " + (this.reader.getPosition() - this.position));
+        " and size: " + (this.reader.getPosition() - startPosition));
     // If we didn't get anything and the queue has an object, it means we
     // hit the end of the file for sure
     return seenEntries == 0 && processEndOfFile();
   }
 
+  private HLog.Entry readNextAndSetPosition() throws IOException {
+    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+    // Store the position so that in the future the reader can start
+    // reading from here. If the above call to next() throws an
+    // exception, the position won't be changed and retry will happen
+    // from the last known good position
+    this.position = this.reader.getPosition();
+    return entry;
+  } 
+
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
     while (this.isActive() && this.currentPeers.size() == 0) {