Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/15 06:25:58 UTC
svn commit: r1482676 [4/5] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/
hbase-client/src/main/ja...
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed May 15 04:25:57 2013
@@ -23,9 +23,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -34,6 +36,8 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@@ -41,6 +45,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@@ -51,12 +57,30 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -67,9 +91,17 @@ import org.apache.hadoop.hbase.util.Canc
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -102,6 +134,9 @@ public class HLogSplitter {
OutputSink outputSink;
EntryBuffers entryBuffers;
+ private Set<String> disablingOrDisabledTables = new HashSet<String>();
+ private ZooKeeperWatcher watcher;
+
// If an exception is thrown by one of the other threads, it will be
// stored here.
protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
@@ -115,10 +150,21 @@ public class HLogSplitter {
// For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker;
+ protected boolean distributedLogReplay;
+
+ // Map encodedRegionName -> lastFlushedSequenceId
+ Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
+
+ // Number of writer threads
+ private final int numWriterThreads;
+
+ // Min batch size when replaying WAL edits
+ private final int minBatchSize;
+
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
- * <code>hbase.hlog.splitter.impl</code> property to derived the instance
- * class to use.
+ * <code>hbase.hlog.splitter.impl</code> property to derive the instance class to use.
+ * distributedLogReplay won't be enabled by this constructor.
* <p>
* @param conf
* @param rootDir hbase directory
@@ -161,17 +207,37 @@ public class HLogSplitter {
public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
+ this(conf, rootDir, srcDir, oldLogDir, fs, idChecker, null);
+ }
+
+ public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
+ Path oldLogDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
this.conf = conf;
this.rootDir = rootDir;
this.srcDir = srcDir;
this.oldLogDir = oldLogDir;
this.fs = fs;
this.sequenceIdChecker = idChecker;
+ this.watcher = zkw;
entryBuffers = new EntryBuffers(
conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
128*1024*1024));
- outputSink = new OutputSink();
+
+ this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512);
+ this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+ HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+
+ this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ if (zkw != null && this.distributedLogReplay) {
+ outputSink = new LogReplayOutputSink(numWriterThreads);
+ } else {
+ if (this.distributedLogReplay) {
+ LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
+ }
+ this.distributedLogReplay = false;
+ outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
+ }
}
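
A minimal sketch (not part of this patch) of how the configuration read above selects the output sink; the key and constant names are taken from the hunk:

    // Sketch only: mirrors the sink-selection logic in the constructor above.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);       // opt in to replay mode
    conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);  // writer thread count
    conf.setInt("hbase.regionserver.wal.logreplay.batch.size", 512);    // min replay batch size
    // With a non-null ZooKeeperWatcher and the flag set, the constructor picks
    // LogReplayOutputSink; otherwise it falls back to LogRecoveredEditsOutputSink.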
/**
@@ -260,26 +326,26 @@ public class HLogSplitter {
}
/**
- * Splits the HLog edits in the given list of logfiles (that are a mix of edits
- * on multiple regions) by region and then splits them per region directories,
- * in batches of (hbase.hlog.split.batch.size)
+ * Splits or replays the HLog edits in the given list of logfiles (that are a mix of edits on
+ * multiple regions) by region, and then splits (or replays, when distributedLogReplay is true)
+ * them per region directory, in batches.
* <p>
- * This process is split into multiple threads. In the main thread, we loop
- * through the logs to be split. For each log, we:
+ * This process is split into multiple threads. In the main thread, we loop through the logs to be
+ * split. For each log, we:
* <ul>
- * <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
- * <li> Read each edit (see {@link #parseHLog}</li>
- * <li> Mark as "processed" or "corrupt" depending on outcome</li>
+ * <li>Recover it (take and drop HDFS lease) to ensure no other process can write</li>
+ * <li>Read each edit (see {@link #parseHLog})</li>
+ * <li>Mark as "processed" or "corrupt" depending on outcome</li>
* </ul>
* <p>
- * Each edit is passed into the EntryBuffers instance, which takes care of
- * memory accounting and splitting the edits by region.
+ * Each edit is passed into the EntryBuffers instance, which takes care of memory accounting and
+ * splitting the edits by region.
* <p>
- * The OutputSink object then manages N other WriterThreads which pull chunks
- * of edits from EntryBuffers and write them to the output region directories.
+ * The OutputSink object then manages N other WriterThreads which pull chunks of edits from
+ * EntryBuffers and either write them to recovered.edits files or replay them directly to newly
+ * assigned region servers.
* <p>
- * After the process is complete, the log files are archived to a separate
- * directory.
+ * After the process is complete, the log files are archived to a separate directory.
*/
private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
throws IOException {
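
The producer/consumer shape the javadoc describes can be summarized in a short sketch (simplified; the method and field names come from this file):

    // Main thread (producer): read entries and buffer them per region.
    Entry entry;
    while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
      entryBuffers.appendEntry(entry);   // blocks when buffered heap crosses the threshold
    }
    // Each of the N WriterThreads (consumers) repeatedly drains the largest buffer:
    RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
    outputSink.append(buffer);           // recovered.edits file, or direct replay to an RS
    entryBuffers.doneWriting(buffer);    // release the region for other writer threads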
@@ -368,8 +434,7 @@ public class HLogSplitter {
/**
* Splits a HLog file into region's recovered-edits directory
* <p>
- * If the log file has N regions then N recovered.edits files will be
- * produced.
+ * If the log file has N regions then N recovered.edits files will be produced.
* <p>
* @param rootDir
* @param logfile
@@ -377,22 +442,23 @@ public class HLogSplitter {
* @param conf
* @param reporter
* @param idChecker
+ * @param zkw ZooKeeperWatcher; if it's null, we fall back to the old-style log splitting where
+ * we dump out recovered.edits files for regions to replay.
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
- static public boolean splitLogFile(Path rootDir, FileStatus logfile,
- FileSystem fs, Configuration conf, CancelableProgressable reporter,
- LastSequenceId idChecker)
+ static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+ Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
+ ZooKeeperWatcher zkw)
throws IOException {
- HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs, idChecker);
+ HLogSplitter s = new HLogSplitter(conf, rootDir, null, null/* oldLogDir */, fs, idChecker, zkw);
return s.splitLogFile(logfile, reporter);
}
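
A hypothetical call site (variable names are assumptions) showing how the new overload threads the watcher through:

    // zkw == null, or DISTRIBUTED_LOG_REPLAY_KEY unset => old recovered.edits path.
    // Non-null zkw with the flag set => replay edits directly to region servers.
    boolean finished = HLogSplitter.splitLogFile(rootDir, logfile, fs, conf,
        reporter, idChecker, zkw);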
/**
* Splits a HLog file into region's recovered-edits directory
* <p>
- * If the log file has N regions then N recovered.edits files will be
- * produced.
+ * If the log file has N regions then N recovered.edits files will be produced.
* <p>
* @param rootDir
* @param logfile
@@ -402,10 +468,10 @@ public class HLogSplitter {
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
- static public boolean splitLogFile(Path rootDir, FileStatus logfile,
- FileSystem fs, Configuration conf, CancelableProgressable reporter)
+ static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+ Configuration conf, CancelableProgressable reporter)
throws IOException {
- return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null);
+ return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null);
}
public boolean splitLogFile(FileStatus logfile,
@@ -427,6 +493,7 @@ public class HLogSplitter {
"into a temporary staging area.");
long logLength = logfile.getLen();
LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+ LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
status.setStatus("Opening log file");
if (reporter != null && !reporter.progress()) {
progress_failed = true;
@@ -445,25 +512,37 @@ public class HLogSplitter {
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
+ if (watcher != null) {
+ try {
+ disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
+ } catch (KeeperException e) {
+ throw new IOException("Can't get disabling/disabled tables", e);
+ }
+ }
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
outputSink.setReporter(reporter);
outputSink.startWriterThreads();
outputSinkStarted = true;
- // Report progress every so many edits and/or files opened (opening a file
- // takes a bit of time).
- Map<byte[], Long> lastFlushedSequenceIds =
- new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
Entry entry;
-
- while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+ Long lastFlushedSequenceId = -1L;
+ ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
+ String serverNameStr = (serverName == null) ? "" : serverName.getServerName();
+ while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
- Long lastFlushedSequenceId = -1l;
- if (sequenceIdChecker != null) {
- lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
- if (lastFlushedSequenceId == null) {
+ String key = Bytes.toString(region);
+ lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
+ if (lastFlushedSequenceId == null) {
+ if (this.distributedLogReplay) {
+ lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(this.watcher,
+ serverNameStr, key);
+ } else if (sequenceIdChecker != null) {
lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
- lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+ }
+ if (lastFlushedSequenceId != null) {
+ lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
+ } else {
+ lastFlushedSequenceId = -1L;
}
}
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@@ -472,12 +551,13 @@ public class HLogSplitter {
}
entryBuffers.appendEntry(entry);
editsCount++;
+ int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
// If sufficient edits have passed, check if we should report progress.
if (editsCount % interval == 0
- || (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) {
- numOpenedFilesLastCheck = outputSink.logWriters.size();
- String countsStr = (editsCount - editsSkipped) +
- " edits, skipped " + editsSkipped + " edits.";
+ || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
+ numOpenedFilesLastCheck = this.getNumOpenWriters();
+ String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
+ + " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
if (reporter != null && !reporter.progress()) {
progress_failed = true;
@@ -502,9 +582,8 @@ public class HLogSplitter {
progress_failed = outputSink.finishWritingAndClose() == null;
}
String msg = "Processed " + editsCount + " edits across "
- + outputSink.getOutputCounts().size() + " regions; log file="
- + logPath + " is corrupted = " + isCorrupted + " progress failed = "
- + progress_failed;
+ + outputSink.getNumberOfRecoveredRegions() + " regions; log file=" + logPath
+ + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
LOG.info(msg);
status.markComplete(msg);
}
@@ -832,6 +911,18 @@ public class HLogSplitter {
}
/**
+ * Get the number of currently open writers
+ * @return the number of open writers across the output sink
+ */
+ private int getNumOpenWriters() {
+ int result = 0;
+ if (this.outputSink != null) {
+ result += this.outputSink.getNumOpenWriters();
+ }
+ return result;
+ }
+
+ /**
* Class which accumulates edits and separates them into a buffer per region
* while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
* a predefined threshold.
@@ -880,20 +971,23 @@ public class HLogSplitter {
totalBuffered += incrHeap;
while (totalBuffered > maxHeapUsage && thrown.get() == null) {
LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
- dataAvailable.wait(3000);
+ dataAvailable.wait(2000);
}
dataAvailable.notifyAll();
}
checkForErrors();
}
+ /**
+ * @return RegionEntryBuffer a buffer of edits to be written or replayed.
+ */
synchronized RegionEntryBuffer getChunkToWrite() {
- long biggestSize=0;
- byte[] biggestBufferKey=null;
+ long biggestSize = 0;
+ byte[] biggestBufferKey = null;
for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
long size = entry.getValue().heapSize();
- if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+ if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
biggestSize = size;
biggestBufferKey = entry.getKey();
}
@@ -968,9 +1062,11 @@ public class HLogSplitter {
class WriterThread extends Thread {
private volatile boolean shouldStop = false;
+ private OutputSink outputSink = null;
- WriterThread(int i) {
+ WriterThread(OutputSink sink, int i) {
super("WriterThread-" + i);
+ outputSink = sink;
}
public void run() {
@@ -989,9 +1085,11 @@ public class HLogSplitter {
if (buffer == null) {
// No data currently available, wait on some more to show up
synchronized (dataAvailable) {
- if (shouldStop) return;
+ if (shouldStop && !this.outputSink.flush()) {
+ return;
+ }
try {
- dataAvailable.wait(1000);
+ dataAvailable.wait(500);
} catch (InterruptedException ie) {
if (!shouldStop) {
throw new RuntimeException(ie);
@@ -1012,39 +1110,7 @@ public class HLogSplitter {
private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
- List<Entry> entries = buffer.entryBuffer;
- if (entries.isEmpty()) {
- LOG.warn(this.getName() + " got an empty buffer, skipping");
- return;
- }
-
- WriterAndPath wap = null;
-
- long startTime = System.nanoTime();
- try {
- int editsCount = 0;
-
- for (Entry logEntry : entries) {
- if (wap == null) {
- wap = outputSink.getWriterAndPath(logEntry);
- if (wap == null) {
- // getWriterAndPath decided we don't need to write these edits
- // Message was already logged
- return;
- }
- }
- wap.w.append(logEntry);
- outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
- editsCount++;
- }
- // Pass along summary statistics
- wap.incrementEdits(editsCount);
- wap.incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.fatal(this.getName() + " Got while writing log entry to log", e);
- throw e;
- }
+ outputSink.append(buffer);
}
void finish() {
@@ -1055,28 +1121,6 @@ public class HLogSplitter {
}
}
- private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
- FileSystem fs, Configuration conf)
- throws IOException {
- Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
- if (regionedits == null) {
- return null;
- }
- if (fs.exists(regionedits)) {
- LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting "
- + regionedits + ", length="
- + fs.getFileStatus(regionedits).getLen());
- if (!fs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old " + regionedits);
- }
- }
- Writer w = createWriter(fs, regionedits, conf);
- LOG.debug("Creating writer path=" + regionedits + " region="
- + Bytes.toStringBinary(region));
- return (new WriterAndPath(regionedits, w));
- }
-
Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
List<String> components = new ArrayList<String>(10);
do {
@@ -1109,35 +1153,35 @@ public class HLogSplitter {
}
/**
- * Class that manages the output streams from the log splitting process.
+ * The following class is an abstraction that provides a common interface for both the existing
+ * recovered-edits file sink and the region server WAL edits replay sink.
*/
- class OutputSink {
- private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
- new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
- private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+ abstract class OutputSink {
+
+ protected Map<byte[], SinkWriter> writers = Collections
+ .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));
+
+ protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
.synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
- private final List<WriterThread> writerThreads = Lists.newArrayList();
+
+ protected final List<WriterThread> writerThreads = Lists.newArrayList();
/* Set of regions which we've decided should not output edits */
- private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
- new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+ protected final Set<byte[]> blacklistedRegions = Collections
+ .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
- private boolean closeAndCleanCompleted = false;
+ protected boolean closeAndCleanCompleted = false;
- private boolean logWritersClosed = false;
+ protected boolean writersClosed = false;
- private final int numThreads;
+ protected final int numThreads;
- private CancelableProgressable reporter = null;
+ protected CancelableProgressable reporter = null;
- public OutputSink() {
- // More threads could potentially write faster at the expense
- // of causing more disk seeks as the logs are split.
- // 3. After a certain setting (probably around 3) the
- // process will be bound on the reader in the current
- // implementation anyway.
- numThreads = conf.getInt(
- "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ protected AtomicLong skippedEdits = new AtomicLong();
+
+ public OutputSink(int numWriters) {
+ numThreads = numWriters;
}
void setReporter(CancelableProgressable reporter) {
@@ -1145,12 +1189,11 @@ public class HLogSplitter {
}
/**
- * Start the threads that will pump data from the entryBuffers
- * to the output files.
+ * Start the threads that will pump data from the entryBuffers to the output files.
*/
synchronized void startWriterThreads() {
for (int i = 0; i < numThreads; i++) {
- WriterThread t = new WriterThread(i);
+ WriterThread t = new WriterThread(this, i);
t.start();
writerThreads.add(t);
}
@@ -1158,66 +1201,147 @@ public class HLogSplitter {
/**
*
- * @return null if failed to report progress
+ * Update region's maximum edit log SeqNum.
+ */
+ void updateRegionMaximumEditLogSeqNum(Entry entry) {
+ synchronized (regionMaximumEditLogSeqNum) {
+ Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
+ .getEncodedRegionName());
+ if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
+ regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
+ .getLogSeqNum());
+ }
+ }
+ }
+
+ Long getRegionMaximumEditLogSeqNum(byte[] region) {
+ return regionMaximumEditLogSeqNum.get(region);
+ }
+
+ /**
+ * @return the number of currently opened writers
+ */
+ int getNumOpenWriters() {
+ return this.writers.size();
+ }
+
+ long getSkippedEdits() {
+ return this.skippedEdits.get();
+ }
+
+ /**
+ * Wait for writer threads to finish dumping all remaining edits to the sink
+ * @return true when there is no error
* @throws IOException
*/
- List<Path> finishWritingAndClose() throws IOException {
+ protected boolean finishWriting() throws IOException {
LOG.info("Waiting for split writer threads to finish");
boolean progress_failed = false;
- try {
- for (WriterThread t : writerThreads) {
- t.finish();
- }
- for (WriterThread t : writerThreads) {
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- try {
- t.join();
- } catch (InterruptedException ie) {
- IOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- }
- checkForErrors();
+ for (WriterThread t : writerThreads) {
+ t.finish();
+ }
+ for (WriterThread t : writerThreads) {
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
}
- LOG.info("Split writers finished");
- if (progress_failed) {
- return null;
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
}
- return closeStreams();
+ checkForErrors();
+ }
+ LOG.info("Split writers finished");
+ return (!progress_failed);
+ }
+
+ abstract List<Path> finishWritingAndClose() throws IOException;
+
+ /**
+ * @return a map from encoded region ID to the number of edits written out for that region.
+ */
+ abstract Map<byte[], Long> getOutputCounts();
+
+ /**
+ * @return number of regions we've recovered
+ */
+ abstract int getNumberOfRecoveredRegions();
+
+ /**
+ * @param buffer A buffer of WAL entries for a single region
+ * @throws IOException
+ */
+ abstract void append(RegionEntryBuffer buffer) throws IOException;
+
+ /**
+ * WriterThreads call this function to flush any remaining edits buffered internally before close
+ * @return true when the underlying sink had something to flush
+ */
+ protected boolean flush() throws IOException {
+ return false;
+ }
+ }
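
To make the contract concrete, an illustrative-only subclass (an assumption, not in the patch; written as if nested in HLogSplitter) that satisfies the abstract methods:

    // Hypothetical no-op sink, for illustrating the OutputSink contract only.
    class DiscardingOutputSink extends OutputSink {
      DiscardingOutputSink(int numWriters) { super(numWriters); }

      @Override
      List<Path> finishWritingAndClose() throws IOException {
        // per finishWriting(), null signals that progress reporting failed
        return finishWriting() ? new ArrayList<Path>() : null;
      }

      @Override
      Map<byte[], Long> getOutputCounts() {
        return new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      }

      @Override
      int getNumberOfRecoveredRegions() { return 0; }

      @Override
      void append(RegionEntryBuffer buffer) throws IOException {
        skippedEdits.addAndGet(buffer.entryBuffer.size()); // drop all edits
      }
    }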
+
+ /**
+ * Class that manages the output streams from the log splitting process.
+ */
+ class LogRecoveredEditsOutputSink extends OutputSink {
+
+ public LogRecoveredEditsOutputSink(int numWriters) {
+ // More threads could potentially write faster at the expense
+ // of causing more disk seeks as the logs are split.
+ // 3. After a certain setting (probably around 3) the
+ // process will be bound on the reader in the current
+ // implementation anyway.
+ super(numWriters);
+ }
+
+ /**
+ * @return null if failed to report progress
+ * @throws IOException
+ */
+ @Override
+ List<Path> finishWritingAndClose() throws IOException {
+ boolean isSuccessful = false;
+ List<Path> result = null;
+ try {
+ isSuccessful = finishWriting();
} finally {
+ result = close();
List<IOException> thrown = closeLogWriters(null);
if (thrown != null && !thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
}
+ return (isSuccessful) ? result : null;
}
/**
* Close all of the output streams.
* @return the list of paths written.
*/
- private List<Path> closeStreams() throws IOException {
+ private List<Path> close() throws IOException {
Preconditions.checkState(!closeAndCleanCompleted);
final List<Path> paths = new ArrayList<Path>();
final List<IOException> thrown = Lists.newArrayList();
- ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(
- numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
- private int count = 1;
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "split-log-closeStream-" + count++);
- return t;
- }
- });
+ ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
+ TimeUnit.SECONDS, new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "split-log-closeStream-" + count++);
+ return t;
+ }
+ });
CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
closeThreadPool);
- for (final Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
- .entrySet()) {
+ for (final Map.Entry<byte[], ? extends SinkWriter> writersEntry : writers.entrySet()) {
completionService.submit(new Callable<Void>() {
public Void call() throws Exception {
- WriterAndPath wap = logWritersEntry.getValue();
+ WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
try {
wap.w.close();
} catch (IOException ioe) {
@@ -1225,15 +1349,25 @@ public class HLogSplitter {
thrown.add(ioe);
return null;
}
- LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
- + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+ + (wap.nanosSpent / 1000 / 1000) + "ms)");
+
+ if (wap.editsWritten == 0) {
+ // just remove the empty recovered.edits file
+ if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+ LOG.warn("Failed deleting empty " + wap.p);
+ throw new IOException("Failed deleting empty " + wap.p);
+ }
+ return null;
+ }
+
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
- regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
+ regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
try {
if (!dst.equals(wap.p) && fs.exists(dst)) {
LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting "
- + dst + ", length=" + fs.getFileStatus(dst).getLen());
+ + "result of a previous failed split attempt. Deleting " + dst + ", length="
+ + fs.getFileStatus(dst).getLen());
if (!fs.delete(dst, false)) {
LOG.warn("Failed deleting of old " + dst);
throw new IOException("Failed deleting of old " + dst);
@@ -1244,8 +1378,7 @@ public class HLogSplitter {
// TestHLogSplit#testThreading is an example.
if (fs.exists(wap.p)) {
if (!fs.rename(wap.p, dst)) {
- throw new IOException("Failed renaming " + wap.p + " to "
- + dst);
+ throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
LOG.debug("Rename " + wap.p + " to " + dst);
}
@@ -1262,7 +1395,7 @@ public class HLogSplitter {
boolean progress_failed = false;
try {
- for (int i = 0, n = logWriters.size(); i < n; i++) {
+ for (int i = 0, n = this.writers.size(); i < n; i++) {
Future<Void> future = completionService.take();
future.get();
if (!progress_failed && reporter != null && !reporter.progress()) {
@@ -1282,7 +1415,7 @@ public class HLogSplitter {
if (!thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
- logWritersClosed = true;
+ writersClosed = true;
closeAndCleanCompleted = true;
if (progress_failed) {
return null;
@@ -1290,57 +1423,58 @@ public class HLogSplitter {
return paths;
}
- private List<IOException> closeLogWriters(List<IOException> thrown)
- throws IOException {
- if (!logWritersClosed) {
- if (thrown == null) {
- thrown = Lists.newArrayList();
- }
- try {
- for (WriterThread t : writerThreads) {
- while (t.isAlive()) {
- t.shouldStop = true;
- t.interrupt();
- try {
- t.join(10);
- } catch (InterruptedException e) {
- IOException iie = new InterruptedIOException();
- iie.initCause(e);
- throw iie;
- }
+ private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
+ if (writersClosed) {
+ return thrown;
+ }
+
+ if (thrown == null) {
+ thrown = Lists.newArrayList();
+ }
+ try {
+ for (WriterThread t : writerThreads) {
+ while (t.isAlive()) {
+ t.shouldStop = true;
+ t.interrupt();
+ try {
+ t.join(10);
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
}
}
- } finally {
- synchronized (logWriters) {
- for (WriterAndPath wap : logWriters.values()) {
- try {
- wap.w.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
- thrown.add(ioe);
- continue;
- }
- LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
- + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ }
+ } finally {
+ synchronized (writers) {
+ WriterAndPath wap = null;
+ for (SinkWriter tmpWAP : writers.values()) {
+ try {
+ wap = (WriterAndPath) tmpWAP;
+ wap.w.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close log at " + wap.p, ioe);
+ thrown.add(ioe);
+ continue;
}
+ LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+ + (wap.nanosSpent / 1000 / 1000) + "ms)");
}
- logWritersClosed = true;
}
+ writersClosed = true;
}
+
return thrown;
}
/**
- * Get a writer and path for a log starting at the given entry.
- *
- * This function is threadsafe so long as multiple threads are always
- * acting on different regions.
- *
+ * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+ * long as multiple threads are always acting on different regions.
* @return null if this region shouldn't output any logs
*/
- WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+ private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
byte region[] = entry.getKey().getEncodedRegionName();
- WriterAndPath ret = logWriters.get(region);
+ WriterAndPath ret = (WriterAndPath) writers.get(region);
if (ret != null) {
return ret;
}
@@ -1354,75 +1488,624 @@ public class HLogSplitter {
blacklistedRegions.add(region);
return null;
}
- logWriters.put(region, ret);
+ writers.put(region, ret);
return ret;
}
- /**
- * Update region's maximum edit log SeqNum.
- */
- void updateRegionMaximumEditLogSeqNum(Entry entry) {
- synchronized (regionMaximumEditLogSeqNum) {
- Long currentMaxSeqNum=regionMaximumEditLogSeqNum.get(entry.getKey().getEncodedRegionName());
- if (currentMaxSeqNum == null
- || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
- regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
- entry.getKey().getLogSeqNum());
+ private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
+ Configuration conf) throws IOException {
+ Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+ if (regionedits == null) {
+ return null;
+ }
+ if (fs.exists(regionedits)) {
+ LOG.warn("Found old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ + fs.getFileStatus(regionedits).getLen());
+ if (!fs.delete(regionedits, false)) {
+ LOG.warn("Failed delete of old " + regionedits);
}
}
-
+ Writer w = createWriter(fs, regionedits, conf);
+ LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
+ return (new WriterAndPath(regionedits, w));
}
- Long getRegionMaximumEditLogSeqNum(byte[] region) {
- return regionMaximumEditLogSeqNum.get(region);
+ void append(RegionEntryBuffer buffer) throws IOException {
+ List<Entry> entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return;
+ }
+
+ WriterAndPath wap = null;
+
+ long startTime = System.nanoTime();
+ try {
+ int editsCount = 0;
+
+ for (Entry logEntry : entries) {
+ if (wap == null) {
+ wap = getWriterAndPath(logEntry);
+ if (wap == null) {
+ // getWriterAndPath decided we don't need to write these edits
+ return;
+ }
+ }
+ wap.w.append(logEntry);
+ this.updateRegionMaximumEditLogSeqNum(logEntry);
+ editsCount++;
+ }
+ // Pass along summary statistics
+ wap.incrementEdits(editsCount);
+ wap.incrementNanoTime(System.nanoTime() - startTime);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.fatal(" Got while writing log entry to log", e);
+ throw e;
+ }
}
/**
- * @return a map from encoded region ID to the number of edits written out
- * for that region.
+ * @return a map from encoded region ID to the number of edits written out for that region.
*/
- private Map<byte[], Long> getOutputCounts() {
- TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
- Bytes.BYTES_COMPARATOR);
- synchronized (logWriters) {
- for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
+ Map<byte[], Long> getOutputCounts() {
+ TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ synchronized (writers) {
+ for (Map.Entry<byte[], ? extends SinkWriter> entry : writers.entrySet()) {
ret.put(entry.getKey(), entry.getValue().editsWritten);
}
}
return ret;
}
+
+ @Override
+ int getNumberOfRecoveredRegions() {
+ return writers.size();
+ }
}
/**
- * Private data structure that wraps a Writer and its Path,
- * also collecting statistics about the data written to this
- * output.
+ * Class that wraps the actual writer which writes data out, together with related statistics
*/
- private final static class WriterAndPath {
- final Path p;
- final Writer w;
-
+ private abstract static class SinkWriter {
/* Count of edits written to this path */
long editsWritten = 0;
/* Number of nanos spent writing to this log */
long nanosSpent = 0;
+ void incrementEdits(int edits) {
+ editsWritten += edits;
+ }
+
+ void incrementNanoTime(long nanos) {
+ nanosSpent += nanos;
+ }
+ }
+
+ /**
+ * Private data structure that wraps a Writer and its Path, also collecting statistics about the
+ * data written to this output.
+ */
+ private final static class WriterAndPath extends SinkWriter {
+ final Path p;
+ final Writer w;
+
WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
+ }
- void incrementEdits(int edits) {
- editsWritten += edits;
+ /**
+ * Class that manages replaying edits from WAL files directly to newly assigned failover region servers
+ */
+ class LogReplayOutputSink extends OutputSink {
+ private static final double BUFFER_THRESHOLD = 0.35;
+ private static final String KEY_DELIMITER = "#";
+
+ private long waitRegionOnlineTimeOut;
+ private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
+ private final Map<String, RegionServerWriter> writers =
+ new ConcurrentHashMap<String, RegionServerWriter>();
+ // online encoded region name map
+ private final Set<String> onlineRegions = Collections.synchronizedSet(new HashSet<String>());
+
+ private Map<byte[], HConnection> tableNameToHConnectionMap = Collections
+ .synchronizedMap(new TreeMap<byte[], HConnection>(Bytes.BYTES_COMPARATOR));
+ /**
+ * Map key -> value layout
+ * <server name:port>#<table name> -> Queue<Row>
+ */
+ private Map<String, List<Pair<HRegionLocation, Row>>> serverToBufferQueueMap =
+ new ConcurrentHashMap<String, List<Pair<HRegionLocation, Row>>>();
+ private List<Throwable> thrown = new ArrayList<Throwable>();
+
+ // The following sink is used in distributedLogReplay mode for entries of regions in a disabling
+ // table. It's a limitation of distributedLogReplay: log replay needs a region to be assigned
+ // and online before it can replay WAL edits, while regions of a disabling/disabled table won't
+ // be assigned by the AM. We can retire this code after HBASE-8234.
+ private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
+ private boolean hasEditsInDisablingOrDisabledTables = false;
+
+ public LogReplayOutputSink(int numWriters) {
+ super(numWriters);
+
+ this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
+ SplitLogManager.DEFAULT_TIMEOUT);
+ this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
+ this.logRecoveredEditsOutputSink.setReporter(reporter);
}
- void incrementNanoTime(long nanos) {
- nanosSpent += nanos;
+ void append(RegionEntryBuffer buffer) throws IOException {
+ List<Entry> entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return;
+ }
+
+ // check if current region in a disabling or disabled table
+ if (disablingOrDisabledTables.contains(Bytes.toString(buffer.tableName))) {
+ // need fall back to old way
+ logRecoveredEditsOutputSink.append(buffer);
+ hasEditsInDisablingOrDisabledTables = true;
+ // store regions we have recovered so far
+ addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
+ return;
+ }
+
+ // group entries by region servers
+ groupEditsByServer(entries);
+
+ // process workitems
+ String maxLocKey = null;
+ int maxSize = 0;
+ List<Pair<HRegionLocation, Row>> maxQueue = null;
+ synchronized (this.serverToBufferQueueMap) {
+ for (String key : this.serverToBufferQueueMap.keySet()) {
+ List<Pair<HRegionLocation, Row>> curQueue = this.serverToBufferQueueMap.get(key);
+ if (curQueue.size() > maxSize) {
+ maxSize = curQueue.size();
+ maxQueue = curQueue;
+ maxLocKey = key;
+ }
+ }
+ if (maxSize < minBatchSize
+ && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
+ // buffer more to process
+ return;
+ } else if (maxSize > 0) {
+ this.serverToBufferQueueMap.remove(maxLocKey);
+ }
+ }
+
+ if (maxSize > 0) {
+ processWorkItems(maxLocKey, maxQueue);
+ }
+ }
+
+ private void addToRecoveredRegions(String encodedRegionName) {
+ if (!recoveredRegions.contains(encodedRegionName)) {
+ recoveredRegions.add(encodedRegionName);
+ }
+ }
+
+ /**
+ * Helper function to group WAL entries by their destination region servers
+ * @throws IOException
+ */
+ private void groupEditsByServer(List<Entry> entries) throws IOException {
+ Set<byte[]> nonExistentTables = null;
+ Long cachedLastFlushedSequenceId = -1L;
+ for (HLog.Entry entry : entries) {
+ WALEdit edit = entry.getEdit();
+ byte[] table = entry.getKey().getTablename();
+ String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
+ // skip edits of non-existent tables
+ if (nonExistentTables != null && nonExistentTables.contains(table)) {
+ this.skippedEdits.incrementAndGet();
+ continue;
+ }
+ boolean needSkip = false;
+ Put put = null;
+ Delete del = null;
+ KeyValue lastKV = null;
+ HRegionLocation loc = null;
+ Row preRow = null;
+ HRegionLocation preLoc = null;
+ Row lastAddedRow = null; // not strictly needed here; just being conservative
+ String preKey = null;
+ List<KeyValue> kvs = edit.getKeyValues();
+ HConnection hconn = this.getConnectionByTableName(table);
+
+ for (KeyValue kv : kvs) {
+ // filtering HLog meta entries
+ // We don't handle HBASE-2231 because we may or may not replay a compaction event.
+ // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
+ // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
+ if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
+
+ if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+ if (preRow != null) {
+ synchronized (serverToBufferQueueMap) {
+ List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
+ if (queue == null) {
+ queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
+ serverToBufferQueueMap.put(preKey, queue);
+ }
+ queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
+ lastAddedRow = preRow;
+ }
+ // store regions we have recovered so far
+ addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
+ }
+
+ try {
+ loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow());
+ } catch (TableNotFoundException ex) {
+ // table has been deleted so skip edits of the table
+ LOG.info("Table " + Bytes.toString(table)
+ + " doesn't exist. Skip log replay for region " + encodeRegionNameStr);
+ lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
+ if (nonExistentTables == null) {
+ nonExistentTables = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ }
+ nonExistentTables.add(table);
+ this.skippedEdits.incrementAndGet();
+ needSkip = true;
+ break;
+ }
+ cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
+ .getEncodedName());
+ if (cachedLastFlushedSequenceId != null
+ && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+ // skip the whole HLog entry
+ this.skippedEdits.incrementAndGet();
+ needSkip = true;
+ break;
+ }
+
+ if (kv.isDelete()) {
+ del = new Delete(kv.getRow());
+ del.setClusterId(entry.getKey().getClusterId());
+ preRow = del;
+ } else {
+ put = new Put(kv.getRow());
+ put.setClusterId(entry.getKey().getClusterId());
+ preRow = put;
+ }
+ preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
+ preLoc = loc;
+ }
+ if (kv.isDelete()) {
+ del.addDeleteMarker(kv);
+ } else {
+ put.add(kv);
+ }
+ lastKV = kv;
+ }
+
+ // skip the edit
+ if (needSkip) continue;
+
+ // add the last row
+ if (preRow != null && lastAddedRow != preRow) {
+ synchronized (serverToBufferQueueMap) {
+ List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
+ if (queue == null) {
+ queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
+ serverToBufferQueueMap.put(preKey, queue);
+ }
+ queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
+ }
+ // store regions we have recovered so far
+ addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
+ }
+ }
+ }
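
The queue key built by this method follows the KEY_DELIMITER format declared at the top of the sink; a sketch with hypothetical values:

    // Format: <host:port>#<table>
    String key = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
    // key == "rs1.example.com:60020#usertable"  (hypothetical host and table)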
+
+ /**
+ * Locate destination region based on table name & row. This function also makes sure the
+ * destination region is online for replay.
+ * @throws IOException
+ */
+ private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
+ byte[] table, byte[] row) throws IOException {
+ HRegionLocation loc = hconn.getRegionLocation(table, row, false);
+ if (loc == null) {
+ throw new IOException("Can't locate location for row:" + Bytes.toString(row)
+ + " of table:" + Bytes.toString(table));
+ }
+ if (onlineRegions.contains(loc.getRegionInfo().getEncodedName())) {
+ return loc;
+ }
+
+ Long lastFlushedSequenceId = -1L;
+ loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
+ Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
+ .getEncodedName());
+
+ onlineRegions.add(loc.getRegionInfo().getEncodedName());
+ // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
+ // update the value for the region
+ lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(watcher, loc
+ .getServerName().getServerName(), loc.getRegionInfo().getEncodedName());
+ if (cachedLastFlushedSequenceId == null
+ || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
+ lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
+ } else if (!loc.getRegionInfo().isRecovering()) {
+ // the region isn't recovering at all, because the WAL file may contain edits for a
+ // region that was moved elsewhere before the hosting RS failed
+ lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
+ LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
+ + " because it's not in recovering.");
+ }
+
+ return loc;
+ }
+
+ private void processWorkItems(String key, List<Pair<HRegionLocation, Row>> actions)
+ throws IOException {
+ RegionServerWriter rsw = null;
+
+ long startTime = System.nanoTime();
+ try {
+ rsw = getRegionServerWriter(key);
+ rsw.sink.replayEntries(actions);
+
+ // Pass along summary statistics
+ rsw.incrementEdits(actions.size());
+ rsw.incrementNanoTime(System.nanoTime() - startTime);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.fatal(" Got while writing log entry to log", e);
+ throw e;
+ }
+ }
+
+ /**
+ * Wait until region is online on the destination region server
+ * @param loc
+ * @param row
+ * @param timeout How long to wait
+ * @return True when region is online on the destination region server
+ * @throws IOException when the region fails to come online within the timeout
+ */
+ private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
+ final long timeout)
+ throws IOException {
+ final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+ final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ boolean reloadLocation = false;
+ byte[] tableName = loc.getRegionInfo().getTableName();
+ int tries = 0;
+ Throwable cause = null;
+ while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
+ try {
+ // Try and get regioninfo from the hosting server.
+ HConnection hconn = getConnectionByTableName(tableName);
+ if(reloadLocation) {
+ loc = hconn.getRegionLocation(tableName, row, true);
+ }
+ BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
+ HRegionInfo region = loc.getRegionInfo();
+ if ((region = ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) {
+ loc.getRegionInfo().setRecovering(region.isRecovering());
+ return loc;
+ }
+ } catch (IOException e) {
+ cause = e.getCause();
+ if(!(cause instanceof RegionOpeningException)) {
+ reloadLocation = true;
+ }
+ }
+ long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
+ try {
+ Thread.sleep(expectedSleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted when waiting regon " +
+ loc.getRegionInfo().getEncodedName() + " online.", e);
+ }
+ tries++;
+ }
+
+ throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
+ " online for " + timeout + " milliseconds.", cause);
+ }
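
For reference, a hedged sketch of the backoff driving the retry loop above; ConnectionUtils.getPauseTime scales the base client pause by HBase's standard retry backoff table:

    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);        // base pause in ms
    for (int tries = 0; tries < 5; tries++) {
      long sleepMs = ConnectionUtils.getPauseTime(pause, tries);
      // sleepMs grows with the retry count until the overall timeout is hit
    }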
+
+ @Override
+ protected boolean flush() throws IOException {
+ String curLoc = null;
+ int curSize = 0;
+ List<Pair<HRegionLocation, Row>> curQueue = null;
+ synchronized (this.serverToBufferQueueMap) {
+ for (String locationKey : this.serverToBufferQueueMap.keySet()) {
+ curQueue = this.serverToBufferQueueMap.get(locationKey);
+ if (!curQueue.isEmpty()) {
+ curSize = curQueue.size();
+ curLoc = locationKey;
+ break;
+ }
+ }
+ if (curSize > 0) {
+ this.serverToBufferQueueMap.remove(curLoc);
+ }
+ }
+
+ if (curSize > 0) {
+ this.processWorkItems(curLoc, curQueue);
+ dataAvailable.notifyAll();
+ return true;
+ }
+ return false;
+ }
+
+ void addWriterError(Throwable t) {
+ thrown.add(t);
+ }
+
+ @Override
+ List<Path> finishWritingAndClose() throws IOException {
+ List<Path> result = new ArrayList<Path>();
+ try {
+ if (!finishWriting()) {
+ return null;
+ }
+ if (hasEditsInDisablingOrDisabledTables) {
+ result = logRecoveredEditsOutputSink.finishWritingAndClose();
+ }
+ // return an empty list in order to keep the interface the same as the old way
+ return result;
+ } finally {
+ List<IOException> thrown = closeRegionServerWriters();
+ if (thrown != null && !thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ }
+ }
+
+ @Override
+ int getNumOpenWriters() {
+ return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
+ }
+
+ private List<IOException> closeRegionServerWriters() throws IOException {
+ List<IOException> result = null;
+ if (!writersClosed) {
+ result = Lists.newArrayList();
+ try {
+ for (WriterThread t : writerThreads) {
+ while (t.isAlive()) {
+ t.shouldStop = true;
+ t.interrupt();
+ try {
+ t.join(10);
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ }
+ }
+ }
+ } finally {
+ synchronized (writers) {
+ for (String locationKey : writers.keySet()) {
+ RegionServerWriter tmpW = writers.get(locationKey);
+ try {
+ tmpW.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
+ result.add(ioe);
+ }
+ }
+ }
+
+ // close connections
+ synchronized (this.tableNameToHConnectionMap) {
+ for (byte[] tableName : this.tableNameToHConnectionMap.keySet()) {
+ HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
+ try {
+ hconn.close();
+ } catch (IOException ioe) {
+ result.add(ioe);
+ }
+ }
+ }
+ writersClosed = true;
+ }
+ }
+ return result;
+ }
+
+ Map<byte[], Long> getOutputCounts() {
+ TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ synchronized (writers) {
+ for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
+ ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ int getNumberOfRecoveredRegions() {
+ return this.recoveredRegions.size();
+ }
+
+ /**
+ * Get the RegionServerWriter for the given location key. This function is threadsafe so long
+ * as multiple threads are always acting on different region servers.
+ * @return the writer wrapping the replay sink for that server
+ */
+ private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
+ RegionServerWriter ret = writers.get(loc);
+ if (ret != null) {
+ return ret;
+ }
+
+ String tableName = getTableFromLocationStr(loc);
+ if (tableName.isEmpty()) {
+ LOG.warn("Invalid location string: " + loc + " found.");
+ }
+
+ HConnection hconn = getConnectionByTableName(Bytes.toBytes(tableName));
+ synchronized (writers) {
+ ret = writers.get(loc);
+ if (ret == null) {
+ ret = new RegionServerWriter(conf, Bytes.toBytes(tableName), hconn);
+ writers.put(loc, ret);
+ }
+ }
+ return ret;
+ }
+
+ private HConnection getConnectionByTableName(final byte[] tableName) throws IOException {
+ HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
+ if (hconn == null) {
+ synchronized (this.tableNameToHConnectionMap) {
+ hconn = this.tableNameToHConnectionMap.get(tableName);
+ if (hconn == null) {
+ hconn = HConnectionManager.createConnection(conf);
+ this.tableNameToHConnectionMap.put(tableName, hconn);
+ }
+ }
+ }
+ return hconn;
+ }
+
+ private String getTableFromLocationStr(String loc) {
+ /**
+ * location key is in format <server name:port>#<table name>
+ */
+ String[] splits = loc.split(KEY_DELIMITER);
+ if (splits.length != 2) {
+ return "";
+ }
+ return splits[1];
+ }
+ }
+
+ /**
+ * Private data structure that wraps a receiving RS and collects statistics about the data
+ * written to this newly assigned RS.
+ */
+ private final static class RegionServerWriter extends SinkWriter {
+ final WALEditsReplaySink sink;
+
+ RegionServerWriter(final Configuration conf, final byte[] tableName, final HConnection conn)
+ throws IOException {
+ this.sink = new WALEditsReplaySink(conf, tableName, conn);
+ }
+
+ void close() throws IOException {
}
}
static class CorruptedLogFileException extends Exception {
private static final long serialVersionUID = 1L;
+
CorruptedLogFileException(String s) {
super(s);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Wed May 15 04:25:57 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerNam
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Bytes;
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
@@ -168,6 +169,37 @@ public class HLogUtil {
}
/**
+ * This function returns the region server name from a log file path, which is in either format:
+ * hdfs://<name node>/hbase/.logs/<server name>-splitting/... or hdfs://<name
+ * node>/hbase/.logs/<server name>/...
+ * @param logFile
+ * @return null if the passed in logFile isn't a valid HLog file path
+ */
+ public static ServerName getServerNameFromHLogDirectoryName(Path logFile) {
+ Path logDir = logFile.getParent();
+ String logDirName = logDir.getName();
+ if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
+ logDir = logFile;
+ logDirName = logDir.getName();
+ }
+ ServerName serverName = null;
+ if (logDirName.endsWith(HLog.SPLITTING_EXT)) {
+ logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length());
+ }
+ try {
+ serverName = ServerName.parseServerName(logDirName);
+ } catch (IllegalArgumentException ex) {
+ serverName = null;
+ LOG.warn("Invalid log file path=" + logFile, ex);
+ }
+ if (serverName != null && serverName.getStartcode() < 0) {
+ LOG.warn("Invalid log file path=" + logFile);
+ return null;
+ }
+ return serverName;
+ }
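
A hedged usage sketch; the host, port and startcode in the path are hypothetical:

    Path logFile = new Path("hdfs://nn/hbase/.logs/"
        + "rs1.example.com,60020,1368590000000-splitting/wal.1368590001234");
    ServerName sn = HLogUtil.getServerNameFromHLogDirectoryName(logFile);
    // sn.getServerName() == "rs1.example.com,60020,1368590000000"
    // (null is returned for paths that don't parse as a valid HLog location)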
+
+ /**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
*
Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java?rev=1482676&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java Wed May 15 04:25:57 2013
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Class used to push numbers about WAL edits replay into the metrics subsystem. This will take a
+ * single function call and turn it into multiple manipulations of the hadoop metrics system.
+ */
+@InterfaceAudience.Private
+public class MetricsWALEditsReplay {
+ static final Log LOG = LogFactory.getLog(MetricsWALEditsReplay.class);
+
+ private final MetricsEditsReplaySource source;
+
+ public MetricsWALEditsReplay() {
+ source = CompatibilitySingletonFactory.getInstance(MetricsEditsReplaySource.class);
+ }
+
+ /**
+ * Add the time a replay command took
+ */
+ void updateReplayTime(long time) {
+ source.updateReplayTime(time);
+ }
+
+ /**
+ * Add the batch size of each replay
+ */
+ void updateReplayBatchSize(long size) {
+ source.updateReplayBatchSize(size);
+ }
+
+ /**
+ * Add the payload data size of each replay
+ */
+ void updateReplayDataSize(long size) {
+ source.updateReplayDataSize(size);
+ }
+}
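A minimal usage sketch (illustrative only; the update methods are package-private, so this would live in org.apache.hadoop.hbase.regionserver.wal, and the sizes are made-up values):

    MetricsWALEditsReplay metrics = new MetricsWALEditsReplay();
    long start = EnvironmentEdgeManager.currentTimeMillis();
    // ... replay one batch of recovered edits ...
    metrics.updateReplayTime(EnvironmentEdgeManager.currentTimeMillis() - start);
    metrics.updateReplayBatchSize(42);    // rows in the batch
    metrics.updateReplayDataSize(8192);   // payload bytes in the batch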
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Wed May 15 04:25:57 2013
@@ -83,6 +83,7 @@ public class WALEdit implements Writable
static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
private final int VERSION_2 = -1;
+ private final boolean isReplay;
private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -93,6 +94,11 @@ public class WALEdit implements Writable
private CompressionContext compressionContext;
public WALEdit() {
+ this(false);
+ }
+
+ public WALEdit(boolean isReplay) {
+ this.isReplay = isReplay;
}
/**
@@ -103,6 +109,14 @@ public class WALEdit implements Writable
return Bytes.equals(METAFAMILY, f);
}
+ /**
+ * @return True when current WALEdit is created by log replay. Replication skips WALEdits from
+ * replay.
+ */
+ public boolean isReplay() {
+ return this.isReplay;
+ }
+
public void setCompressionContext(final CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
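A short sketch of the new flag in use (illustrative only): a replication source can consult isReplay() so that edits produced by log replay are not shipped again.

    WALEdit replayEdit = new WALEdit(true);   // created by log replay
    WALEdit normalEdit = new WALEdit();       // equivalent to new WALEdit(false)
    if (replayEdit.isReplay()) {
      // skip: replication should not forward replayed edits to peer clusters
    }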
Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1482676&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java Wed May 15 04:25:57 2013
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is responsible for replaying the edits coming from a failed region server.
+ * <p/>
+ * This class uses the native HBase client in order to replay WAL entries.
+ */
+@InterfaceAudience.Private
+public class WALEditsReplaySink {
+
+ private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
+
+ private final Configuration conf;
+ private final HConnection conn;
+ private final byte[] tableName;
+ private final MetricsWALEditsReplay metrics;
+ private final AtomicLong totalReplayedEdits = new AtomicLong();
+ private final boolean skipErrors;
+
+ /**
+ * Create a sink for replaying WAL log entries
+ * @param conf configuration to use
+ * @param tableName name of the table the edits belong to
+ * @param conn HBase connection used to locate and reach region servers
+ * @throws IOException
+ */
+ public WALEditsReplaySink(Configuration conf, byte[] tableName, HConnection conn)
+ throws IOException {
+ this.conf = conf;
+ this.metrics = new MetricsWALEditsReplay();
+ this.conn = conn;
+ this.tableName = tableName;
+ this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
+ HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
+ }
+
+ /**
+ * Replay a batch of actions directly into the newly assigned region servers,
+ * grouping the actions by region before sending them
+ * @param actions list of (region location, row mutation) pairs to replay
+ * @throws IOException
+ */
+ public void replayEntries(List<Pair<HRegionLocation, Row>> actions) throws IOException {
+ if (actions.isEmpty()) {
+ return;
+ }
+
+ int batchSize = actions.size();
+ int dataSize = 0;
+ Map<HRegionInfo, List<Action<Row>>> actionsByRegion =
+ new HashMap<HRegionInfo, List<Action<Row>>>();
+ HRegionLocation loc = null;
+ Row row = null;
+ List<Action<Row>> regionActions = null;
+ // Build the action list.
+ for (int i = 0; i < batchSize; i++) {
+ loc = actions.get(i).getFirst();
+ row = actions.get(i).getSecond();
+ if (actionsByRegion.containsKey(loc.getRegionInfo())) {
+ regionActions = actionsByRegion.get(loc.getRegionInfo());
+ } else {
+ regionActions = new ArrayList<Action<Row>>();
+ actionsByRegion.put(loc.getRegionInfo(), regionActions);
+ }
+ Action<Row> action = new Action<Row>(row, i);
+ regionActions.add(action);
+ dataSize += row.getRow().length;
+ }
+
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+
+ // Replay the edits region by region. All entries in a batch come from the same
+ // failed server; if a region has moved, the callable re-resolves its location
+ // on retry in prepare().
+ for (HRegionInfo curRegion : actionsByRegion.keySet()) {
+ replayEdits(loc, curRegion, actionsByRegion.get(curRegion));
+ }
+
+ long elapsedTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ LOG.debug("Replayed a batch of " + batchSize + " rows in " + elapsedTime + " ms");
+
+ metrics.updateReplayTime(elapsedTime);
+ metrics.updateReplayBatchSize(batchSize);
+ metrics.updateReplayDataSize(dataSize);
+
+ this.totalReplayedEdits.addAndGet(batchSize);
+ }
+
+ /**
+ * Get a string representation of this sink's metrics
+ * @return string with the total replayed edits count
+ */
+ public String getStats() {
+ return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
+ + this.totalReplayedEdits;
+ }
+
+ private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
+ final List<Action<Row>> actions) throws IOException {
+ try {
+ ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>(
+ this.conn, this.tableName, regionLoc, regionInfo, actions);
+ callable.withRetries();
+ } catch (IOException ie) {
+ if (skipErrors) {
+ LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ + "=true so continuing replayEdits with error:" + ie.getMessage());
+ } else {
+ throw ie;
+ }
+ }
+ }
+
+ /**
+ * Callable that handles the <code>replay</code> method call going against a single regionserver
+ * @param <R> result type, unused here since <code>call()</code> always returns null
+ */
+ class ReplayServerCallable<R> extends ServerCallable<MultiResponse> {
+ private HRegionInfo regionInfo;
+ private List<Action<Row>> actions;
+
+ ReplayServerCallable(final HConnection connection, final byte [] tableName,
+ final HRegionLocation regionLoc, final HRegionInfo regionInfo,
+ final List<Action<Row>> actions) {
+ super(connection, tableName, null);
+ this.actions = actions;
+ this.regionInfo = regionInfo;
+ this.location = regionLoc;
+ }
+
+ @Override
+ public MultiResponse call() throws IOException {
+ try {
+ replayToServer(this.regionInfo, this.actions);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ return null;
+ }
+
+ private void replayToServer(HRegionInfo regionInfo, List<Action<Row>> actions)
+ throws IOException, ServiceException {
+ AdminService.BlockingInterface remoteSvr = connection.getAdmin(location.getServerName());
+ MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(),
+ actions);
+ MultiResponse protoResults = remoteSvr.replay(null, request);
+ // check if it's a partial success
+ List<ActionResult> resultList = protoResults.getResultList();
+ for (int i = 0, n = resultList.size(); i < n; i++) {
+ ActionResult result = resultList.get(i);
+ if (result.hasException()) {
+ Throwable t = ProtobufUtil.toException(result.getException());
+ if (!skipErrors) {
+ // rethrow so the retry machinery can try the replay again
+ throw new IOException(t);
+ } else {
+ LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ + "=true so continuing replayToServer with error:" + t.getMessage());
+ return;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void prepare(boolean reload) throws IOException {
+ if (!reload) return;
+
+ // Relocate the region in case it moved after a server death or a network hiccup.
+ // When the cached location is still valid, this lookup is fast.
+ if (!actions.isEmpty()) {
+ // use the first row to relocate the region because all actions target one region
+ this.location = this.connection.locateRegion(tableName,
+ actions.get(0).getAction().getRow());
+ }
+ }
+ }
+
+}
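A self-contained sketch of driving the sink (illustrative; the table name "t1" and the empty action list are placeholders, real callers pass the pairs recovered during log splitting):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;

    public class ReplaySinkSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HConnection conn = HConnectionManager.getConnection(conf);
        // Real callers pass (region location, mutation) pairs recovered from
        // the failed server's WAL; an empty list makes replayEntries a no-op.
        List<Pair<HRegionLocation, Row>> actions =
            new ArrayList<Pair<HRegionLocation, Row>>();
        WALEditsReplaySink sink = new WALEditsReplaySink(conf, Bytes.toBytes("t1"), conn);
        sink.replayEntries(actions);
        System.out.println(sink.getStats());
      }
    }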
Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java?rev=1482676&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java Wed May 15 04:25:57 2013
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Watcher used to be notified of the recovering region coming out of recovering state
+ */
+@InterfaceAudience.Private
+public class RecoveringRegionWatcher extends ZooKeeperListener {
+ private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
+
+ private HRegionServer server;
+
+ /**
+ * Construct a ZooKeeper event listener.
+ */
+ public RecoveringRegionWatcher(ZooKeeperWatcher watcher, HRegionServer server) {
+ super(watcher);
+ watcher.registerListener(this);
+ this.server = server;
+ }
+
+ /**
+ * Called when a node has been deleted
+ * @param path full path of the deleted node
+ */
+ @Override
+ public void nodeDeleted(String path) {
+ if (this.server.isStopped() || this.server.isStopping()) {
+ return;
+ }
+
+ String parentPath = path.substring(0, path.lastIndexOf('/'));
+ if (!this.watcher.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
+ return;
+ }
+
+ String regionName = path.substring(parentPath.length() + 1);
+ HRegion region = this.server.getRecoveringRegions().remove(regionName);
+ if (region != null) {
+ region.setRecovering(false);
+ }
+
+ LOG.info(path + " znode deleted. Region: " + regionName + " completes recovery.");
+ }
+
+ @Override
+ public void nodeDataChanged(String path) {
+ registerWatcher(path);
+ }
+
+ @Override
+ public void nodeChildrenChanged(String path) {
+ registerWatcher(path);
+ }
+
+ /**
+ * Reinstall the watcher because watchers only fire once. Though we're only interested in the
+ * nodeDeleted event, we must re-register the watcher whenever any other event fires.
+ */
+ private void registerWatcher(String path) {
+ String parentPath = path.substring(0, path.lastIndexOf('/'));
+ if (!this.watcher.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
+ return;
+ }
+
+ try {
+ ZKUtil.getDataAndWatch(watcher, path);
+ } catch (KeeperException e) {
+ LOG.warn("Can't register watcher on znode " + path, e);
+ }
+ }
+}
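A sketch of how the watcher is wired (assuming, as the constructor suggests, that the region server installs it once at startup; regionServer below stands for a running HRegionServer):

    ZooKeeperWatcher zkw = regionServer.getZooKeeper();
    new RecoveringRegionWatcher(zkw, regionServer);  // registers itself with the watcher
    // From now on, deletion of a child of the recovering-regions znode flips
    // the matching in-memory HRegion out of recovering mode.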
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java Wed May 15 04:25:57 2013
@@ -217,6 +217,7 @@ public class TestIOFencing {
public void doTest(Class<?> regionClass) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
+ c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
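For context, an illustrative snippet of the switch this test flips; a test opting into the new distributed log replay path would set it the other way:

    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);  // enable log replay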
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java Wed May 15 04:25:57 2013
@@ -58,7 +58,7 @@ public class TestRegionServerCoprocessor
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, BuggyRegionObserver.class.getName());
conf.set("hbase.coprocessor.abortonerror", "true");
- TEST_UTIL.startMiniCluster(2);
+ TEST_UTIL.startMiniCluster(3);
}
@AfterClass
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Wed May 15 04:25:57 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
@@ -520,4 +521,17 @@ ClientProtos.ClientService.BlockingInter
public ExecutorService getExecutorService() {
return null;
}
+
+ @Override
+ public MultiResponse replay(RpcController controller, MultiRequest request)
+ throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Map<String, HRegion> getRecoveringRegions() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Wed May 15 04:25:57 2013
@@ -331,6 +331,12 @@ public class TestCatalogJanitor {
public void dispatchMergingRegions(HRegionInfo region_a, HRegionInfo region_b,
boolean forcible) throws IOException {
}
+
+ @Override
+ public boolean isInitialized() {
+ // Auto-generated method stub
+ return false;
+ }
}
@Test