Posted to commits@hbase.apache.org by te...@apache.org on 2012/09/06 19:22:02 UTC
svn commit: r1381684 [2/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/protobuf/
main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Sep 6 17:22:01 2012
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
@@ -120,6 +122,8 @@ public class HLogSplitter {
// Used in distributed log splitting
private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
+ // For checking the latest flushed sequence id
+ protected final LastSequenceId sequenceIdChecker;
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
@@ -147,8 +151,9 @@ public class HLogSplitter {
Path.class, // rootDir
Path.class, // srcDir
Path.class, // oldLogDir
- FileSystem.class); // fs
- return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
+ FileSystem.class, // fs
+ LastSequenceId.class);
+ return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs, null);
} catch (IllegalArgumentException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
@@ -165,12 +170,13 @@ public class HLogSplitter {
}
public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
- Path oldLogDir, FileSystem fs) {
+ Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
this.conf = conf;
this.rootDir = rootDir;
this.srcDir = srcDir;
this.oldLogDir = oldLogDir;
this.fs = fs;
+ this.sequenceIdChecker = idChecker;
entryBuffers = new EntryBuffers(
conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
@@ -355,16 +361,38 @@ public class HLogSplitter {
* @param fs
* @param conf
* @param reporter
+ * @param idChecker
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
static public boolean splitLogFile(Path rootDir, FileStatus logfile,
- FileSystem fs, Configuration conf, CancelableProgressable reporter)
+ FileSystem fs, Configuration conf, CancelableProgressable reporter,
+ LastSequenceId idChecker)
throws IOException {
- HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs);
+ HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs, idChecker);
return s.splitLogFile(logfile, reporter);
}
+ /**
+ * Splits an HLog file into each region's recovered-edits directory
+ * <p>
+ * If the log file contains edits for N regions then N recovered.edits files
+ * will be produced.
+ * <p>
+ * @param rootDir
+ * @param logfile
+ * @param fs
+ * @param conf
+ * @param reporter
+ * @return false if it is interrupted by the progress-able.
+ * @throws IOException
+ */
+ static public boolean splitLogFile(Path rootDir, FileStatus logfile,
+ FileSystem fs, Configuration conf, CancelableProgressable reporter)
+ throws IOException {
+ return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null);
+ }
+
public boolean splitLogFile(FileStatus logfile,
CancelableProgressable reporter) throws IOException {
boolean isCorrupted = false;
@@ -402,17 +430,34 @@ public class HLogSplitter {
outputSink.startWriterThreads();
// Report progress every so many edits and/or files opened (opening a file
// takes a bit of time).
- int editsCount = 0;
+ Map<byte[], Long> lastFlushedSequenceIds =
+ new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
Entry entry;
+ int editsCount = 0;
+ int editsSkipped = 0;
try {
while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+ byte[] region = entry.getKey().getEncodedRegionName();
+ Long lastFlushedSequenceId = -1l;
+ if (sequenceIdChecker != null) {
+ lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
+ if (lastFlushedSequenceId == null) {
+ lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
+ lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+ }
+ }
+ if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+ editsSkipped++;
+ continue;
+ }
entryBuffers.appendEntry(entry);
editsCount++;
// If sufficient edits have passed, check if we should report progress.
if (editsCount % interval == 0
|| (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) {
numOpenedFilesLastCheck = outputSink.logWriters.size();
- String countsStr = "edits=" + editsCount;
+ String countsStr = (editsCount - editsSkipped) +
+ " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
if (!reportProgressIfIsDistributedLogSplitting()) {
return false;
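
[Editor's note] The core of this change is the per-entry filter in the hunk above: before buffering a WAL entry, the splitter asks the new LastSequenceId checker for the region's last flushed sequence id (caching one lookup per region) and drops any entry at or below it. A minimal standalone sketch of that filtering rule follows; WalEntry, SequenceIdChecker, and FlushedEditFilter are illustrative stand-ins for HLog.Entry and LastSequenceId, not real HBase classes.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.hbase.regionserver.LastSequenceId.
interface SequenceIdChecker {
  long getLastSequenceId(byte[] encodedRegionName);
}

// Hypothetical stand-in for HLog.Entry: just the two fields the filter needs.
final class WalEntry {
  final byte[] encodedRegionName;
  final long logSeqNum;
  WalEntry(byte[] encodedRegionName, long logSeqNum) {
    this.encodedRegionName = encodedRegionName;
    this.logSeqNum = logSeqNum;
  }
}

final class FlushedEditFilter {
  private final SequenceIdChecker checker;            // may be null, as in the splitLogFile(..., null) overload
  private final Map<String, Long> cache = new HashMap<String, Long>();

  FlushedEditFilter(SequenceIdChecker checker) {
    this.checker = checker;
  }

  /** Returns true if the entry was already flushed to an HFile and can be skipped. */
  boolean shouldSkip(WalEntry entry) {
    if (checker == null) {
      return false;                                   // no checker: keep every edit
    }
    String key = new String(entry.encodedRegionName); // simplified key; HBase uses a byte[] comparator
    Long lastFlushed = cache.get(key);
    if (lastFlushed == null) {
      lastFlushed = checker.getLastSequenceId(entry.encodedRegionName); // one lookup per region, then cached
      cache.put(key, lastFlushed);
    }
    return lastFlushed >= entry.logSeqNum;            // edit is at or below the last flush: skip it
  }
}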
Modified: hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto Thu Sep 6 17:22:01 2012
@@ -67,6 +67,16 @@ message ReportRSFatalErrorRequest {
message ReportRSFatalErrorResponse {
}
+message GetLastFlushedSequenceIdRequest {
+ /** region name */
+ required bytes regionName = 1;
+}
+
+message GetLastFlushedSequenceIdResponse {
+ /** the last HLog sequence id flushed from MemStore to HFile for the region */
+ required uint64 lastFlushedSequenceId = 1;
+}
+
service RegionServerStatusService {
/** Called when a region server first starts. */
rpc regionServerStartup(RegionServerStartupRequest)
@@ -82,4 +92,10 @@ service RegionServerStatusService {
*/
rpc reportRSFatalError(ReportRSFatalErrorRequest)
returns(ReportRSFatalErrorResponse);
+
+ /** Called to get the sequence id of the last MemStore entry flushed to an
+ * HFile for a specified region. Used by the region server to speed up
+ * log splitting. */
+ rpc getLastFlushedSequenceId(GetLastFlushedSequenceIdRequest)
+ returns(GetLastFlushedSequenceIdResponse);
}
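
[Editor's note] The new RPC lets a log-splitting region server ask the master for a region's last flushed sequence id. The sketch below shows a caller built on the protoc-generated classes for the messages above; the class and method names are inferred from the .proto and from the new test further down (master.getLastFlushedSequenceId(null, req)), so treat them as an assumption rather than a definitive API reference.

import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;

public class LastFlushedSequenceIdClient {
  /**
   * Asks the RegionServerStatusService for the last sequence id flushed for a region.
   * 'service' stands in for however a caller reaches the master (blocking stub, or a
   * direct HMaster reference in tests, as in TestHLogFiltering below).
   */
  public static long fetchLastFlushedSequenceId(
      RegionServerStatusService.BlockingInterface service,
      byte[] regionName) throws ServiceException {
    GetLastFlushedSequenceIdRequest request =
        GetLastFlushedSequenceIdRequest.newBuilder()
            .setRegionName(ByteString.copyFrom(regionName))   // required bytes regionName = 1
            .build();
    GetLastFlushedSequenceIdResponse response =
        service.getLastFlushedSequenceId(null, request);      // null RpcController, as in the test
    return response.getLastFlushedSequenceId();               // required uint64 lastFlushedSequenceId = 1
  }
}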
Modified: hbase/trunk/hbase-server/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/hbase.proto?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/hbase.proto Thu Sep 6 17:22:01 2012
@@ -132,6 +132,9 @@ message RegionLoad {
/** Region-level coprocessors. */
repeated Coprocessor coprocessors = 15;
+
+ /** the most recent sequence Id from cache flush */
+ optional uint64 completeSequenceId = 16;
}
/* Server-level protobufs */
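
[Editor's note] The new optional field rides along with the periodic RegionLoad report, which is how the master learns each region's latest flushed sequence id. A hedged sketch of populating it, assuming the protoc-generated RegionLoad builder (setCompleteSequenceId follows from the field declaration above; how the reporter obtains the sequence id is left to the existing reporting code).

import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;

public class RegionLoadReportSketch {
  /**
   * Illustrative only: sets the new field on a RegionLoad builder whose other
   * (possibly required) fields were already populated by the existing report code.
   */
  public static RegionLoad withCompleteSequenceId(RegionLoad.Builder builder,
      long lastFlushedSequenceId) {
    return builder
        .setCompleteSequenceId(lastFlushedSequenceId)  // optional uint64 completeSequenceId = 16
        .build();
  }
}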
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1381684&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Thu Sep 6 17:22:01 2012
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestHLogFiltering {
+ private static final Log LOG = LogFactory.getLog(TestHLogFiltering.class);
+
+ private static final int NUM_MASTERS = 1;
+ private static final int NUM_RS = 4;
+
+ private static final byte[] TABLE_NAME = Bytes.toBytes("TestHLogFiltering");
+ private static final byte[] CF1 = Bytes.toBytes("MyCF1");
+ private static final byte[] CF2 = Bytes.toBytes("MyCF2");
+ private static final byte[][] FAMILIES = { CF1, CF2 };
+
+ private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+ fillTable();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private void fillTable() throws IOException, InterruptedException {
+ HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
+ Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+ Random rand = new Random(19387129L);
+ for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
+ for (int iRow = 0; iRow < 100; ++iRow) {
+ final byte[] row = Bytes.toBytes("row" + iRow);
+ Put put = new Put(row);
+ Delete del = new Delete(row);
+ for (int iCol = 0; iCol < 10; ++iCol) {
+ final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
+ final long ts = rand.nextInt();
+ final byte[] qual = Bytes.toBytes("col" + iCol);
+ if (rand.nextBoolean()) {
+ final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
+ "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
+ ts + "_random_" + rand.nextLong());
+ put.add(cf, qual, ts, value);
+ } else if (rand.nextDouble() < 0.8) {
+ del.deleteColumn(cf, qual, ts);
+ } else {
+ del.deleteColumns(cf, qual, ts);
+ }
+ }
+ table.put(put);
+ table.delete(del);
+ table.flushCommits();
+ }
+ }
+ TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
+ }
+
+ @Test
+ public void testFlushedSequenceIdsSentToHMaster()
+ throws IOException, InterruptedException, ServiceException {
+ SortedMap<byte[], Long> allFlushedSequenceIds =
+ new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ for (int i = 0; i < NUM_RS; ++i) {
+ flushAllRegions(i);
+ }
+ Thread.sleep(10000);
+ HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+ for (int i = 0; i < NUM_RS; ++i) {
+ for (byte[] regionName : getRegionsByServer(i)) {
+ if (allFlushedSequenceIds.containsKey(regionName)) {
+ GetLastFlushedSequenceIdRequest req =
+ RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
+
+ assertEquals((long)allFlushedSequenceIds.get(regionName),
+ master.getLastFlushedSequenceId(null, req).getLastFlushedSequenceId());
+ }
+ }
+ }
+ }
+
+ private List<byte[]> getRegionsByServer(int rsId) throws IOException {
+ List<byte[]> regionNames = Lists.newArrayList();
+ HRegionServer hrs = getRegionServer(rsId);
+ for (HRegion r : hrs.getOnlineRegions(TABLE_NAME)) {
+ regionNames.add(r.getRegionName());
+ }
+ return regionNames;
+ }
+
+ private HRegionServer getRegionServer(int rsId) {
+ return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
+ }
+
+ private void flushAllRegions(int rsId)
+ throws ServiceException, IOException {
+ HRegionServer hrs = getRegionServer(rsId);
+ for (byte[] regionName : getRegionsByServer(rsId)) {
+ FlushRegionRequest request =
+ RequestConverter.buildFlushRegionRequest(regionName);
+ hrs.flushRegion(null, request);
+ }
+ }
+
+ @org.junit.Rule
+ public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+ new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Thu Sep 6 17:22:01 2012
@@ -665,13 +665,12 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
// Set up a splitter that will throw an IOE on the output side
HLogSplitter logSplitter = new HLogSplitter(
- conf, hbaseDir, hlogDir, oldLogDir, fs) {
+ conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
return mockWriter;
-
}
};
try {
@@ -698,7 +697,7 @@ public class TestHLogSplit {
when(spiedFs).append(Mockito.<Path>any());
HLogSplitter logSplitter = new HLogSplitter(
- conf, hbaseDir, hlogDir, oldLogDir, spiedFs);
+ conf, hbaseDir, hlogDir, oldLogDir, spiedFs, null);
try {
logSplitter.splitLog();
@@ -756,7 +755,7 @@ public class TestHLogSplit {
// Create a splitter that reads and writes the data without touching disk
HLogSplitter logSplitter = new HLogSplitter(
- localConf, hbaseDir, hlogDir, oldLogDir, fs) {
+ localConf, hbaseDir, hlogDir, oldLogDir, fs, null) {
/* Produce a mock writer that doesn't write anywhere */
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1015,7 +1014,7 @@ public class TestHLogSplit {
generateHLogs(1, 10, -1);
FileStatus logfile = fs.listStatus(hlogDir)[0];
fs.initialize(fs.getUri(), conf);
- HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+ HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
.toString(), conf);
@@ -1122,7 +1121,7 @@ public class TestHLogSplit {
generateHLogs(-1);
HLogSplitter logSplitter = new HLogSplitter(
- conf, hbaseDir, hlogDir, oldLogDir, fs) {
+ conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer writer = HLog.createWriter(fs, logfile, conf);