Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/15 23:11:48 UTC
svn commit: r575982 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/
Author: stack
Date: Sat Sep 15 14:11:47 2007
New Revision: 575982
URL: http://svn.apache.org/viewvc?rev=575982&view=rev
Log:
HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
to disk.
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 14:11:47 2007
@@ -48,6 +48,8 @@
HADOOP-1870 Once file system failure has been detected, don't check it again
and get on with shutting down the hbase cluster.
HADOOP-1888 NullPointerException in HMemcacheScanner
+ HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
+ to disk.
IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java?rev=575982&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java Sat Sep 15 14:11:47 2007
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+import java.io.IOException;
+
+
+/**
+ * Thrown during flush when snapshot content may not have been properly
+ * persisted into store files. Response should include replay of hlog content.
+ */
+public class DroppedSnapshotException extends IOException {
+ public DroppedSnapshotException(String msg) {
+ super(msg);
+ }
+
+ public DroppedSnapshotException() {
+ super();
+ }
+}
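The new exception deliberately extends IOException, so code that wants to react to a dropped snapshot specifically has to catch it ahead of any generic IOException handler, as HRegionServer does further down in this commit. A minimal, self-contained sketch of that usage (flushSomething() is a hypothetical stand-in, not HBase code; the only assumption is that the DroppedSnapshotException class added above is on the classpath):

    import java.io.IOException;
    import org.apache.hadoop.hbase.DroppedSnapshotException;

    public class CatchOrderSketch {

      // Hypothetical flush stand-in that fails in its critical section.
      static void flushSomething() throws IOException {
        throw new DroppedSnapshotException("flush failed after snapshot was taken");
      }

      public static void main(String[] args) {
        try {
          flushSomething();
        } catch (DroppedSnapshotException e) {
          // The subclass clause must come first; the compiler rejects the reverse order.
          System.err.println("Replay of hlog required: " + e.getMessage());
        } catch (IOException e) {
          System.err.println("Ordinary flush failure: " + e.getMessage());
        }
      }
    }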
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sat Sep 15 14:11:47 2007
@@ -399,6 +399,7 @@
* the flush will not appear in the correct logfile.
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
* @see #completeCacheFlush(Text, Text, long)
+ * @see #abortCacheFlush()
*/
synchronized long startCacheFlush() {
while (this.insideCacheFlush) {
@@ -422,7 +423,7 @@
synchronized void completeCacheFlush(final Text regionName,
final Text tableName, final long logSeqId)
throws IOException {
- if(closed) {
+ if(this.closed) {
return;
}
@@ -430,17 +431,32 @@
throw new IOException("Impossible situation: inside " +
"completeCacheFlush(), but 'insideCacheFlush' flag is false");
}
-
- writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+ HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
+ this.writer.append(key,
new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
System.currentTimeMillis()));
- numEntries.getAndIncrement();
+ this.numEntries.getAndIncrement();
// Remember the most-recent flush for each region.
// This is used to delete obsolete log files.
- regionToLastFlush.put(regionName, logSeqId);
+ this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
- insideCacheFlush = false;
+ cleanup();
+ }
+
+ /**
+ * Abort a cache flush.
+ * This method will clear waits on {@link #insideCacheFlush}. Call if the
+ * flush fails. Note that the only recovery for an aborted flush currently
+ * is a restart of the regionserver so the snapshot content dropped by the
+ * failure gets restored to the memcache.
+ */
+ synchronized void abortCacheFlush() {
+ cleanup();
+ }
+
+ private synchronized void cleanup() {
+ this.insideCacheFlush = false;
notifyAll();
}
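The HLog changes keep the flush bookkeeping as a single-flag wait/notify guard: startCacheFlush() blocks while another flush is in progress, and both completeCacheFlush() and the new abortCacheFlush() now funnel through cleanup() to clear the flag and wake waiters. A stripped-down, self-contained sketch of that pattern (field and method names follow the patch; the real HLog also writes a flush marker and tracks sequence ids):

    // Minimal sketch of the guard only; not the real HLog.
    public class FlushGuardSketch {
      private boolean insideCacheFlush = false;

      public synchronized void startCacheFlush() {
        while (insideCacheFlush) {
          try {
            wait();                 // block until the current flush finishes or aborts
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
        insideCacheFlush = true;
      }

      public synchronized void completeCacheFlush() {
        cleanup();                  // flush succeeded; release waiters
      }

      public synchronized void abortCacheFlush() {
        cleanup();                  // flush failed; release waiters anyway
      }

      private synchronized void cleanup() {
        insideCacheFlush = false;
        notifyAll();                // wake any thread parked in startCacheFlush()
      }
    }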
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Sep 15 14:11:47 2007
@@ -657,7 +657,7 @@
if (checkFileSystem()) {
// If filesystem is OK, is the exception a ConnectionException?
// If so, mark the server as down. No point scanning either
- // if no server to put meta region on.
+ // if no server to put meta region on. TODO.
if (e instanceof ConnectException) {
LOG.debug("Region hosting server is gone.");
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Sat Sep 15 14:11:47 2007
@@ -101,6 +101,7 @@
}
Snapshot retval =
new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
+ // From here on, any failure is catastrophic requiring replay of hlog
this.snapshot = memcache;
history.add(memcache);
memcache = new TreeMap<HStoreKey, byte []>();
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 14:11:47 2007
@@ -721,6 +721,9 @@
/**
* Each HRegion is given a periodic chance to flush the cache, which it should
* only take if there have been a lot of uncommitted writes.
+ * @throws IOException
+ * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * because a Snapshot was not properly persisted.
*/
void optionallyFlush() throws IOException {
if(this.memcache.getSize() > this.memcacheFlushSize) {
@@ -754,6 +757,9 @@
* close() the HRegion shortly, so the HRegion should not take on any new and
* potentially long-lasting disk operations. This flush() should be the final
* pre-close() disk operation.
+ * @throws IOException
+ * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * because a Snapshot was not properly persisted.
*/
void flushcache(boolean disableFutureWrites)
throws IOException {
@@ -815,6 +821,9 @@
* routes.
*
* <p> This method may block for some time.
+ * @throws IOException
+ * @throws DroppedSnapshotException Thrown when replay of hlog is required
+ * because a Snapshot was not properly persisted.
*/
void internalFlushcache() throws IOException {
long startTime = -1;
@@ -833,13 +842,19 @@
//
// When execution returns from snapshotMemcacheForLog() with a non-NULL
// value, the HMemcache will have a snapshot object stored that must be
- // explicitly cleaned up using a call to deleteSnapshot().
+ // explicitly cleaned up using a call to deleteSnapshot() or by calling
+ // abort.
//
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
if(retval == null || retval.memcacheSnapshot == null) {
LOG.debug("Finished memcache flush; empty snapshot");
return;
}
+
+ // Any failure from here on out will be catastrophic, requiring a server
+ // restart so the hlog content can be replayed and put back into the memcache.
+ // Otherwise, the snapshot content, while backed up in the hlog, will not
+ // be part of the current running server's state.
try {
long logCacheFlushId = retval.sequenceId;
if(LOG.isDebugEnabled()) {
@@ -852,7 +867,7 @@
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
- for(HStore hstore: stores.values()) {
+ for (HStore hstore: stores.values()) {
hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
}
@@ -860,17 +875,18 @@
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
-
- log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
+ this.log.completeCacheFlush(this.regionInfo.regionName,
+ regionInfo.tableDesc.getName(), logCacheFlushId);
} catch (IOException e) {
- LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
- log.abort();
- throw e;
+ // An exception here means that the snapshot was not persisted.
+ // The hlog needs to be replayed so its content is restored to memcache.
+ // Currently, only a server restart will do this.
+ this.log.abortCacheFlush();
+ throw new DroppedSnapshotException(e.getMessage());
} finally {
// C. Delete the now-irrelevant memcache snapshot; its contents have been
- // dumped to disk-based HStores.
- memcache.deleteSnapshot();
+ // dumped to disk-based HStores or, if error, clear aborted snapshot.
+ this.memcache.deleteSnapshot();
}
// D. Finally notify anyone waiting on memcache to clear:
@@ -1386,7 +1402,7 @@
}
/*
- * Add updates to the log and add values to the memcache.
+ * Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param row Row to update.
* @param timestamp Timestamp to record the updates against
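Taken together, the HRegion changes make everything after the memcache snapshot a critical section: flush the snapshot to the stores, record completion in the hlog, and on any IOException abort the hlog's flush bookkeeping and surface a DroppedSnapshotException so the caller can force a restart and replay. A compact, self-contained sketch of that control flow (Memcache, Store, and Log here are simplified stand-ins, not the real HBase classes; only DroppedSnapshotException, added above, is assumed):

    import java.io.IOException;
    import org.apache.hadoop.hbase.DroppedSnapshotException;

    class FlushFlowSketch {
      interface Memcache { Object snapshot(); void deleteSnapshot(); }
      interface Store { void flushCache(Object snapshot) throws IOException; }
      interface Log { void completeCacheFlush() throws IOException; void abortCacheFlush(); }

      static void flush(Memcache memcache, Iterable<Store> stores, Log log)
          throws IOException {
        Object snapshot = memcache.snapshot();  // edits set aside; failure is now critical
        try {
          for (Store store : stores) {
            store.flushCache(snapshot);         // may fail before the edits are durable
          }
          log.completeCacheFlush();             // only now may the hlog entries be ignored
        } catch (IOException e) {
          log.abortCacheFlush();                // clear the flush-in-progress flag
          // The snapshot content now lives only in the hlog; force a replay.
          throw new DroppedSnapshotException(e.getMessage());
        } finally {
          memcache.deleteSnapshot();            // snapshot is dropped on both paths
        }
      }
    }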
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=575982&r1=575981&r2=575982&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Sep 15 14:11:47 2007
@@ -292,6 +292,16 @@
for(HRegion cur: nonClosedRegionsToFlush) {
try {
cur.optionallyFlush();
+ } catch (DroppedSnapshotException e) {
+ // Cache flush can fail in a few places. If it fails in a critical
+ // section, we get a DroppedSnapshotException and a replay of hlog
+ // is required. Currently the only way to do this is a restart of
+ // the server.
+ LOG.fatal("Replay of hlog required. Forcing server restart", e);
+ if (!checkFileSystem()) {
+ break;
+ }
+ HRegionServer.this.stop();
} catch (IOException iex) {
LOG.error("Cache flush failed",
RemoteExceptionHandler.checkIOException(iex));
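The Flusher's new catch clause encodes a small decision: a DroppedSnapshotException means the snapshot's edits now live only in the hlog, so the server is stopped to force a restart and replay, but only after checkFileSystem() confirms there is still a filesystem to replay from; any other IOException is logged and the server keeps running. A self-contained sketch of that decision (Region and the fsOk parameter are simplified stand-ins; DroppedSnapshotException is the class added above):

    import java.io.IOException;
    import org.apache.hadoop.hbase.DroppedSnapshotException;

    class FlusherDecisionSketch {

      interface Region { void optionallyFlush() throws IOException; }

      private volatile boolean stopRequested = false;

      // fsOk stands in for the result of checkFileSystem().
      void flushOne(Region region, boolean fsOk) {
        try {
          region.optionallyFlush();
        } catch (DroppedSnapshotException e) {
          // The dropped snapshot's edits exist only in the hlog now.
          // If the filesystem itself is gone there is nothing to replay from,
          // so only force a restart when the filesystem still checks out.
          if (fsOk) {
            stopRequested = true;   // stand-in for HRegionServer.this.stop()
          }
        } catch (IOException e) {
          // An ordinary flush failure: log it and keep the server running.
          System.err.println("Cache flush failed: " + e.getMessage());
        }
      }
    }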
@@ -442,11 +452,11 @@
/**
* Sets a flag that will cause all the HRegionServer threads to shut down
- * in an orderly fashion.
- * <p>FOR DEBUGGING ONLY
+ * in an orderly fashion. Used by unit tests and called by {@link Flusher}
+ * if it judges that the server needs to be restarted.
*/
synchronized void stop() {
- stopRequested.set(true);
+ this.stopRequested.set(true);
notifyAll(); // Wakes run() if it is sleeping
}
@@ -457,7 +467,7 @@
* from under hbase or we OOME.
*/
synchronized void abort() {
- abortRequested = true;
+ this.abortRequested = true;
stop();
}
@@ -621,7 +631,7 @@
if (this.fsOk) {
// Only try to clean up if the file system is available
try {
- log.close();
+ this.log.close();
LOG.info("On abort, closed hlog");
} catch (IOException e) {
LOG.error("Unable to close log in abort",
@@ -661,7 +671,7 @@
}
join();
- LOG.info("main thread exiting");
+ LOG.info(Thread.currentThread().getName() + " exiting");
}
/*
@@ -674,7 +684,7 @@
* run. On its way out, this server will shut down Server. Leases are sort
* of inbetween. It has an internal thread that while it inherits from
* Chore, it keeps its own internal stop mechanism so needs to be stopped
- * by this hosting server.
+ * by this hosting server. Worker logs the exception and exits.
*/
private void startAllServices() {
String n = Thread.currentThread().getName();
@@ -731,6 +741,7 @@
}
}
+
/** Add to the outbound message buffer */
private void reportOpen(HRegion region) {
synchronized(outboundMsgs) {
@@ -790,58 +801,58 @@
public void run() {
try {
- for(ToDoEntry e = null; !stopRequested.get(); ) {
- try {
- e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ex) {
- // continue
- }
- if(e == null || stopRequested.get()) {
- continue;
- }
- try {
- LOG.info(e.msg.toString());
-
- switch(e.msg.getMsg()) {
-
- case HMsg.MSG_REGION_OPEN:
- // Open a region
- openRegion(e.msg.getRegionInfo());
- break;
+ for(ToDoEntry e = null; !stopRequested.get(); ) {
+ try {
+ e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ // continue
+ }
+ if(e == null || stopRequested.get()) {
+ continue;
+ }
+ try {
+ LOG.info(e.msg.toString());
+ switch(e.msg.getMsg()) {
+
+ case HMsg.MSG_REGION_OPEN:
+ // Open a region
+ openRegion(e.msg.getRegionInfo());
+ break;
- case HMsg.MSG_REGION_CLOSE:
- // Close a region
- closeRegion(e.msg.getRegionInfo(), true);
- break;
+ case HMsg.MSG_REGION_CLOSE:
+ // Close a region
+ closeRegion(e.msg.getRegionInfo(), true);
+ break;
- case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
- // Close a region, don't reply
- closeRegion(e.msg.getRegionInfo(), false);
- break;
+ case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
+ // Close a region, don't reply
+ closeRegion(e.msg.getRegionInfo(), false);
+ break;
- default:
- throw new AssertionError(
- "Impossible state during msg processing. Instruction: "
- + e.msg.toString());
- }
- } catch (IOException ie) {
- ie = RemoteExceptionHandler.checkIOException(ie);
- if(e.tries < numRetries) {
- LOG.warn(ie);
- e.tries++;
- try {
- toDo.put(e);
- } catch (InterruptedException ex) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
+ default:
+ throw new AssertionError(
+ "Impossible state during msg processing. Instruction: "
+ + e.msg.toString());
}
- } else {
- LOG.error("unable to process message: " + e.msg.toString(), ie);
- if (!checkFileSystem()) {
- break;
+ } catch (IOException ie) {
+ ie = RemoteExceptionHandler.checkIOException(ie);
+ if(e.tries < numRetries) {
+ LOG.warn(ie);
+ e.tries++;
+ try {
+ toDo.put(e);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Putting into msgQueue was " +
+ "interrupted.", ex);
+ }
+ } else {
+ LOG.error("unable to process message: " + e.msg.toString(), ie);
+ if (!checkFileSystem()) {
+ break;
+ }
}
}
}
- }
} catch(Throwable t) {
LOG.fatal("Unhandled exception", t);
} finally {