You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/10/18 06:52:19 UTC

svn commit: r1399517 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/replication/regionserver/

Author: jdcryans
Date: Thu Oct 18 04:52:19 2012
New Revision: 1399517

URL: http://svn.apache.org/viewvc?rev=1399517&view=rev
Log:
HBASE-6758  [replication] The replication-executor should make sure the file
            that it is replicating is closed before declaring success on that
            file (Devaraj Das via JD)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Oct 18 04:52:19 2012
@@ -510,12 +510,6 @@ class FSHLog implements HLog, Syncable {
       if (nextWriter instanceof SequenceFileLogWriter) {
         nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
       }
-      // Tell our listeners that a new log was created
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.postLogRoll(oldPath, newPath);
-        }
-      }
 
       synchronized (updateLock) {
         // Clean up current writer.
@@ -531,6 +525,13 @@ class FSHLog implements HLog, Syncable {
           " for " + FSUtils.getPath(newPath));
         this.numEntries.set(0);
       }
+      // Tell our listeners that a new log was created
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener i : this.listeners) {
+          i.postLogRoll(oldPath, newPath);
+        }
+      }
+
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
         if (this.lastSeqWritten.isEmpty()) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Oct 18 04:52:19 2012
@@ -189,12 +189,12 @@ public class Replication implements WALA
 
   @Override
   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-    // Not interested
+    getReplicationManager().preLogRoll(newPath);
   }
 
   @Override
   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-    getReplicationManager().logRolled(newPath);
+    getReplicationManager().postLogRoll(newPath);
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Oct 18 04:52:19 2012
@@ -300,6 +300,18 @@ public class ReplicationSource extends T
         }
         continue;
       }
+      boolean currentWALisBeingWrittenTo = false;
+      //For WAL files we own (rather than recovered), take a snapshot of whether the
+      //current WAL file (this.currentPath) is in use (for writing) NOW!
+      //Since the new WAL paths are enqueued only after the prev WAL file
+      //is 'closed', presence of an element in the queue means that
+      //the previous WAL file was closed, else the file is in use (currentPath)
+      //We take the snapshot now so that we are protected against races
+      //where a new file gets enqueued while the current file is being processed
+      //(and where we just finished reading the current file).
+      if (!this.queueRecovered && queue.size() == 0) {
+        currentWALisBeingWrittenTo = true;
+      }
       // Open a reader on it
       if (!openReader(sleepMultiplier)) {
         // Reset the sleep multiplier, else it'd be reused for the next file
@@ -318,7 +330,7 @@ public class ReplicationSource extends T
       boolean gotIOE = false;
       currentNbEntries = 0;
       try {
-        if(readAllEntriesToReplicateOrNextFile()) {
+        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
           continue;
         }
       } catch (IOException ioe) {
@@ -367,7 +379,7 @@ public class ReplicationSource extends T
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
         if (this.lastLoggedPosition != this.position) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered);
+              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.position;
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -376,7 +388,7 @@ public class ReplicationSource extends T
         continue;
       }
       sleepMultiplier = 1;
-      shipEdits();
+      shipEdits(currentWALisBeingWrittenTo);
 
     }
     if (this.conn != null) {
@@ -393,11 +405,13 @@ public class ReplicationSource extends T
   /**
    * Read all the entries from the current log files and retain those
    * that need to be replicated. Else, process the end of the current file.
+   * @param currentWALisBeingWrittenTo is the current WAL being written to
    * @return true if we got nothing and went to the next file, false if we got
    * entries
    * @throws IOException
    */
-  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
+      throws IOException{
     long seenEntries = 0;
     if (this.position != 0) {
       this.reader.seek(this.position);
@@ -447,6 +461,9 @@ public class ReplicationSource extends T
     LOG.debug("currentNbOperations:" + currentNbOperations +
         " and seenEntries:" + seenEntries +
         " and size: " + (this.reader.getPosition() - startPosition));
+    if (currentWALisBeingWrittenTo) {
+      return false;
+    }
     // If we didn't get anything and the queue has an object, it means we
     // hit the end of the file for sure
     return seenEntries == 0 && processEndOfFile();
@@ -620,8 +637,10 @@ public class ReplicationSource extends T
 
   /**
    * Do the shipping logic
+   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) 
+   * written to when this method was called
    */
-  protected void shipEdits() {
+  protected void shipEdits(boolean currentWALisBeingWrittenTo) {
     int sleepMultiplier = 1;
     if (this.currentNbEntries == 0) {
       LOG.warn("Was given 0 edits to ship");
@@ -641,7 +660,7 @@ public class ReplicationSource extends T
           Arrays.copyOf(this.entriesArray, currentNbEntries));
         if (this.lastLoggedPosition != this.position) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered);
+              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.position;
         }
         this.totalReplicatedEdits += currentNbEntries;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Oct 18 04:52:19 2012
@@ -146,11 +146,16 @@ public class ReplicationSourceManager {
    * @param id id of the peer cluster
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
+   * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, 
+      boolean queueRecovered, boolean holdLogInZK) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
     this.zkHelper.writeReplicationStatus(key, id, position);
+    if (holdLogInZK) {
+     return;
+    }
     synchronized (this.hlogsById) {
       SortedSet<String> hlogs = this.hlogsById.get(id);
       if (!queueRecovered && hlogs.first() != key) {
@@ -252,7 +257,7 @@ public class ReplicationSourceManager {
     return this.sources;
   }
 
-  void logRolled(Path newLog) throws IOException {
+  void preLogRoll(Path newLog) throws IOException {
     if (!this.replicating.get()) {
       LOG.warn("Replication stopped, won't add new log");
       return;
@@ -278,6 +283,14 @@ public class ReplicationSourceManager {
     }
 
     this.latestPath = newLog;
+  }
+
+  void postLogRoll(Path newLog) throws IOException {
+    if (!this.replicating.get()) {
+      LOG.warn("Replication stopped, won't add new log");
+      return;
+    }
+
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
       source.enqueueLog(newLog);    

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu Oct 18 04:52:19 2012
@@ -203,7 +203,7 @@ public class TestReplicationSourceManage
     hlog.rollWriter();
 
     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false);
+        "1", 0, false, false);
 
     HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
         System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);