Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 21:58:32 UTC
svn commit: r1445918 [11/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/
dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/ma...
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Wed Feb 13 20:58:23 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@@ -44,7 +45,7 @@ import org.apache.zookeeper.KeeperExcept
public class OpenRegionHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
- private final RegionServerServices rsServices;
+ protected final RegionServerServices rsServices;
private final HRegionInfo regionInfo;
private final HTableDescriptor htd;
@@ -85,30 +86,50 @@ public class OpenRegionHandler extends E
@Override
public void process() throws IOException {
+ boolean openSuccessful = false;
+ final String regionName = regionInfo.getRegionNameAsString();
+
try {
- final String name = regionInfo.getRegionNameAsString();
if (this.server.isStopped() || this.rsServices.isStopping()) {
return;
}
final String encodedName = regionInfo.getEncodedName();
+ // Three difficult situations can occur:
+ // 1) The opening was cancelled. This is an expected situation.
+ // 2) The region was hijacked; we no longer have the znode.
+ // 3) The region is now marked as online while we're supposed to open. This would be a bug.
+
// Check that this region is not already online
- HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
+ if (this.rsServices.getFromOnlineRegions(encodedName) != null) {
+ LOG.error("Region " + encodedName +
+ " was already online when we started processing the opening. " +
+ "Marking this new attempt as failed");
+ tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
+ return;
+ }
+ // Check that we're still supposed to open the region and transition.
// If fails, just return. Someone stole the region from under us.
- // Calling transitionZookeeperOfflineToOpening initalizes this.version.
- if (!transitionZookeeperOfflineToOpening(encodedName,
- versionOfOfflineNode)) {
- LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
- encodedName);
+ // Calling transitionZookeeperOfflineToOpening initializes this.version.
+ if (!isRegionStillOpening()){
+ LOG.error("Region " + encodedName + " opening cancelled");
+ tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
+ return;
+ }
+
+ if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
+ LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
+ // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more.
+ tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, this.version);
return;
}
// Open region. After a successful open, failures in subsequent
// processing needs to do a close as part of cleanup.
- region = openRegion();
+ HRegion region = openRegion();
if (region == null) {
- tryTransitionToFailedOpen(regionInfo);
+ tryTransitionFromOpeningToFailedOpen(regionInfo);
return;
}
boolean failed = true;
@@ -120,37 +141,63 @@ public class OpenRegionHandler extends E
if (failed || this.server.isStopped() ||
this.rsServices.isStopping()) {
cleanupFailedOpen(region);
- tryTransitionToFailedOpen(regionInfo);
+ tryTransitionFromOpeningToFailedOpen(regionInfo);
return;
}
- if (!transitionToOpened(region)) {
+
+ if (!isRegionStillOpening() || !transitionToOpened(region)) {
// If we fail to transition to opened, it's because of one of two cases:
// (a) we lost our ZK lease
// OR (b) someone else opened the region before us
- // In either case, we don't need to transition to FAILED_OPEN state.
- // In case (a), the Master will process us as a dead server. In case
- // (b) the region is already being handled elsewhere anyway.
+ // OR (c) someone cancelled the open
+ // In all cases, we try to transition to failed_open to be safe.
cleanupFailedOpen(region);
+ tryTransitionFromOpeningToFailedOpen(regionInfo);
return;
}
- // One more check to make sure we are opening instead of closing
- if (!isRegionStillOpening()) {
- LOG.warn("Open region aborted since it isn't opening any more");
- cleanupFailedOpen(region);
- return;
- }
+ // We have a znode in the opened state now. We can't really delete it, as that's the
+ // master's job. Transitioning to failed open would create a race condition if the
+ // master has already acted on the transition to opened.
+ // Cancelling the open is dangerous, because we would end up in a state where the
+ // master thinks the region is opened while the region is actually closed. It is a
+ // dangerous state to be in. For this reason, from now on, we're not going back.
+ // There is a message in the finally clause to let the admin know where we stand.
+
// Successful region open, and add it to OnlineRegions
this.rsServices.addToOnlineRegions(region);
+ openSuccessful = true;
// Done! Successful region open
- LOG.debug("Opened " + name + " on server:" +
+ LOG.debug("Opened " + regionName + " on server:" +
this.server.getServerName());
+
+
} finally {
- this.rsServices.getRegionsInTransitionInRS().
+ final Boolean current = this.rsServices.getRegionsInTransitionInRS().
remove(this.regionInfo.getEncodedNameAsBytes());
+
+ // Let's check whether we have hit a race condition on open cancellation...
+ // A better solution would be to not have any race condition.
+ // this.rsServices.getRegionsInTransitionInRS().remove(
+ // this.regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
+ // would help, but we would still have a consistency issue to manage with
+ // 1) this.rsServices.addToOnlineRegions(region);
+ // 2) the ZK state.
+ if (openSuccessful) {
+ if (current == null) { // Should NEVER happen, but let's be paranoid.
+ LOG.error("Bad state: we've just opened a region that was NOT in transition. Region=" +
+ regionName
+ );
+ } else if (Boolean.FALSE.equals(current)) { // Can happen, if we're really unlucky.
+ LOG.error("Race condition: we've finished to open a region, while a close was requested "
+ + " on region=" + regionName + ". It can be a critical error, as a region that" +
+ " should be closed is now opened."
+ );
+ }
+ }
}
}
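A side note on the race the finally block above tolerates: ConcurrentMap offers an atomic two-argument remove that only succeeds while the key still maps to the expected value, which is what the commented-out alternative in the patch alludes to. A minimal, self-contained sketch (names hypothetical, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class RitRemoveSketch {
      public static void main(String[] args) {
        // Boolean.TRUE means "opening", Boolean.FALSE means "closing" in the RIT map.
        ConcurrentMap<String, Boolean> regionsInTransition =
            new ConcurrentHashMap<String, Boolean>();
        regionsInTransition.put("encodedRegionName", Boolean.TRUE);

        // Atomic conditional removal: succeeds only if the region is still
        // marked as opening. If a close request flipped the value to FALSE
        // in the meantime, this is a no-op and returns false.
        boolean removed = regionsInTransition.remove("encodedRegionName", Boolean.TRUE);
        System.out.println("removed as opening: " + removed);
      }
    }

As the patch comment notes, even the conditional removal would leave a consistency gap against addToOnlineRegions() and the ZK state, which is why the change settles for logging the race instead.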
@@ -226,7 +273,8 @@ public class OpenRegionHandler extends E
/**
* Thread to run region post open tasks. Call {@link #getException()} after
* the thread finishes to check for exceptions running
- * {@link RegionServerServices#postOpenDeployTasks(HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
+ * {@link RegionServerServices#postOpenDeployTasks(
+ * HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}
* .
*/
static class PostOpenDeployTasksThread extends Thread {
@@ -277,10 +325,6 @@ public class OpenRegionHandler extends E
* @throws IOException
*/
private boolean transitionToOpened(final HRegion r) throws IOException {
- if (!isRegionStillOpening()) {
- LOG.warn("Open region aborted since it isn't opening any more");
- return false;
- }
boolean result = false;
HRegionInfo hri = r.getRegionInfo();
final String name = hri.getRegionNameAsString();
@@ -310,11 +354,12 @@ public class OpenRegionHandler extends E
* @param hri Region we're working on.
* @return whether znode is successfully transitioned to FAILED_OPEN state.
*/
- private boolean tryTransitionToFailedOpen(final HRegionInfo hri) {
+ private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
boolean result = false;
final String name = hri.getRegionNameAsString();
try {
- LOG.info("Opening of region " + hri + " failed, marking as FAILED_OPEN in ZK");
+ LOG.info("Opening of region " + hri + " failed, transitioning" +
+ " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
if (ZKAssign.transitionNode(
this.server.getZooKeeper(), hri,
this.server.getServerName(),
@@ -335,6 +380,43 @@ public class OpenRegionHandler extends E
}
/**
+ * Try to transition from OFFLINE to FAILED_OPEN. This function is static to make it
+ * usable before creating the handler.
+ *
+ * This is not guaranteed to succeed; we just do our best.
+ *
+ * @param rsServices
+ * @param hri Region we're working on.
+ * @param versionOfOfflineNode version to be checked.
+ * @return whether znode is successfully transitioned to FAILED_OPEN state.
+ */
+ public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
+ final HRegionInfo hri, final int versionOfOfflineNode) {
+ boolean result = false;
+ final String name = hri.getRegionNameAsString();
+ try {
+ LOG.info("Opening of region " + hri + " failed, transitioning" +
+ " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
+ if (ZKAssign.transitionNode(
+ rsServices.getZooKeeper(), hri,
+ rsServices.getServerName(),
+ EventType.M_ZK_REGION_OFFLINE,
+ EventType.RS_ZK_REGION_FAILED_OPEN,
+ versionOfOfflineNode) == -1) {
+ LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
+ "It's likely that the master already timed out this open " +
+ "attempt, and thus another RS already has the region.");
+ } else {
+ result = true;
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
+ }
+ return result;
+ }
+
+
+ /**
* @return Instance of HRegion if successful open else null.
*/
HRegion openRegion() {
@@ -343,7 +425,8 @@ public class OpenRegionHandler extends E
// Instantiate the region. This also periodically tickles our zk OPENING
// state so master doesn't timeout this region in transition.
region = HRegion.openHRegion(this.regionInfo, this.htd,
- this.rsServices.getWAL(), this.server.getConfiguration(),
+ this.rsServices.getWAL(this.regionInfo),
+ this.server.getConfiguration(),
this.rsServices,
new CancelableProgressable() {
public boolean progress() {
@@ -379,7 +462,7 @@ public class OpenRegionHandler extends E
private boolean isRegionStillOpening() {
byte[] encodedName = regionInfo.getEncodedNameAsBytes();
Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
- return action != null && action.booleanValue();
+ return Boolean.TRUE.equals(action); // true means opening for RIT
}
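The one-liner above trades an explicit null check plus unboxing for a null-safe equality test. A tiny standalone illustration of the difference:

    public class NullSafeBooleanSketch {
      public static void main(String[] args) {
        Boolean action = null; // e.g. the region is not in transition at all
        // Old form: action != null && action.booleanValue() -- the null guard is
        // mandatory, since unboxing a null Boolean throws NullPointerException.
        // New form: false for both null and Boolean.FALSE, no guard needed.
        System.out.println(Boolean.TRUE.equals(action)); // prints "false"
      }
    }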
/**
@@ -392,10 +475,6 @@ public class OpenRegionHandler extends E
*/
boolean transitionZookeeperOfflineToOpening(final String encodedName,
int versionOfOfflineNode) {
- if (!isRegionStillOpening()) {
- LOG.warn("Open region aborted since it isn't opening any more");
- return false;
- }
// TODO: should also handle transition from CLOSED?
try {
// Initialize the znode version.
@@ -405,6 +484,8 @@ public class OpenRegionHandler extends E
} catch (KeeperException e) {
LOG.error("Error transition from OFFLINE to OPENING for region=" +
encodedName, e);
+ this.version = -1;
+ return false;
}
boolean b = isGoodVersion();
if (!b) {
@@ -436,6 +517,7 @@ public class OpenRegionHandler extends E
server.abort("Exception refreshing OPENING; region=" + encodedName +
", context=" + context, e);
this.version = -1;
+ return false;
}
boolean b = isGoodVersion();
if (!b) {
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Wed Feb 13 20:58:23 2013
@@ -144,7 +144,12 @@ public class Compressor {
// the status byte also acts as the higher order byte of the dictionary
// entry
short dictIdx = toShort(status, in.readByte());
- byte[] entry = dict.getEntry(dictIdx);
+ byte[] entry;
+ try {
+ entry = dict.getEntry(dictIdx);
+ } catch (Exception ex) {
+ throw new IOException("Unable to uncompress the log entry", ex);
+ }
if (entry == null) {
throw new IOException("Missing dictionary entry for index "
+ dictIdx);
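For context on the hunk above: the status byte doubles as the high-order byte of the dictionary index, with the next byte from the stream as the low-order byte. A sketch of that reconstruction, assuming toShort() (a helper not shown in this hunk) combines the bytes in the usual way:

    public class DictIndexSketch {
      // Assumed shape of Compressor.toShort(): status byte high, next byte low.
      static short toShort(byte hi, byte lo) {
        return (short) ((hi << 8) | (lo & 0xFF));
      }

      public static void main(String[] args) {
        byte status = 0x01; // also the high-order byte of the dictionary index
        byte next = 0x2A;   // low-order byte read from the stream
        System.out.println(toShort(status, next)); // prints 298 (0x012A)
      }
    }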
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed Feb 13 20:58:23 2013
@@ -24,8 +24,10 @@ import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
@@ -132,28 +135,52 @@ class FSHLog implements HLog, Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
- /*
+ /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
+ private DrainBarrier closeBarrier = new DrainBarrier();
+
+ /**
* Current log file.
*/
Writer writer;
- /*
+ /**
* Map of all log files but the current one.
*/
final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
- /*
- * Map of encoded region names to their most recent sequence/edit id in their
- * memstore.
+
+ /**
+ * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
+ * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
+ * We only use these maps to find the low-bound seqNum, or to find regions with old seqNums
+ * to force-flush them, so slight inaccuracies in these numbers are harmless. */
+ private final Object oldestSeqNumsLock = new Object();
+
+ /**
+ * This lock makes sure only one log roll runs at the same time. Should not be taken while
+ * any other lock is held. We don't just use synchronized because that results in bogus and
+ * tedious findbugs warnings when it thinks synchronized controls writer thread safety. */
+ private final Object rollWriterLock = new Object();
+
+ /**
+ * Map of encoded region names to their most recent sequence/edit id in their memstore.
*/
- private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
+ private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+ /**
+ * Map of encoded region names to their most recent sequence/edit id in their memstore;
+ * contains the regions that are currently flushing. That way we can store two numbers for
+ * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
+ */
+ private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();
private volatile boolean closed = false;
private final AtomicLong logSeqNum = new AtomicLong(0);
+ private boolean forMeta = false;
+
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
@@ -176,10 +203,6 @@ class FSHLog implements HLog, Syncable {
// of the default Hdfs block size.
private final long logrollsize;
- // This lock prevents starting a log roll during a cache flush.
- // synchronized is insufficient because a cache flush spans two method calls.
- private final Lock cacheFlushLock = new ReentrantLock();
-
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
// locked during appends
@@ -211,15 +234,15 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path for stored and archived hlogs
- * @param logName dir where hlogs are stored
+ * @param logDir dir where hlogs are stored
* @param conf configuration to use
* @throws IOException
*/
- public FSHLog(final FileSystem fs, final Path root, final String logName,
+ public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf)
throws IOException {
- this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
- conf, null, true, null);
+ this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
+ conf, null, true, null, false);
}
/**
@@ -227,16 +250,16 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path for stored and archived hlogs
- * @param logName dir where hlogs are stored
- * @param oldLogName dir where hlogs are archived
+ * @param logDir dir where hlogs are stored
+ * @param oldLogDir dir where hlogs are archived
* @param conf configuration to use
* @throws IOException
*/
- public FSHLog(final FileSystem fs, final Path root, final String logName,
- final String oldLogName, final Configuration conf)
+ public FSHLog(final FileSystem fs, final Path root, final String logDir,
+ final String oldLogDir, final Configuration conf)
throws IOException {
- this(fs, root, logName, oldLogName,
- conf, null, true, null);
+ this(fs, root, logDir, oldLogDir,
+ conf, null, true, null, false);
}
/**
@@ -248,7 +271,7 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path for stored and archived hlogs
- * @param logName dir where hlogs are stored
+ * @param logDir dir where hlogs are stored
* @param conf configuration to use
* @param listeners Listeners on WAL events. Listeners passed here will
* be registered before we do anything else; e.g. the
@@ -258,11 +281,11 @@ class FSHLog implements HLog, Syncable {
* If prefix is null, "hlog" will be used
* @throws IOException
*/
- public FSHLog(final FileSystem fs, final Path root, final String logName,
+ public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
- this(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
- conf, listeners, true, prefix);
+ this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
+ conf, listeners, true, prefix, false);
}
/**
@@ -274,7 +297,8 @@ class FSHLog implements HLog, Syncable {
*
* @param fs filesystem handle
* @param root path to where logs and oldlogs
- * @param oldLogName path to where hlogs are archived
+ * @param logDir dir where hlogs are stored
+ * @param oldLogDir dir where hlogs are archived
* @param conf configuration to use
* @param listeners Listeners on WAL events. Listeners passed here will
* be registered before we do anything else; e.g. the
@@ -283,18 +307,20 @@ class FSHLog implements HLog, Syncable {
* @param prefix should always be hostname and port in distributed env and
* it will be URL encoded before being used.
* If prefix is null, "hlog" will be used
+ * @param forMeta if this hlog is meant for meta updates
* @throws IOException
*/
- private FSHLog(final FileSystem fs, final Path root, final String logName,
- final String oldLogName, final Configuration conf,
+ public FSHLog(final FileSystem fs, final Path root, final String logDir,
+ final String oldLogDir, final Configuration conf,
final List<WALActionsListener> listeners,
- final boolean failIfLogDirExists, final String prefix)
+ final boolean failIfLogDirExists, final String prefix, boolean forMeta)
throws IOException {
super();
this.fs = fs;
this.rootDir = root;
- this.dir = new Path(this.rootDir, logName);
- this.oldLogDir = new Path(this.rootDir, oldLogName);
+ this.dir = new Path(this.rootDir, logDir);
+ this.oldLogDir = new Path(this.rootDir, oldLogDir);
+ this.forMeta = forMeta;
this.conf = conf;
if (listeners != null) {
@@ -333,15 +359,16 @@ class FSHLog implements HLog, Syncable {
// If prefix is null||empty then just name it hlog
this.prefix = prefix == null || prefix.isEmpty() ?
"hlog" : URLEncoder.encode(prefix, "UTF8");
-
- if (failIfLogDirExists && this.fs.exists(dir)) {
+
+ boolean dirExists = false;
+ if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
throw new IOException("Target HLog directory already exists: " + dir);
}
- if (!fs.mkdirs(dir)) {
+ if (!dirExists && !fs.mkdirs(dir)) {
throw new IOException("Unable to mkdir " + dir);
}
- if (!fs.exists(oldLogDir)) {
+ if (!fs.exists(this.oldLogDir)) {
if (!fs.mkdirs(this.oldLogDir)) {
throw new IOException("Unable to mkdir " + this.oldLogDir);
}
@@ -437,7 +464,7 @@ class FSHLog implements HLog, Syncable {
!this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
// This could spin on occasion but better the occasional spin than locking
// every increment of sequence number.
- LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
+ LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
}
}
@@ -466,87 +493,77 @@ class FSHLog implements HLog, Syncable {
@Override
public byte [][] rollWriter(boolean force)
throws FailedLogCloseException, IOException {
- // Return if nothing to flush.
- if (!force && this.writer != null && this.numEntries.get() <= 0) {
- return null;
- }
- byte [][] regionsToFlush = null;
- this.cacheFlushLock.lock();
- try {
- this.logRollRunning = true;
- if (closed) {
- LOG.debug("HLog closed. Skipping rolling of writer");
- return regionsToFlush;
- }
- // Do all the preparation outside of the updateLock to block
- // as less as possible the incoming writes
- long currentFilenum = this.filenum;
- Path oldPath = null;
- if (currentFilenum > 0) {
- oldPath = computeFilename(currentFilenum);
+ synchronized (rollWriterLock) {
+ // Return if nothing to flush.
+ if (!force && this.writer != null && this.numEntries.get() <= 0) {
+ return null;
}
- this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
-
- // Tell our listeners that a new log is about to be created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.preLogRoll(oldPath, newPath);
+ byte [][] regionsToFlush = null;
+ try {
+ this.logRollRunning = true;
+ boolean isClosed = closed;
+ if (isClosed || !closeBarrier.beginOp()) {
+ LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
+ return regionsToFlush;
+ }
+ // Do all the preparation outside of the updateLock to block
+ // incoming writes as little as possible
+ long currentFilenum = this.filenum;
+ Path oldPath = null;
+ if (currentFilenum > 0) {
+ //computeFilename will take care of meta hlog filename
+ oldPath = computeFilename(currentFilenum);
+ }
+ this.filenum = System.currentTimeMillis();
+ Path newPath = computeFilename();
+
+ // Tell our listeners that a new log is about to be created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.preLogRoll(oldPath, newPath);
+ }
}
- }
- FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
- FSDataOutputStream nextHdfsOut = null;
- if (nextWriter instanceof SequenceFileLogWriter) {
- nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
- }
-
- synchronized (updateLock) {
- // Clean up current writer.
- Path oldFile = cleanupCurrentWriter(currentFilenum);
- this.writer = nextWriter;
- this.hdfs_out = nextHdfsOut;
-
- LOG.info((oldFile != null?
- "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
- this.numEntries.get() +
- ", filesize=" +
- this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
- " for " + FSUtils.getPath(newPath));
- this.numEntries.set(0);
- }
- // Tell our listeners that a new log was created
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.postLogRoll(oldPath, newPath);
+ FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+ // Can we get at the dfsclient outputstream? If an instance of
+ // SFLW, it'll have done the necessary reflection to get at the
+ // protected field name.
+ FSDataOutputStream nextHdfsOut = null;
+ if (nextWriter instanceof SequenceFileLogWriter) {
+ nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+ }
+
+ Path oldFile = null;
+ int oldNumEntries = 0;
+ synchronized (updateLock) {
+ // Clean up current writer.
+ oldNumEntries = this.numEntries.get();
+ oldFile = cleanupCurrentWriter(currentFilenum);
+ this.writer = nextWriter;
+ this.hdfs_out = nextHdfsOut;
+ this.numEntries.set(0);
+ }
+ LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
+ + ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
+ : "" ) + "; new path=" + FSUtils.getPath(newPath));
+
+ // Tell our listeners that a new log was created
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.postLogRoll(oldPath, newPath);
+ }
}
- }
- // Can we delete any of the old log files?
- if (this.outputfiles.size() > 0) {
- if (this.lastSeqWritten.isEmpty()) {
- LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
- // If so, then no new writes have come in since all regions were
- // flushed (and removed from the lastSeqWritten map). Means can
- // remove all but currently open log file.
- for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
- archiveLogFile(e.getValue(), e.getKey());
- }
- this.outputfiles.clear();
- } else {
- regionsToFlush = cleanOldLogs();
+ // Can we delete any of the old log files?
+ if (getNumLogFiles() > 0) {
+ cleanOldLogs();
+ regionsToFlush = getRegionsToForceFlush();
}
- }
- } finally {
- try {
- this.logRollRunning = false;
} finally {
- this.cacheFlushLock.unlock();
+ this.logRollRunning = false;
+ closeBarrier.endOp();
}
+ return regionsToFlush;
}
- return regionsToFlush;
}
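The rewritten rollWriter() above swaps the old cacheFlushLock for the closeBarrier/rollWriterLock pair. A condensed sketch of the DrainBarrier usage pattern, using only the calls that appear in this patch (beginOp/endOp around a guarded operation, stopAndDrainOps on close):

    import org.apache.hadoop.hbase.util.DrainBarrier;

    class BarrierPatternSketch {
      private final DrainBarrier closeBarrier = new DrainBarrier();

      void guardedOperation() { // e.g. a log roll or a cache flush
        if (!closeBarrier.beginOp()) {
          return; // close already started; refuse to begin new work
        }
        try {
          // ... do the roll/flush work ...
        } finally {
          closeBarrier.endOp(); // always release, even on failure
        }
      }

      void close() throws InterruptedException {
        // Rejects further beginOp() calls and waits for in-flight ones to end.
        closeBarrier.stopAndDrainOps();
        // ... now safe to close the writer ...
      }
    }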
/**
@@ -561,6 +578,9 @@ class FSHLog implements HLog, Syncable {
*/
protected Writer createWriterInstance(final FileSystem fs, final Path path,
final Configuration conf) throws IOException {
+ if (forMeta) {
+ //TODO: set a higher replication for the hlog files (HBASE-6773)
+ }
return HLogFactory.createWriter(fs, path, conf);
}
@@ -571,36 +591,64 @@ class FSHLog implements HLog, Syncable {
* encoded region names to flush.
* @throws IOException
*/
- private byte [][] cleanOldLogs() throws IOException {
- Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
+ private void cleanOldLogs() throws IOException {
+ long oldestOutstandingSeqNum = Long.MAX_VALUE;
+ synchronized (oldestSeqNumsLock) {
+ Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
+ ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
+ Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
+ ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
+ oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
+ }
+
// Get the set of all log files whose last sequence number is smaller than
// the oldest edit's sequence number.
TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
oldestOutstandingSeqNum).keySet());
// Now remove old log files (if any)
- int logsToRemove = sequenceNumbers.size();
- if (logsToRemove > 0) {
- if (LOG.isDebugEnabled()) {
- // Find associated region; helps debugging.
- byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
- LOG.debug("Found " + logsToRemove + " hlogs to remove" +
+ if (LOG.isDebugEnabled()) {
+ if (sequenceNumbers.size() > 0) {
+ LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
" out of total " + this.outputfiles.size() + ";" +
- " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
- " from region " + Bytes.toStringBinary(oldestRegion));
+ " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
}
- for (Long seq : sequenceNumbers) {
- archiveLogFile(this.outputfiles.remove(seq), seq);
+ }
+ for (Long seq : sequenceNumbers) {
+ archiveLogFile(this.outputfiles.remove(seq), seq);
+ }
+ }
+
+ /**
+ * Return regions that have edits that are equal or less than a certain sequence number.
+ * Static due to some old unit test.
+ * @param walSeqNum The sequence number to compare with.
+ * @param regionsToSeqNums Encoded region names to sequence ids
+ * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
+ */
+ static byte[][] findMemstoresWithEditsEqualOrOlderThan(
+ final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
+ List<byte[]> regions = null;
+ for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
+ if (e.getValue().longValue() <= walSeqNum) {
+ if (regions == null) regions = new ArrayList<byte[]>();
+ regions.add(e.getKey());
}
}
+ return regions == null ? null : regions
+ .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+ }
+ private byte[][] getRegionsToForceFlush() throws IOException {
// If too many log files, figure which regions we need to flush.
// Array is an array of encoded region names.
byte [][] regions = null;
- int logCount = this.outputfiles.size();
+ int logCount = getNumLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
// This is an array of encoded region names.
- regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
- this.lastSeqWritten);
+ synchronized (oldestSeqNumsLock) {
+ regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
+ this.oldestUnflushedSeqNums);
+ }
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
@@ -616,29 +664,6 @@ class FSHLog implements HLog, Syncable {
}
/*
- * @return Logs older than this id are safe to remove.
- */
- private Long getOldestOutstandingSeqNum() {
- return Collections.min(this.lastSeqWritten.values());
- }
-
- /**
- * @param oldestOutstandingSeqNum
- * @return (Encoded) name of oldest outstanding region.
- */
- private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
- byte [] oldestRegion = null;
- for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
- if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
- // Key is encoded region name.
- oldestRegion = e.getKey();
- break;
- }
- }
- return oldestRegion;
- }
-
- /*
* Cleans up current writer closing and adding to outputfiles.
* Presumes we're operating inside an updateLock scope.
* @return Path to current writer or null if none.
@@ -729,7 +754,11 @@ class FSHLog implements HLog, Syncable {
if (filenum < 0) {
throw new RuntimeException("hlog file number can't be < 0");
}
- return new Path(dir, prefix + "." + filenum);
+ String child = prefix + "." + filenum;
+ if (forMeta) {
+ child += HLog.META_HLOG_FILE_EXTN;
+ }
+ return new Path(dir, child);
}
@Override
@@ -766,33 +795,39 @@ class FSHLog implements HLog, Syncable {
@Override
public void close() throws IOException {
+ if (this.closed) {
+ return;
+ }
try {
logSyncerThread.close();
// Make sure we synced everything
logSyncerThread.join(this.optionalFlushInterval*2);
} catch (InterruptedException e) {
LOG.error("Exception while waiting for syncer thread to die", e);
+ Thread.currentThread().interrupt();
}
-
- cacheFlushLock.lock();
try {
- // Tell our listeners that the log is closing
- if (!this.listeners.isEmpty()) {
- for (WALActionsListener i : this.listeners) {
- i.logCloseRequested();
- }
+ // Prevent all further flushing and rolling.
+ closeBarrier.stopAndDrainOps();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while waiting for cache flushes and log rolls", e);
+ Thread.currentThread().interrupt();
+ }
+
+ // Tell our listeners that the log is closing
+ if (!this.listeners.isEmpty()) {
+ for (WALActionsListener i : this.listeners) {
+ i.logCloseRequested();
}
- synchronized (updateLock) {
- this.closed = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("closing hlog writer in " + this.dir.toString());
- }
- if (this.writer != null) {
- this.writer.close();
- }
+ }
+ synchronized (updateLock) {
+ this.closed = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing hlog writer in " + this.dir.toString());
+ }
+ if (this.writer != null) {
+ this.writer.close();
}
- } finally {
- cacheFlushLock.unlock();
}
}
@@ -824,7 +859,7 @@ class FSHLog implements HLog, Syncable {
// memstore). When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
- this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
+ this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
Long.valueOf(seqNum));
doWrite(regionInfo, logKey, logEdit, htd);
txid = this.unflushedEntries.incrementAndGet();
@@ -896,7 +931,7 @@ class FSHLog implements HLog, Syncable {
// Use encoded name. Its shorter, guaranteed unique and a subset of
// actual name.
byte [] encodedRegionName = info.getEncodedNameAsBytes();
- this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
+ this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
@@ -1028,7 +1063,11 @@ class FSHLog implements HLog, Syncable {
Writer tempWriter;
synchronized (this.updateLock) {
if (this.closed) return;
- tempWriter = this.writer; // guaranteed non-null
+ // Guaranteed non-null.
+ // Note that parallel sync can close tempWriter.
+ // The current method of dealing with this is to catch exceptions.
+ // See HBASE-4387, HBASE-5623, HBASE-7329.
+ tempWriter = this.writer;
}
// if the transaction that we are interested in is already
// synced, then return immediately.
@@ -1064,9 +1103,11 @@ class FSHLog implements HLog, Syncable {
}
try {
tempWriter.sync();
- } catch(IOException io) {
+ } catch(IOException ex) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
+ // TODO: we don't actually need to do it for concurrent close - what is the point
+ // of syncing a new unrelated writer? Keep the behavior for now.
tempWriter = this.writer;
tempWriter.sync();
}
@@ -1074,6 +1115,9 @@ class FSHLog implements HLog, Syncable {
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
+ // TODO: preserving the old behavior for now, but this check is strange. It's not
+ // protected by any locks here, so for all we know rolling locks might start
+ // as soon as we enter the "if". Is this a best-effort optimization check?
if (!this.logRollRunning) {
checkLowReplication();
try {
@@ -1236,107 +1280,61 @@ class FSHLog implements HLog, Syncable {
return outputfiles.size();
}
- private byte[] getSnapshotName(byte[] encodedRegionName) {
- byte snp[] = new byte[encodedRegionName.length + 3];
- // an encoded region name has only hex digits. s, n or p are not hex
- // and therefore snapshot-names will never collide with
- // encoded-region-names
- snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
- for (int i = 0; i < encodedRegionName.length; i++) {
- snp[i+3] = encodedRegionName[i];
- }
- return snp;
- }
-
@Override
- public long startCacheFlush(final byte[] encodedRegionName) {
- this.cacheFlushLock.lock();
- Long seq = this.lastSeqWritten.remove(encodedRegionName);
- // seq is the lsn of the oldest edit associated with this region. If a
- // snapshot already exists - because the last flush failed - then seq will
- // be the lsn of the oldest edit in the snapshot
- if (seq != null) {
- // keeping the earliest sequence number of the snapshot in
- // lastSeqWritten maintains the correctness of
- // getOldestOutstandingSeqNum(). But it doesn't matter really because
- // everything is being done inside of cacheFlush lock.
- Long oldseq =
- lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
- if (oldseq != null) {
- LOG.error("Logic Error Snapshot seq id from earlier flush still" +
- " present! for region " + Bytes.toString(encodedRegionName) +
- " overwritten oldseq=" + oldseq + "with new seq=" + seq);
- Runtime.getRuntime().halt(1);
- }
+ public Long startCacheFlush(final byte[] encodedRegionName) {
+ Long oldRegionSeqNum = null;
+ if (!closeBarrier.beginOp()) {
+ return null;
+ }
+ synchronized (oldestSeqNumsLock) {
+ oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
+ if (oldRegionSeqNum != null) {
+ Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
+ assert oldValue == null : "Flushing map not cleaned up for "
+ + Bytes.toString(encodedRegionName);
+ }
+ }
+ if (oldRegionSeqNum == null) {
+ // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
+ // the region is already flushing (which would make this call invalid), or there
+ // were no appends after last flush, so why are we starting flush? Maybe we should
+ // assert not null, and switch to "long" everywhere. A less rigorous, but safer,
+ // alternative is telling the caller to stop. For now, preserve the old logic.
+ LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ + Bytes.toString(encodedRegionName) + "]");
}
return obtainSeqNum();
}
@Override
- public void completeCacheFlush(final byte [] encodedRegionName,
- final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
- throws IOException {
- try {
- if (this.closed) {
- return;
- }
- long txid = 0;
- synchronized (updateLock) {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- WALEdit edit = completeCacheFlushLogEdit();
- HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
- System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- logSyncerThread.append(new Entry(key, edit));
- txid = this.unflushedEntries.incrementAndGet();
- long took = EnvironmentEdgeManager.currentTimeMillis() - now;
- long len = 0;
- for (KeyValue kv : edit.getKeyValues()) {
- len += kv.getLength();
- }
- this.metrics.finishAppend(took, len);
- this.numEntries.incrementAndGet();
- }
- // sync txn to file system
- this.sync(txid);
-
- } finally {
- // updateLock not needed for removing snapshot's entry
- // Cleaning up of lastSeqWritten is in the finally clause because we
- // don't want to confuse getOldestOutstandingSeqNum()
- this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
- this.cacheFlushLock.unlock();
+ public void completeCacheFlush(final byte [] encodedRegionName)
+ {
+ synchronized (oldestSeqNumsLock) {
+ this.oldestFlushingSeqNums.remove(encodedRegionName);
}
- }
-
- private WALEdit completeCacheFlushLogEdit() {
- KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null,
- System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH);
- WALEdit e = new WALEdit();
- e.add(kv);
- return e;
+ closeBarrier.endOp();
}
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
- Long snapshot_seq =
- this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
- if (snapshot_seq != null) {
- // updateLock not necessary because we are racing against
- // lastSeqWritten.putIfAbsent() in append() and we will always win
- // before releasing cacheFlushLock make sure that the region's entry in
- // lastSeqWritten points to the earliest edit in the region
- Long current_memstore_earliest_seq =
- this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
- if (current_memstore_earliest_seq != null &&
- (current_memstore_earliest_seq.longValue() <=
- snapshot_seq.longValue())) {
- LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
- "acquired edits out of order current memstore seq=" +
- current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
- Runtime.getRuntime().halt(1);
- }
+ Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
+ synchronized (oldestSeqNumsLock) {
+ seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
+ if (seqNumBeforeFlushStarts != null) {
+ currentSeqNum =
+ this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
+ }
+ }
+ closeBarrier.endOp();
+ if ((currentSeqNum != null)
+ && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
+ String errorStr = "Region " + Bytes.toString(encodedRegionName) +
+ "acquired edits out of order current memstore seq=" + currentSeqNum
+ + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
+ LOG.error(errorStr);
+ assert false : errorStr;
+ Runtime.getRuntime().halt(1);
}
- this.cacheFlushLock.unlock();
}
@Override
@@ -1401,6 +1399,12 @@ class FSHLog implements HLog, Syncable {
return lastDeferredTxid > syncedTillHere;
}
+ @Override
+ public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+ Long result = oldestUnflushedSeqNums.get(encodedRegionName);
+ return result == null ? HConstants.NO_SEQNUM : result.longValue();
+ }
+
/**
* Pass one or more log file names and it will either dump out a text version
* on <code>stdout</code> or split the specified log files.
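Pulling the startCacheFlush/completeCacheFlush/abortCacheFlush changes above together, a flusher would drive the revised protocol roughly as follows (a sketch against the new HLog contract; flushSnapshot() is a hypothetical helper, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    class FlushProtocolSketch {
      void flush(HLog wal, byte[] encodedRegionName) throws IOException {
        Long flushSeqId = wal.startCacheFlush(encodedRegionName);
        if (flushSeqId == null) {
          return; // WAL refused: close in progress, flush must not start
        }
        boolean succeeded = false;
        try {
          // Hypothetical: persist the memstore snapshot, recording flushSeqId
          // as the upper-bound seqNum in the resulting file's metadata.
          flushSnapshot(flushSeqId);
          succeeded = true;
        } finally {
          if (succeeded) {
            wal.completeCacheFlush(encodedRegionName); // drop the stashed seqNum
          } else {
            wal.abortCacheFlush(encodedRegionName); // restore the stashed seqNum
          }
        }
      }

      void flushSnapshot(long seqId) throws IOException { /* hypothetical */ }
    }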
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Feb 13 20:58:23 2013
@@ -50,6 +50,8 @@ public interface HLog {
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
public static final String SPLITTING_EXT = "-splitting";
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
+ /** The META region's HLog filename extension */
+ public static final String META_HLOG_FILE_EXTN = ".meta";
/*
* Name of directory that holds recovered edits written by the wal log
@@ -71,6 +73,7 @@ public interface HLog {
void seek(long pos) throws IOException;
long getPosition() throws IOException;
+ void reset() throws IOException;
}
public interface Writer {
@@ -159,14 +162,14 @@ public interface HLog {
}
}
- /*
+ /**
* registers WALActionsListener
*
* @param listener
*/
public void registerWALActionsListener(final WALActionsListener listener);
- /*
+ /**
* unregisters WALActionsListener
*
* @param listener
@@ -197,18 +200,10 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
- * Because a log cannot be rolled during a cache flush, and a cache flush
- * spans two method calls, a special lock needs to be obtained so that a cache
- * flush cannot start when the log is being rolled and the log cannot be
- * rolled during a cache flush.
- *
* <p>
- * Note that this method cannot be synchronized because it is possible that
- * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
- * start which would obtain the lock on this but block on obtaining the
- * cacheFlushLock and then completeCacheFlush could be called which would wait
- * for the lock on this and consequently never release the cacheFlushLock
- *
+ * The implementation is synchronized in order to make sure there's only one rollWriter
+ * running at any given time.
+ *
* @return If lots of logs, flush the returned regions so next time through we
* can clean logs. Returns null if nothing to flush. Names are actual
* region names as returned by {@link HRegionInfo#getEncodedName()}
@@ -220,17 +215,9 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
- * Because a log cannot be rolled during a cache flush, and a cache flush
- * spans two method calls, a special lock needs to be obtained so that a cache
- * flush cannot start when the log is being rolled and the log cannot be
- * rolled during a cache flush.
- *
* <p>
- * Note that this method cannot be synchronized because it is possible that
- * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
- * start which would obtain the lock on this but block on obtaining the
- * cacheFlushLock and then completeCacheFlush could be called which would wait
- * for the lock on this and consequently never release the cacheFlushLock
+ * The implementation is synchronized in order to make sure there's only one rollWriter
+ * running at any given time.
*
* @param force
* If true, force creation of a new writer even if no entries have
@@ -334,53 +321,33 @@ public interface HLog {
public long obtainSeqNum();
/**
- * By acquiring a log sequence ID, we can allow log messages to continue while
- * we flush the cache.
- *
- * Acquire a lock so that we do not roll the log between the start and
- * completion of a cache-flush. Otherwise the log-seq-id for the flush will
- * not appear in the correct logfile.
- *
- * Ensuring that flushes and log-rolls don't happen concurrently also allows
- * us to temporarily put a log-seq-number in lastSeqWritten against the region
- * being flushed that might not be the earliest in-memory log-seq-number for
- * that region. By the time the flush is completed or aborted and before the
- * cacheFlushLock is released it is ensured that lastSeqWritten again has the
- * oldest in-memory edit's lsn for the region that was being flushed.
- *
- * In this method, by removing the entry in lastSeqWritten for the region
- * being flushed we ensure that the next edit inserted in this region will be
- * correctly recorded in
- * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
- * lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
- * is saved temporarily in the lastSeqWritten map while the flush is active.
- *
- * @return sequence ID to pass
- * {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[],
- * byte[], long)}
- * @see #completeCacheFlush(byte[], byte[], long, boolean)
- * @see #abortCacheFlush(byte[])
+ * WAL keeps track of the sequence numbers that were not yet flushed from memstores
+ * in order to be able to do cleanup. This method tells WAL that some region is about
+ * to flush memstore.
+ *
+ * We stash the oldest seqNum for the region, and let the next edit inserted in this
+ * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
+ * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
+ * in case of flush succeeding, the seqNum of that first edit after start becomes the
+ * valid oldest seqNum for this region.
+ *
+ * @return current seqNum, to pass on to flushers (who will put it into the metadata of
+ * the resulting file as an upper-bound seqNum for that file), or NULL if flush
+ * should not be started.
*/
- public long startCacheFlush(final byte[] encodedRegionName);
+ public Long startCacheFlush(final byte[] encodedRegionName);
/**
- * Complete the cache flush
- *
- * Protected by cacheFlushLock
- *
- * @param encodedRegionName
- * @param tableName
- * @param logSeqId
- * @throws IOException
+ * Complete the cache flush.
+ * @param encodedRegionName Encoded region name.
*/
- public void completeCacheFlush(final byte[] encodedRegionName,
- final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
- throws IOException;
+ public void completeCacheFlush(final byte[] encodedRegionName);
/**
* Abort a cache flush. Call if the flush fails. Note that the only recovery
* for an aborted flush currently is a restart of the regionserver so the
- * snapshot content dropped by the failure gets restored to the memstore.
+ * snapshot content dropped by the failure gets restored to the memstore.
+ * @param encodedRegionName Encoded region name.
*/
public void abortCacheFlush(byte[] encodedRegionName);
@@ -395,4 +362,11 @@ public interface HLog {
* @return lowReplicationRollEnabled
*/
public boolean isLowReplicationRollEnabled();
+
+ /** Gets the earliest sequence number in the memstore for this particular region.
+ * This can serve as best-effort "recent" WAL number for this region.
+ * @param encodedRegionName The region to get the number for.
+ * @return The number if present, HConstants.NO_SEQNUM if absent.
+ */
+ public long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Wed Feb 13 20:58:23 2013
@@ -26,9 +26,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
@@ -50,6 +50,13 @@ public class HLogFactory {
final String prefix) throws IOException {
return new FSHLog(fs, root, logName, conf, listeners, prefix);
}
+
+ public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
+ final Configuration conf, final List<WALActionsListener> listeners,
+ final String prefix) throws IOException {
+ return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
+ conf, listeners, false, prefix, true);
+ }
/*
* WAL Reader
@@ -62,7 +69,9 @@ public class HLogFactory {
}
/**
- * Create a reader for the WAL.
+ * Create a reader for the WAL. If you are reading from a file that's being written to
+ * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this
+ * method, then seek back to the last known good position.
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
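The reset() recommendation in the javadoc above suggests a tail-the-WAL loop along these lines (a sketch; the createReader signature and the process() consumer are assumptions, not shown in this hunk):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;

    class WalTailSketch {
      void tail(FileSystem fs, Path path, Configuration conf) throws IOException {
        HLog.Reader reader = HLogFactory.createReader(fs, path, conf); // assumed signature
        long lastGoodPos = 0;
        while (true) { // tail forever; a real caller would have a stop condition
          HLog.Entry entry;
          while ((entry = reader.next()) != null) {
            process(entry); // hypothetical consumer
            lastGoodPos = reader.getPosition();
          }
          reader.reset();           // reopen to see newly appended data...
          reader.seek(lastGoodPos); // ...and resume from the last good position
        }
      }

      void process(HLog.Entry entry) { /* hypothetical */ }
    }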
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Feb 13 20:58:23 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -302,6 +303,11 @@ public class HLogSplitter {
+ ": " + logPath + ", length=" + logLength);
Reader in = null;
try {
+ //actually, for meta-only hlogs, we don't need to go through the process
+ //of parsing and segregating by regions since all the logs are for
+ //meta only. However, there is a sequence number that can be obtained
+ //only by parsing, so we parse all files for now
+ //TODO: optimize this part somehow
in = getReader(fs, log, conf, skipErrors);
if (in != null) {
parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Wed Feb 13 20:58:23 2013
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.util.Byte
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
- static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
-
/**
* @param family
* @return true if the column is a meta column
@@ -76,7 +74,8 @@ public class HLogUtil {
/**
* Pattern used to validate a HLog file name
*/
- private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
+ private static final Pattern pattern =
+ Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
/**
* @param filename
@@ -243,32 +242,6 @@ public class HLogUtil {
}
/**
- * Return regions (memstores) that have edits that are equal or less than the
- * passed <code>oldestWALseqid</code>.
- *
- * @param oldestWALseqid
- * @param regionsToSeqids
- * Encoded region names to sequence ids
- * @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
- * necessarily in order). Null if no regions found.
- */
- static byte[][] findMemstoresWithEditsEqualOrOlderThan(
- final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
- // This method is static so it can be unit tested the easier.
- List<byte[]> regions = null;
- for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
- if (e.getValue().longValue() <= oldestWALseqid) {
- if (regions == null)
- regions = new ArrayList<byte[]>();
- // Key is encoded region name.
- regions.add(e.getKey());
- }
- }
- return regions == null ? null : regions
- .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
- }
-
- /**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
*
@@ -312,4 +285,11 @@ public class HLogUtil {
}
return filesSorted;
}
+
+ public static boolean isMetaFile(Path p) {
+ if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
+ return true;
+ }
+ return false;
+ }
}
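As a quick check on the widened filename pattern above (with META_HLOG_FILE_EXTN substituted in), both plain and meta hlog names now validate. Note the extension's leading "." is left unescaped in the actual code, so it matches any character, which still accepts the ".meta" suffix:

    import java.util.regex.Pattern;

    public class HLogNameSketch {
      public static void main(String[] args) {
        // ".meta" stands in for HLog.META_HLOG_FILE_EXTN here.
        Pattern p = Pattern.compile(".*\\.\\d*(" + ".meta" + ")*");
        System.out.println(p.matcher("host%2C60020.1445918").matches());      // true
        System.out.println(p.matcher("host%2C60020.1445918.meta").matches()); // true
      }
    }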
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Wed Feb 13 20:58:23 2013
@@ -140,15 +140,17 @@ public class SequenceFileLogReader imple
Configuration conf;
WALReader reader;
+ FileSystem fs;
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
+ boolean emptyCompressionContext = true;
/**
* Compression context to use while reading. Can be null if no compression.
*/
- private CompressionContext compressionContext = null;
+ protected CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@@ -174,6 +176,7 @@ public class SequenceFileLogReader imple
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
+ this.fs = fs;
// If compression is enabled, new dictionaries are created here.
boolean compression = reader.isWALCompressionEnabled();
@@ -238,11 +241,22 @@ public class SequenceFileLogReader imple
throw addFileInfoToException(ioe);
}
edit++;
+ if (compressionContext != null && emptyCompressionContext) {
+ emptyCompressionContext = false;
+ }
return b? e: null;
}
@Override
public void seek(long pos) throws IOException {
+ if (compressionContext != null && emptyCompressionContext) {
+ while (next() != null) {
+ if (getPosition() == pos) {
+ emptyCompressionContext = false;
+ break;
+ }
+ }
+ }
try {
reader.seek(pos);
} catch (IOException ioe) {
@@ -252,7 +266,7 @@ public class SequenceFileLogReader imple
@Override
public long getPosition() throws IOException {
- return reader.getPosition();
+ return reader != null ? reader.getPosition() : 0;
}
protected IOException addFileInfoToException(final IOException ioe)
@@ -287,4 +301,11 @@ public class SequenceFileLogReader imple
return ioe;
}
+
+ @Override
+ public void reset() throws IOException {
+ // Resetting the reader lets us see newly added data if the file is being written to.
+ // We also keep the same compressionContext that was previously populated for this file.
+ reader = new WALReader(fs, path, conf);
+ }
}
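
The reset()/seek() pair matters when tailing a WAL that is still being written: with dictionary compression, entries refer back to earlier entries, so a reader cannot drop into the middle of a file with empty dictionaries; seek() therefore replays entries from the start until the target position while the context is still empty. A sketch of how a tailer might use this, assuming the HLog.Reader interface shown in this patch (next/getPosition/seek/reset); this is illustrative, not the actual replication code:

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    // Sketch: tail a live WAL. reset() reopens the underlying SequenceFile
    // so newly appended entries become visible, while the compression
    // dictionaries already populated for this file are kept.
    static void tail(HLog.Reader reader) throws IOException, InterruptedException {
      while (!Thread.currentThread().isInterrupted()) { // real callers have a stop flag
        HLog.Entry entry = reader.next();
        if (entry == null) {              // at the current end of file
          long pos = reader.getPosition();
          Thread.sleep(1000);             // back off before retrying
          reader.reset();                 // pick up newly written data
          reader.seek(pos);               // resume where we left off
          continue;
        }
        // process(entry) ...
      }
    }
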
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Wed Feb 13 20:58:23 2013
@@ -40,7 +40,7 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.io.compress.DefaultCodec;
/**
- * Implementation of {@link FSHLog.Writer} that delegates to
+ * Implementation of {@link HLog.Writer} that delegates to
* SequenceFile.Writer.
*/
@InterfaceAudience.Private
@@ -244,7 +244,12 @@ public class SequenceFileLogWriter imple
@Override
public void sync() throws IOException {
- this.writer.syncFs();
+ try {
+ this.writer.syncFs();
+ } catch (NullPointerException npe) {
+ // Concurrent close...
+ throw new IOException(npe);
+ }
}
@Override
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Wed Feb 13 20:58:23 2013
@@ -90,7 +90,7 @@ public class ReplicationPeer implements
}
private void readPeerStateZnode() throws DeserializationException {
- this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false)));
+ this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
}
/**
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Wed Feb 13 20:58:23 2013
@@ -43,13 +43,12 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -87,7 +86,7 @@ import com.google.protobuf.InvalidProtoc
* </pre>
*/
@InterfaceAudience.Private
-public class ReplicationZookeeper implements Closeable{
+public class ReplicationZookeeper implements Closeable {
private static final Log LOG =
LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover
@@ -111,24 +110,24 @@ public class ReplicationZookeeper implem
// peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
private String peerStateNodeName;
private final Configuration conf;
- // Is this cluster replicating at the moment?
- private AtomicBoolean replicating;
// The key to our own cluster
private String ourClusterKey;
// Abortable
private Abortable abortable;
- private ReplicationStatusTracker statusTracker;
+ private final ReplicationStateInterface replicationState;
/**
* ZNode content if enabled state.
*/
// Public so it can be seen by test code.
- public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+ public static final byte[] ENABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
/**
* ZNode content if disabled state.
*/
- static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+ static final byte[] DISABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
/**
* Constructor used by clients of replication (like master and HBase clients)
@@ -140,8 +139,9 @@ public class ReplicationZookeeper implem
final ZooKeeperWatcher zk) throws KeeperException {
this.conf = conf;
this.zookeeper = zk;
- this.replicating = new AtomicBoolean();
setZNodes(abortable);
+ this.replicationState =
+ new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
}
/**
@@ -157,9 +157,10 @@ public class ReplicationZookeeper implem
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
- this.replicating = replicating;
setZNodes(server);
+ this.replicationState =
+ new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
this.peerClusters = new HashMap<String, ReplicationPeer>();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
@@ -180,11 +181,6 @@ public class ReplicationZookeeper implem
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
-
- // Set a tracker on replicationStateNodeNode
- this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
- statusTracker.start();
- readReplicationStateZnode();
}
private void connectExistingPeers() throws IOException, KeeperException {
@@ -366,18 +362,6 @@ public class ReplicationZookeeper implem
}
/**
- * Set the new replication state for this cluster
- * @param newState
- */
- public void setReplicating(boolean newState) throws KeeperException {
- ZKUtil.createWithParents(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
- ZKUtil.setData(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
- }
-
- /**
* Remove the peer from zookeeper, which will trigger the watchers on every
* region server and close their sources
* @param id
@@ -411,8 +395,12 @@ public class ReplicationZookeeper implem
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
toByteArray(clusterKey));
+ // There is a race between PeerWatcher and the ReplicationZookeeper#add method to
+ // create the peer-state znode while a peer is being added.
+ // The peer state data is set to "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
+ ENABLED_ZNODE_BYTES);
// A peer is enabled by default
- ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id), ENABLED_ZNODE_BYTES);
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
@@ -637,40 +625,27 @@ public class ReplicationZookeeper implem
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
}
- /**
- * This reads the state znode for replication and sets the atomic boolean
- */
- private void readReplicationStateZnode() {
- try {
- this.replicating.set(getReplication());
- LOG.info("Replication is now " + (this.replicating.get()?
- "started" : "stopped"));
- } catch (KeeperException e) {
- this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
- }
+ private String getRepStateNode() {
+ return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
}
/**
- * Get the replication status of this cluster. If the state znode doesn't
- * exist it will also create it and set it true.
+ * Get the replication status of this cluster. If the state znode doesn't exist it will also
+ * create it and set it to true.
* @return true when replication is enabled, else false
* @throws KeeperException
*/
public boolean getReplication() throws KeeperException {
- byte [] data = this.statusTracker.getData(false);
- if (data == null || data.length == 0) {
- setReplicating(true);
- return true;
- }
- try {
- return isPeerEnabled(data);
- } catch (DeserializationException e) {
- throw ZKUtil.convert(e);
- }
+ return this.replicationState.getState();
}
- private String getRepStateNode() {
- return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+ /**
+ * Set the new replication state for this cluster
+ * @param newState
+ * @throws KeeperException
+ */
+ public void setReplication(boolean newState) throws KeeperException {
+ this.replicationState.setState(newState);
}
/**
@@ -817,6 +792,58 @@ public class ReplicationZookeeper implem
}
/**
+ * It "atomically" copies all the hlogs queues from another region server and returns them all
+ * sorted per peer cluster (appended with the dead server's znode).
+ * @param znode
+ * @return HLog queues sorted per peer cluster
+ */
+ public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+ SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+ String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode); // e.g. /hbase/replication/rs/deadrs
+ List<String> peerIdsToProcess = null;
+ List<ZKUtilOp> listOfOps = new ArrayList<ZKUtilOp>();
+ try {
+ peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+ if (peerIdsToProcess == null) return null; // node already processed
+ for (String peerId : peerIdsToProcess) {
+ String newPeerId = peerId + "-" + znode;
+ String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
+ // check the logs queue for the old peer cluster
+ String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+ List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+ if (hlogs == null || hlogs.size() == 0) continue; // empty log queue.
+ // create the new cluster znode
+ SortedSet<String> logQueue = new TreeSet<String>();
+ queues.put(newPeerId, logQueue);
+ ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+ listOfOps.add(op);
+ // get the offset of the logs and set it to new znodes
+ for (String hlog : hlogs) {
+ String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+ byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+ LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+ String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+ listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+ // add ops for deleting
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+ logQueue.add(hlog);
+ }
+ // add delete op for peer
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+ }
+ // add delete op for dead rs
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+ LOG.debug(" The multi list size is: " + listOfOps.size());
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+ LOG.info("Atomically moved the dead regionserver logs. ");
+ } catch (KeeperException e) {
+ // Multi call failed; it looks like some other regionserver took away the logs.
+ LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+ }
+ return queues;
+ }
+
+ /**
* This method copies all the hlog queues from another region server
* and returns them all sorted per peer cluster (appended with the dead
* server's znode)
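
copyQueuesFromRSUsingMulti above is the multi-op replacement for the copy-then-delete method that follows: every create and delete for the dead server's queues goes into a single batch, so either the whole transfer happens or, if another region server got there first, none of it does. The same pattern against the raw ZooKeeper client, reduced to a single znode (paths and setup are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;

    // Sketch: atomically move one znode's data from oldPath to newPath.
    // If any op fails (e.g. another server already took the node), the
    // whole batch fails and nothing is changed.
    static void atomicMove(ZooKeeper zk, String oldPath, String newPath)
        throws KeeperException, InterruptedException {
      byte[] data = zk.getData(oldPath, false, null);
      List<Op> ops = new ArrayList<Op>();
      ops.add(Op.create(newPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
      ops.add(Op.delete(oldPath, -1));   // -1: any version
      zk.multi(ops);                     // all-or-nothing
    }
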
@@ -1021,6 +1048,15 @@ public class ReplicationZookeeper implem
public Map<String, ReplicationPeer> getPeerClusters() {
return this.peerClusters;
}
+
+ /**
+ * Determine if a ZK path points to a peer node.
+ * @param path path to be checked
+ * @return true if the path points to a peer node, otherwise false
+ */
+ public boolean isPeerPath(String path) {
+ return path.split("/").length == peersZNode.split("/").length + 1;
+ }
/**
* Extracts the znode name of a peer cluster from a ZK path
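
isPeerPath is a purely structural test: a peer znode sits exactly one path component below peersZNode. A standalone illustration, assuming the default /hbase/replication/peers layout:

    public class PeerPathDemo {
      static final String PEERS_ZNODE = "/hbase/replication/peers"; // assumed default layout

      static boolean isPeerPath(String path) {
        // a peer node is exactly one level below the peers znode
        return path.split("/").length == PEERS_ZNODE.split("/").length + 1;
      }

      public static void main(String[] args) {
        System.out.println(isPeerPath("/hbase/replication/peers/1"));             // true
        System.out.println(isPeerPath("/hbase/replication/peers/1/peer-state"));  // false
        System.out.println(isPeerPath("/hbase/replication/peers"));               // false
      }
    }
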
@@ -1051,8 +1087,7 @@ public class ReplicationZookeeper implem
@Override
public void close() throws IOException {
- if (statusTracker != null)
- statusTracker.stop();
+ if (replicationState != null) replicationState.close();
}
/**
@@ -1067,7 +1102,10 @@ public class ReplicationZookeeper implem
static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
throws NodeExistsException, KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
- ZKUtil.createAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
+ // There is a race between PeerWatcher and the ReplicationZookeeper#add method to
+ // create the peer-state znode while a peer is being added.
+ // The peer state data is set to "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
return true;
}
return false;
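
The switch from createAndWatch to createNodeIfNotExistsAndWatch makes losing the creation race harmless: the loser simply ends up watching the znode the winner created. In raw ZooKeeper terms the helper behaves roughly like this sketch (the real implementation is in ZKUtil; this is an approximation):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;

    // Sketch: create a znode with initial data unless it already exists,
    // and leave a watch on it either way. Returns true if we created it.
    static boolean createIfNotExistsAndWatch(ZooKeeper zk, String path, byte[] data)
        throws KeeperException, InterruptedException {
      try {
        zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      } catch (KeeperException.NodeExistsException e) {
        // Lost the race (e.g. PeerWatcher vs. add-peer); that's fine.
        zk.exists(path, true);  // still set the watch
        return false;
      }
      zk.exists(path, true);    // watch the node we just created
      return true;
    }
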
@@ -1079,26 +1117,8 @@ public class ReplicationZookeeper implem
* serialized ENABLED state.
* @throws DeserializationException
*/
- static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
+ static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
}
-
- /**
- * Tracker for status of the replication
- */
- public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
- public ReplicationStatusTracker(ZooKeeperWatcher watcher,
- Abortable abortable) {
- super(watcher, getRepStateNode(), abortable);
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- readReplicationStateZnode();
- }
- }
- }
}
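
With ReplicationStatusTracker and readReplicationStateZnode gone, the state znode is now hidden behind ReplicationStateInterface. The diff only shows its call sites (getState, setState, close, and a constructor taking the watcher, the state znode path, an Abortable, and an AtomicBoolean), so the contract is roughly the following sketch; names and throws clauses beyond those call sites are assumptions:

    import java.io.Closeable;
    import org.apache.zookeeper.KeeperException;

    // Sketch of the contract implied by the call sites above; the actual
    // interface in this branch may differ in naming and throws clauses.
    interface ReplicationStateInterfaceSketch extends Closeable {
      // true when replication is enabled for this cluster
      boolean getState() throws KeeperException;

      // flip the cluster-wide replication state znode
      void setState(boolean enabled) throws KeeperException;
    }
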
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Wed Feb 13 20:58:23 2013
@@ -148,7 +148,7 @@ public class ReplicationLogCleaner exten
this.zkHelper.getZookeeperWatcher().close();
}
// Not sure why we're deleting a connection that we never acquired or used
- HConnectionManager.deleteConnection(this.getConf(), true);
+ HConnectionManager.deleteConnection(this.getConf());
}
@Override