You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/10/18 06:52:19 UTC
svn commit: r1399517 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/replication/regionserver/
test/java/org/apache/hadoop/hbase/replication/regionserver/
Author: jdcryans
Date: Thu Oct 18 04:52:19 2012
New Revision: 1399517
URL: http://svn.apache.org/viewvc?rev=1399517&view=rev
Log:
HBASE-6758 [replication] The replication-executor should make sure the file
that it is replicating is closed before declaring success on that
file (Devaraj Das via JD)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Oct 18 04:52:19 2012
@@ -510,12 +510,6 @@ class FSHLog implements HLog, Syncable {
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}
- // Tell our listeners that a new log was created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogRoll(oldPath, newPath);
- }
- }
synchronized (updateLock) {
// Clean up current writer.
@@ -531,6 +525,13 @@ class FSHLog implements HLog, Syncable {
" for " + FSUtils.getPath(newPath));
this.numEntries.set(0);
}
+ // Tell our listeners that a new log was created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.postLogRoll(oldPath, newPath);
+ }
+ }
+
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Oct 18 04:52:19 2012
@@ -189,12 +189,12 @@ public class Replication implements WALA
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested
+ getReplicationManager().preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- getReplicationManager().logRolled(newPath);
+ getReplicationManager().postLogRoll(newPath);
}
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Oct 18 04:52:19 2012
@@ -300,6 +300,18 @@ public class ReplicationSource extends T
}
continue;
}
+ boolean currentWALisBeingWrittenTo = false;
+ //For WAL files we own (rather than recovered), take a snapshot of whether the
+ //current WAL file (this.currentPath) is in use (for writing) NOW!
+ //Since the new WAL paths are enqueued only after the prev WAL file
+ //is 'closed', presence of an element in the queue means that
+ //the previous WAL file was closed, else the file is in use (currentPath)
+ //We take the snapshot now so that we are protected against races
+ //where a new file gets enqueued while the current file is being processed
+ //(and where we just finished reading the current file).
+ if (!this.queueRecovered && queue.size() == 0) {
+ currentWALisBeingWrittenTo = true;
+ }
// Open a reader on it
if (!openReader(sleepMultiplier)) {
// Reset the sleep multiplier, else it'd be reused for the next file
@@ -318,7 +330,7 @@ public class ReplicationSource extends T
boolean gotIOE = false;
currentNbEntries = 0;
try {
- if(readAllEntriesToReplicateOrNextFile()) {
+ if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
continue;
}
} catch (IOException ioe) {
@@ -367,7 +379,7 @@ public class ReplicationSource extends T
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.position;
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -376,7 +388,7 @@ public class ReplicationSource extends T
continue;
}
sleepMultiplier = 1;
- shipEdits();
+ shipEdits(currentWALisBeingWrittenTo);
}
if (this.conn != null) {
@@ -393,11 +405,13 @@ public class ReplicationSource extends T
/**
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
+ * @param currentWALisBeingWrittenTo is the current WAL being written to
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
- protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
+ throws IOException{
long seenEntries = 0;
if (this.position != 0) {
this.reader.seek(this.position);
@@ -447,6 +461,9 @@ public class ReplicationSource extends T
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - startPosition));
+ if (currentWALisBeingWrittenTo) {
+ return false;
+ }
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
@@ -620,8 +637,10 @@ public class ReplicationSource extends T
/**
* Do the shipping logic
+ * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
+ * written to when this method was called
*/
- protected void shipEdits() {
+ protected void shipEdits(boolean currentWALisBeingWrittenTo) {
int sleepMultiplier = 1;
if (this.currentNbEntries == 0) {
LOG.warn("Was given 0 edits to ship");
@@ -641,7 +660,7 @@ public class ReplicationSource extends T
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Oct 18 04:52:19 2012
@@ -146,11 +146,16 @@ public class ReplicationSourceManager {
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
+ * @param holdLogInZK if true then the log is retained in ZK
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+ public void logPositionAndCleanOldLogs(Path log, String id, long position,
+ boolean queueRecovered, boolean holdLogInZK) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key, id, position);
+ if (holdLogInZK) {
+ return;
+ }
synchronized (this.hlogsById) {
SortedSet<String> hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) {
@@ -252,7 +257,7 @@ public class ReplicationSourceManager {
return this.sources;
}
- void logRolled(Path newLog) throws IOException {
+ void preLogRoll(Path newLog) throws IOException {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
return;
@@ -278,6 +283,14 @@ public class ReplicationSourceManager {
}
this.latestPath = newLog;
+ }
+
+ void postLogRoll(Path newLog) throws IOException {
+ if (!this.replicating.get()) {
+ LOG.warn("Replication stopped, won't add new log");
+ return;
+ }
+
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1399517&r1=1399516&r2=1399517&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu Oct 18 04:52:19 2012
@@ -203,7 +203,7 @@ public class TestReplicationSourceManage
hlog.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
+ "1", 0, false, false);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);