You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/25 08:57:30 UTC
svn commit: r1377225 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionser...
Author: mbautin
Date: Sat Aug 25 06:57:29 2012
New Revision: 1377225
URL: http://svn.apache.org/viewvc?rev=1377225&view=rev
Log:
[HBASE-6508] [89-fb] Filter out HLog edits at log splitting time
Author: avf
Summary:
Filter out edits that have already been flushed from MemStore to HFiles for a specific region. To do so, we:
1) Keep track, in HServerInfo, of the last sequence id flushed for each of the server's regions
2) Keep track of sequence ids for all region servers in HMaster
3) Add a method to HMasterRegionInterface that region servers can use to retrieve the latest flushed sequence id for a given region
4) Pass HMasterRegionInterface objects to HLogSplitter / SplitLogWorker
Test Plan: Unit tests (added unit test TestHLogFiltering)
Reviewers: Liyin, aaiyer, Kannan
Reviewed By: Liyin
CC: khemani, JIRA, tedyu
Differential Revision: https://reviews.facebook.net/D4509
Added:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java Sat Aug 25 06:57:29 2012
@@ -25,10 +25,14 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.regex.Pattern;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -69,6 +73,11 @@ public class HServerInfo implements Writ
private String hostname;
private String cachedHostnamePort = null;
+ // For each region, store the last sequence id that was flushed
+ // from MemStore to an HFile
+ private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
+ new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
public HServerInfo() {
this(new HServerAddress(), 0, "default name");
}
@@ -110,6 +119,7 @@ public class HServerInfo implements Writ
this.startCode = other.getStartCode();
this.load = other.getLoad();
this.hostname = other.hostname;
+ this.flushedSequenceIdByRegion.putAll(other.flushedSequenceIdByRegion);
}
public HServerLoad getLoad() {
@@ -137,6 +147,18 @@ public class HServerInfo implements Writ
return this.hostname;
}
+ /**
+ * Records the last sequence id flushed from MemStore to an HFile for a region.
+ * @param region the region name
+ * @param sequenceId the last flushed sequence id
+ */
+ public void setFlushedSequenceIdForRegion(byte[] region, long sequenceId) {
+ flushedSequenceIdByRegion.put(region, sequenceId);
+ }
+
+ /**
+ * @param region the region name
+ * @return the last flushed sequence id recorded for the region, or -1 if none
+ * has been recorded (the same "unknown" value the master returns)
+ */
+ public long getFlushedSequenceIdForRegion(byte[] region) {
+ Long sequenceId = flushedSequenceIdByRegion.get(region);
+ // Unboxing a missing entry directly would throw a NullPointerException.
+ return sequenceId == null ? -1L : sequenceId;
+ }
+
+ /**
+ * @return the live, mutable map of region name to last flushed sequence id.
+ * It is not a copy: callers (e.g. HRegionServer when a region goes offline)
+ * remove entries from it directly, and it is what gets serialized.
+ */
+ public SortedMap<byte[], Long> getFlushedSequenceIdByRegion() {
+ return flushedSequenceIdByRegion;
+ }
+
/**
* @return The hostname and port concatenated with a ':' as separator.
*/
@@ -238,6 +260,9 @@ public class HServerInfo implements Writ
this.load.readFields(in);
in.readInt();
this.hostname = in.readUTF();
+ HbaseMapWritable<byte[], Long> sequenceIdsWritable =
+ new HbaseMapWritable<byte[], Long>(flushedSequenceIdByRegion);
+ sequenceIdsWritable.readFields(in);
}
public void write(DataOutput out) throws IOException {
@@ -247,6 +272,9 @@ public class HServerInfo implements Writ
// Still serializing the info port for backward compatibility but it is not used.
out.writeInt(HConstants.DEFAULT_REGIONSERVER_INFOPORT);
out.writeUTF(hostname);
+ HbaseMapWritable<byte[], Long> sequenceIdsWritable =
+ new HbaseMapWritable<byte[], Long>(flushedSequenceIdByRegion);
+ sequenceIdsWritable.write(out);
}
public int compareTo(HServerInfo o) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java Sat Aug 25 06:57:29 2012
@@ -45,7 +45,7 @@ public interface CodeToClassAndBack {
/**
* Class list for supported classes
*/
- public Class<?>[] classList = {byte[].class};
+ public Class<?>[] classList = {byte[].class, Long.class};
/**
* The static loader that is used instead of the static constructor in
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java Sat Aug 25 06:57:29 2012
@@ -187,6 +187,8 @@ implements SortedMap<byte[],V>, Configur
Object value = e.getValue();
if (value instanceof byte []) {
Bytes.writeByteArray(out, (byte [])value);
+ } else if (value instanceof Long) {
+ out.writeLong((Long)value);
} else {
((Writable)value).write(out);
}
@@ -209,6 +211,9 @@ implements SortedMap<byte[],V>, Configur
if (clazz.equals(byte [].class)) {
byte [] bytes = Bytes.readByteArray(in);
value = (V)bytes;
+ } else if (clazz.equals(Long.class)) {
+ Long val = in.readLong();
+ value = (V)val;
} else {
Writable w = (Writable)ReflectionUtils.
newInstance(clazz, getConf());
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Sat Aug 25 06:57:29 2012
@@ -61,4 +61,18 @@ public interface HMasterRegionInterface
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[],
HRegionInfo mostLoadedRegions[])
throws IOException;
+
+
+ /**
+ * Get the sequence id of the last MemStore entry flushed to an
+ * HFile for a specified region. Used by the region server to speed up
+ * log splitting
+ *
+ * @param regionName
+ * @return The last HLog sequence id flushed from MemStore to HFile for
+ * the region
+ * @throws IOException
+ */
+ public long getLastFlushedSequenceId(byte[] regionName)
+ throws IOException;
}
\ No newline at end of file
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sat Aug 25 06:57:29 2012
@@ -35,9 +35,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -164,6 +167,9 @@ public class HMaster extends HasThread i
private final AtomicBoolean clusterShutdownRequested =
new AtomicBoolean(false);
+ private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
+ new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
private final Configuration conf;
private final Path rootdir;
private InfoServer infoServer;
@@ -1269,6 +1275,27 @@ public class HMaster extends HasThread i
this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
}
+ void updateLastFlushedSequenceIds(HServerInfo serverInfo) {
+ SortedMap<byte[], Long> flushedSequenceIds = serverInfo.getFlushedSequenceIdByRegion();
+ for (Entry<byte[], Long> entry : flushedSequenceIds.entrySet()) {
+ Long existingValue = flushedSequenceIdByRegion.get(entry.getKey());
+ if (existingValue != null) {
+ if (entry.getValue() < existingValue) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RegionServer " + serverInfo +
+ " indicates a last flushed sequence id (" + entry.getValue() +
+ ") that is less than the previous last flushed sequence id (" +
+ existingValue + ") for region " +
+ Bytes.toString(entry.getKey()) + " Ignoring.");
+ }
+ continue; // Don't let smaller sequence ids override greater
+ // sequence ids.
+ }
+ }
+ flushedSequenceIdByRegion.put(entry.getKey(), entry.getValue());
+ }
+ }
+
/**
* Override if you'd add messages to return to regionserver <code>hsi</code>
* or to send an exception.
@@ -1539,7 +1566,7 @@ public class HMaster extends HasThread i
HColumnDescriptor descriptor)
throws IOException {
alterTable(tableName, null, Arrays.asList(
- new Pair<byte [], HColumnDescriptor>(columnName, descriptor)), null);
+ new Pair<byte[], HColumnDescriptor>(columnName, descriptor)), null);
}
@Override
@@ -2214,6 +2241,14 @@ public class HMaster extends HasThread i
}
}
+ /**
+ * Returns the largest last-flushed sequence id any region server has
+ * reported for the region, or -1 if none has been reported.
+ * NOTE(review): entries are only ever added to flushedSequenceIdByRegion,
+ * never removed, so the map grows with every region ever reported.
+ */
+ @Override
+ public long getLastFlushedSequenceId(byte[] regionName) throws IOException {
+ // One lookup instead of containsKey()+get(): a single read of the
+ // concurrent map that cannot unbox a null between the two calls.
+ Long sequenceId = flushedSequenceIdByRegion.get(regionName);
+ return sequenceId == null ? -1L : sequenceId;
+ }
+
String getZKWrapperName() {
return getServerName();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Sat Aug 25 06:57:29 2012
@@ -387,6 +387,7 @@ public class ServerManager {
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP
return new HMsg [] {HMsg.REGIONSERVER_STOP};
}
+ this.master.updateLastFlushedSequenceIds(info);
return processRegionServerAllsWell(info, mostLoadedRegions, msgs);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Aug 25 06:57:29 2012
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.HColumnDe
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -1436,7 +1437,6 @@ public class HRegion implements HeapSize
sequenceId = (wal == null)? myseqid :
wal.startCacheFlush(this.regionInfo.getRegionName());
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
-
for (Store s : stores.values()) {
storeFlushers.add(s.getStoreFlusher(completeSequenceId));
}
@@ -1540,6 +1540,13 @@ public class HRegion implements HeapSize
this.getRegionInfo().isMetaRegion());
}
+ // Update the last flushed sequence id for region
+ if (this.regionServer != null) {
+ this.regionServer.getServerInfo().setFlushedSequenceIdForRegion(
+ getRegionName(),
+ completeSequenceId);
+ }
+
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Aug 25 06:57:29 2012
@@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.cli.CommandLine;
@@ -193,7 +194,7 @@ public class HRegionServer implements HR
// If false, the file system has become unavailable
protected volatile boolean fsOk;
- protected HServerInfo serverInfo;
+ protected volatile HServerInfo serverInfo;
protected final Configuration conf;
private final ServerConnection connection;
@@ -336,6 +337,9 @@ public class HRegionServer implements HR
/** Regionserver launched by the main method. Not used in tests. */
private static HRegionServer mainRegionServer;
+ /** Keep a reference to the current active master for use by HLogSplitter. */
+ private final AtomicReference<HMasterRegionInterface> masterRef =
+ new AtomicReference<HMasterRegionInterface>();
/**
* Starts a HRegionServer at the default location
@@ -1466,7 +1470,7 @@ public class HRegionServer implements HR
// Create the log splitting worker and start it
this.splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
this.getConfiguration(), this.serverInfo.getServerName(),
- logCloseThreadPool);
+ logCloseThreadPool, masterRef);
splitLogWorker.start();
LOG.info("HRegionServer started at: " +
this.serverInfo.getServerAddress().toString());
@@ -1651,6 +1655,7 @@ public class HRegionServer implements HR
}
}
this.hbaseMaster = master;
+ masterRef.set(hbaseMaster);
return true;
}
@@ -2849,10 +2854,12 @@ public class HRegionServer implements HR
* @return the removed HRegion, or null if the HRegion was not in onlineRegions.
*/
HRegion removeFromOnlineRegions(HRegionInfo hri) {
+ byte[] regionName = hri.getRegionName();
+ serverInfo.getFlushedSequenceIdByRegion().remove(regionName);
this.lock.writeLock().lock();
HRegion toReturn = null;
try {
- toReturn = onlineRegions.remove(Bytes.mapKey(hri.getRegionName()));
+ toReturn = onlineRegions.remove(Bytes.mapKey(regionName));
} finally {
this.lock.writeLock().unlock();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Sat Aug 25 06:57:29 2012
@@ -27,6 +27,7 @@ import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -87,7 +90,6 @@ public class SplitLogWorker implements R
private boolean workerInGrabTask = false;
protected ZooKeeperWrapper watcher;
-
public SplitLogWorker(ZooKeeperWrapper watcher, Configuration conf,
String serverName, TaskExecutor executor) {
this.watcher = watcher;
@@ -97,7 +99,8 @@ public class SplitLogWorker implements R
}
public SplitLogWorker(ZooKeeperWrapper watcher, final Configuration conf,
- final String serverName, final ExecutorService logCloseThreadPool) {
+ final String serverName, final ExecutorService logCloseThreadPool,
+ final AtomicReference<HMasterRegionInterface> masterRef) {
this(watcher, conf, serverName, new TaskExecutor () {
@Override
public Status exec(String filename, CancelableProgressable p) {
@@ -128,7 +131,7 @@ public class SplitLogWorker implements R
String tmpname =
ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
- st, fs, conf, p, logCloseThreadPool) == false) {
+ st, fs, conf, p, logCloseThreadPool, masterRef.get()) == false) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sat Aug 25 06:57:29 2012
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
@@ -47,6 +48,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -82,11 +85,12 @@ public class HLogSplitter {
// Thread pool for closing LogWriters in parallel
protected final ExecutorService logCloseThreadPool;
+ // For checking the latest flushed sequence id
+ protected final HMasterRegionInterface master;
// If an exception is thrown by one of the other threads, it will be
// stored here.
protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
-
// Wait/notify for when data has been produced by the reader thread,
// consumed by the reader thread, or an exception occurred
Object dataAvailable = new Object();
@@ -136,13 +140,15 @@ public class HLogSplitter {
}
public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
- Path oldLogDir, FileSystem fs, ExecutorService logCloseThreadPool) {
+ Path oldLogDir, FileSystem fs, ExecutorService logCloseThreadPool,
+ HMasterRegionInterface master) {
this.conf = conf;
this.rootDir = rootDir;
this.srcDir = srcDir;
this.oldLogDir = oldLogDir;
this.fs = fs;
this.logCloseThreadPool = logCloseThreadPool;
+ this.master = master;
}
/**
@@ -166,9 +172,10 @@ public class HLogSplitter {
static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter,
- ExecutorService logCloseThreadPool) throws IOException {
+ ExecutorService logCloseThreadPool, HMasterRegionInterface master)
+ throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
- fs, logCloseThreadPool);
+ fs, logCloseThreadPool, master);
return s.splitLogFileToTemp(logfile, tmpname, reporter);
}
@@ -217,11 +224,32 @@ public class HLogSplitter {
status.markComplete("Failed: reporter.progress asked us to terminate");
return false;
}
+ Map<byte[], Long> lastFlushedSequenceIds =
+ new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
int editsCount = 0;
+ int editsSkipped = 0;
Entry entry;
try {
while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getRegionName();
+ // Edits at or below the region's last flushed sequence id are already in
+ // HFiles and are skipped; -1 means "unknown", so every edit is kept.
+ Long lastFlushedSequenceId = -1L;
+ if (master != null) {
+ lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
+ if (lastFlushedSequenceId == null) {
+ try {
+ lastFlushedSequenceId = master.getLastFlushedSequenceId(region);
+ } catch (ConnectException e) {
+ lastFlushedSequenceId = -1L;
+ LOG.warn("Unable to connect to the master to check " +
+ "the last flushed sequence id", e);
+ }
+ // Cache the fallback too: otherwise every remaining edit of this region
+ // in the log would retry the unreachable master.
+ lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+ }
+ }
+ if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+ editsSkipped++;
+ continue;
+ }
Object o = logWriters.get(region);
if (o == BAD_WRITER) {
continue;
@@ -241,7 +269,8 @@ public class HLogSplitter {
wap.w.append(entry);
editsCount++;
if (editsCount % interval == 0) {
- status.setStatus("Split " + editsCount + " edits");
+ // editsCount counts only appended edits; skipped edits never reach here.
+ status.setStatus("Split " + editsCount +
+ " edits, skipped " + editsSkipped + " edits.");
long t1 = EnvironmentEdgeManager.currentTimeMillis();
if ((t1 - last_report_at) > period) {
last_report_at = t;
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java Sat Aug 25 06:57:29 2012
@@ -138,10 +138,13 @@ public class TestSerialization extends H
*/
public void testServerInfo() throws Exception {
HServerInfo hsi = new HServerInfo(new HServerAddress("0.0.0.0:123"), -1, "default name");
+ byte[] region = Bytes.toBytes("region");
+ hsi.setFlushedSequenceIdForRegion(region, 0xfaceb);
byte [] b = Writables.getBytes(hsi);
HServerInfo deserializedHsi =
(HServerInfo)Writables.getWritable(b, new HServerInfo());
assertTrue(hsi.equals(deserializedHsi));
+ // Check the round-tripped copy: the original object proves nothing.
+ assertEquals(0xfaceb, deserializedHsi.getFlushedSequenceIdForRegion(region));
}
public void testPut() throws Exception{
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1377225&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Sat Aug 25 06:57:29 2012
@@ -0,0 +1,147 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+/**
+ * Checks that last flushed sequence ids recorded by region servers reach the
+ * master, which HLog splitting uses to skip already-flushed edits.
+ */
+public class TestHLogFiltering {
+
+ private static final Log LOG = LogFactory.getLog(TestHLogFiltering.class);
+
+ private static final int NUM_MASTERS = 1;
+ private static final int NUM_RS = 3;
+
+ /** Upper bound on waiting for region server reports to reach the master. */
+ private static final long MASTER_UPDATE_TIMEOUT_MS = 60000;
+ /** Interval between polls of the master while waiting. */
+ private static final long MASTER_POLL_INTERVAL_MS = 100;
+
+ private static final byte[] TABLE_NAME = Bytes.toBytes("TestHLogFiltering");
+ private static final byte[] CF1 = Bytes.toBytes("MyCF1");
+ private static final byte[] CF2 = Bytes.toBytes("MyCF2");
+ private static final byte[][] FAMILIES = { CF1, CF2 };
+
+ private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+ fillTable();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Creates the test table with regions spread over the region servers and
+ * writes four rounds of random puts and deletes to rows row0..row99.
+ */
+ private void fillTable() throws IOException, InterruptedException {
+ HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
+ Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+ Random rand = new Random(19387129L);
+ for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
+ for (int iRow = 0; iRow < 100; ++iRow) {
+ final byte[] row = Bytes.toBytes("row" + iRow);
+ Put put = new Put(row);
+ Delete del = new Delete(row);
+ for (int iCol = 0; iCol < 10; ++iCol) {
+ final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
+ final long ts = rand.nextInt();
+ final byte[] qual = Bytes.toBytes("col" + iCol);
+ if (rand.nextBoolean()) {
+ final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
+ "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
+ ts + "_random_" + rand.nextLong());
+ put.add(cf, qual, ts, value);
+ } else if (rand.nextDouble() < 0.8) {
+ del.deleteColumn(cf, qual, ts);
+ } else {
+ del.deleteColumns(cf, qual, ts);
+ }
+ }
+ table.put(put);
+ table.delete(del);
+ table.flushCommits();
+ }
+ }
+ TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
+ }
+
+ /**
+ * Flushes every region on every region server, then checks that the master
+ * reports the same last flushed sequence ids the region servers recorded.
+ */
+ @Test
+ public void testFlushedSequenceIdsSentToHMaster()
+ throws IOException, InterruptedException {
+ SortedMap<byte[], Long> allFlushedSequenceIds =
+ new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ for (int i = 0; i < NUM_RS; ++i) {
+ SortedMap<byte[], Long> flushedSequenceIds = flushAllRegions(i);
+ assertTrue(flushedSequenceIds.size() > 0);
+ allFlushedSequenceIds.putAll(flushedSequenceIds);
+ }
+ HMasterRegionInterface master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+ // The ids reach the master with region server reports, so poll for them
+ // instead of sleeping for a fixed amount of time.
+ waitForMasterToCatchUp(master, allFlushedSequenceIds);
+ for (int i = 0; i < NUM_RS; ++i) {
+ for (byte[] regionName : getRegionsByServer(i)) {
+ if (allFlushedSequenceIds.containsKey(regionName)) {
+ assertEquals((long)allFlushedSequenceIds.get(regionName),
+ master.getLastFlushedSequenceId(regionName));
+ }
+ }
+ }
+ }
+
+ /**
+ * Polls the master until it knows, for every region in {@code expected}, a
+ * last flushed sequence id at least as large as the expected one, or until
+ * MASTER_UPDATE_TIMEOUT_MS elapses. Mismatches are left to the caller's
+ * assertions.
+ */
+ private void waitForMasterToCatchUp(HMasterRegionInterface master,
+ SortedMap<byte[], Long> expected)
+ throws IOException, InterruptedException {
+ long deadline = System.currentTimeMillis() + MASTER_UPDATE_TIMEOUT_MS;
+ while (!masterHasCaughtUp(master, expected)) {
+ if (System.currentTimeMillis() > deadline) {
+ LOG.warn("Timed out waiting for the master to receive flushed sequence ids");
+ return;
+ }
+ Thread.sleep(MASTER_POLL_INTERVAL_MS);
+ }
+ }
+
+ /** @return true if the master's id for each expected region is at least the expected value */
+ private static boolean masterHasCaughtUp(HMasterRegionInterface master,
+ SortedMap<byte[], Long> expected) throws IOException {
+ for (byte[] regionName : expected.keySet()) {
+ if (master.getLastFlushedSequenceId(regionName) < expected.get(regionName)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /** @return the names of the regions currently assigned to region server {@code rsId} */
+ private List<byte[]> getRegionsByServer(int rsId) throws IOException {
+ List<byte[]> regionNames = Lists.newArrayList();
+ HRegionServer hrs = getRegionServer(rsId);
+ for (HRegionInfo hri : hrs.getRegionsAssignment()) {
+ regionNames.add(hri.getRegionName());
+ }
+ return regionNames;
+ }
+
+ private HRegionServer getRegionServer(int rsId) {
+ return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
+ }
+
+ /**
+ * Flushes every region on region server {@code rsId}.
+ * @return that server's live map of region name to last flushed sequence id
+ */
+ private SortedMap<byte[], Long> flushAllRegions(int rsId)
+ throws IOException {
+ HRegionServer hrs = getRegionServer(rsId);
+ for (byte[] regionName : getRegionsByServer(rsId)) {
+ hrs.flushRegion(regionName);
+ }
+ return hrs.getServerInfo().getFlushedSequenceIdByRegion();
+ }
+}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Sat Aug 25 06:57:29 2012
@@ -725,7 +725,7 @@ public class TestHLogSplit {
FileStatus logfile = fs.listStatus(hlogDir)[0];
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
- reporter, logCloseThreadPool);
+ reporter, logCloseThreadPool, null);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
@@ -751,7 +751,7 @@ public class TestHLogSplit {
fs.delete(regiondir, true);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
- reporter, logCloseThreadPool);
+ reporter, logCloseThreadPool, null);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
// This test passes if there are no exceptions when
@@ -769,7 +769,7 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
- reporter, logCloseThreadPool);
+ reporter, logCloseThreadPool, null);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
@@ -787,7 +787,7 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
- reporter, logCloseThreadPool);
+ reporter, logCloseThreadPool, null);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
for (String region : regions) {
@@ -807,7 +807,7 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
- reporter, logCloseThreadPool);
+ reporter, logCloseThreadPool, null);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);