Posted to commits@hbase.apache.org by te...@apache.org on 2012/09/06 19:22:02 UTC

svn commit: r1381684 [2/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/...

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Sep  6 17:22:01 2012
@@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
@@ -120,6 +122,8 @@ public class HLogSplitter {
   // Used in distributed log splitting
   private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
 
+  // For checking the latest flushed sequence id
+  protected final LastSequenceId sequenceIdChecker;
 
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
@@ -147,8 +151,9 @@ public class HLogSplitter {
           Path.class, // rootDir
           Path.class, // srcDir
           Path.class, // oldLogDir
-          FileSystem.class); // fs
-      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
+          FileSystem.class, // fs
+          LastSequenceId.class);
+      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs, null);
     } catch (IllegalArgumentException e) {
       throw new RuntimeException(e);
     } catch (InstantiationException e) {
@@ -165,12 +170,13 @@ public class HLogSplitter {
   }
 
   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs) {
+      Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
     this.conf = conf;
     this.rootDir = rootDir;
     this.srcDir = srcDir;
     this.oldLogDir = oldLogDir;
     this.fs = fs;
+    this.sequenceIdChecker = idChecker;
 
     entryBuffers = new EntryBuffers(
         conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
@@ -355,16 +361,38 @@ public class HLogSplitter {
    * @param fs
    * @param conf
    * @param reporter
+   * @param idChecker
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
   static public boolean splitLogFile(Path rootDir, FileStatus logfile,
-      FileSystem fs, Configuration conf, CancelableProgressable reporter)
+      FileSystem fs, Configuration conf, CancelableProgressable reporter,
+      LastSequenceId idChecker)
       throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs);
+    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs, idChecker);
     return s.splitLogFile(logfile, reporter);
   }
 
+  /**
+   * Splits an HLog file into each region's recovered-edits directory
+   * <p>
+   * If the log file has N regions then N recovered.edits files will be
+   * produced.
+   * <p>
+   * @param rootDir
+   * @param logfile
+   * @param fs
+   * @param conf
+   * @param reporter
+   * @return false if it is interrupted by the progress-able.
+   * @throws IOException
+   */
+  static public boolean splitLogFile(Path rootDir, FileStatus logfile,
+      FileSystem fs, Configuration conf, CancelableProgressable reporter)
+      throws IOException {
+    return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null);
+  }
+
   public boolean splitLogFile(FileStatus logfile,
       CancelableProgressable reporter) throws IOException {
     boolean isCorrupted = false;
@@ -402,17 +430,34 @@ public class HLogSplitter {
     outputSink.startWriterThreads();
     // Report progress every so many edits and/or files opened (opening a file
     // takes a bit of time).
-    int editsCount = 0;
+    Map<byte[], Long> lastFlushedSequenceIds =
+      new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
     Entry entry;
+    int editsCount = 0;
+    int editsSkipped = 0;
     try {
       while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+        byte[] region = entry.getKey().getEncodedRegionName();
+        Long lastFlushedSequenceId = -1l;
+        if (sequenceIdChecker != null) {
+          lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
+          if (lastFlushedSequenceId == null) {
+              lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
+              lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+          }
+        }
+        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+          editsSkipped++;
+          continue;
+        }
         entryBuffers.appendEntry(entry);
         editsCount++;
         // If sufficient edits have passed, check if we should report progress.
         if (editsCount % interval == 0
             || (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) {
           numOpenedFilesLastCheck = outputSink.logWriters.size();
-          String countsStr = "edits=" + editsCount;
+          String countsStr = (editsCount - editsSkipped) +
+            " edits, skipped " + editsSkipped + " edits.";
           status.setStatus("Split " + countsStr);
           if (!reportProgressIfIsDistributedLogSplitting()) {
             return false;

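In effect, this hunk puts a per-region filter in front of entryBuffers.appendEntry(): the splitter looks up the region's last flushed sequence id (through the optional LastSequenceId checker, caching one lookup per region) and drops any edit at or below it, since such edits are already durable in HFiles and need no replay. The following is a minimal standalone sketch of that logic, not HBase code: FlushedEditFilter and SequenceIdChecker are names invented here, and the inline comparator stands in for Bytes.BYTES_COMPARATOR.

    import java.util.Comparator;
    import java.util.Map;
    import java.util.TreeMap;

    class FlushedEditFilter {
      /** Stand-in for org.apache.hadoop.hbase.regionserver.LastSequenceId. */
      interface SequenceIdChecker {
        long getLastSequenceId(byte[] encodedRegionName);
      }

      private final SequenceIdChecker checker; // null means keep every edit

      // Unsigned lexicographic order over byte[], like Bytes.BYTES_COMPARATOR.
      private final Map<byte[], Long> cache =
          new TreeMap<byte[], Long>(new Comparator<byte[]>() {
            public int compare(byte[] a, byte[] b) {
              int n = Math.min(a.length, b.length);
              for (int i = 0; i < n; i++) {
                int d = (a[i] & 0xff) - (b[i] & 0xff);
                if (d != 0) return d;
              }
              return a.length - b.length;
            }
          });

      FlushedEditFilter(SequenceIdChecker checker) {
        this.checker = checker;
      }

      /** True if the edit is already flushed for its region and can be dropped. */
      boolean shouldSkip(byte[] encodedRegionName, long editSeqNum) {
        if (checker == null) {
          return false;
        }
        Long lastFlushed = cache.get(encodedRegionName);
        if (lastFlushed == null) {
          // At most one checker lookup per region per split.
          lastFlushed = checker.getLastSequenceId(encodedRegionName);
          cache.put(encodedRegionName, lastFlushed);
        }
        return lastFlushed >= editSeqNum;
      }
    }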
Modified: hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto Thu Sep  6 17:22:01 2012
@@ -67,6 +67,16 @@ message ReportRSFatalErrorRequest {
 message ReportRSFatalErrorResponse {
 }
 
+message GetLastFlushedSequenceIdRequest {
+  /** region name */
+  required bytes regionName = 1;
+}
+
+message GetLastFlushedSequenceIdResponse {
+  /** the last HLog sequence id flushed from MemStore to HFile for the region */
+  required uint64 lastFlushedSequenceId = 1;
+}
+
 service RegionServerStatusService {
   /** Called when a region server first starts. */
   rpc regionServerStartup(RegionServerStartupRequest)
@@ -82,4 +92,10 @@ service RegionServerStatusService {
    */
   rpc reportRSFatalError(ReportRSFatalErrorRequest)
     returns(ReportRSFatalErrorResponse);
+
+  /** Called to get the sequence id of the last MemStore entry flushed to an
+   * HFile for a specified region. Used by the region server to speed up
+   * log splitting. */
+  rpc getLastFlushedSequenceId(GetLastFlushedSequenceIdRequest)
+    returns(GetLastFlushedSequenceIdResponse);
 }

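The new RPC is exercised end to end by the TestHLogFiltering test added below; in isolation, the client-side call mirrors the test's usage. A sketch, assuming `master` is an HMaster (any implementor of the generated service would do) and that the call can surface a protobuf ServiceException:

    import org.apache.hadoop.hbase.master.HMaster;
    import org.apache.hadoop.hbase.protobuf.RequestConverter;
    import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;

    import com.google.protobuf.ServiceException;

    final class LastFlushedQuery {
      /** Sketch: ask the master for a region's last flushed sequence id. */
      static long lastFlushedSequenceId(HMaster master, byte[] regionName)
          throws ServiceException {
        GetLastFlushedSequenceIdRequest req =
            RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
        return master.getLastFlushedSequenceId(null, req)
            .getLastFlushedSequenceId();
      }
    }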
Modified: hbase/trunk/hbase-server/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/hbase.proto?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/hbase.proto Thu Sep  6 17:22:01 2012
@@ -132,6 +132,9 @@ message RegionLoad {
 
   /** Region-level coprocessors. */
   repeated Coprocessor coprocessors = 15;
+
+  /** the most recent sequence Id from cache flush */
+  optional uint64 completeSequenceId = 16;
 }
 
 /* Server-level protobufs */

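completeSequenceId is the feeder for the RPC above: each region server reports the last flush's sequence id per region inside the RegionLoad it sends with its heartbeat, and the master can then answer getLastFlushedSequenceId from those reports (the master-side bookkeeping is not part of this diff). The field is optional, so master-side readers should guard on presence. A sketch, where the class name is invented and HBaseProtos is the generated class for hbase.proto:

    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;

    final class RegionLoadSequenceIds {
      /** Returns the reported id, or null for servers predating this field. */
      static Long completeSequenceId(RegionLoad load) {
        return load.hasCompleteSequenceId()
            ? Long.valueOf(load.getCompleteSequenceId())
            : null;
      }
    }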
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1381684&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Thu Sep  6 17:22:01 2012
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestHLogFiltering {
+  private static final Log LOG = LogFactory.getLog(TestHLogFiltering.class);
+
+  private static final int NUM_MASTERS = 1;
+  private static final int NUM_RS = 4;
+
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestHLogFiltering");
+  private static final byte[] CF1 = Bytes.toBytes("MyCF1");
+  private static final byte[] CF2 = Bytes.toBytes("MyCF2");
+  private static final byte[][] FAMILIES = { CF1, CF2 };
+
+  private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+    fillTable();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void fillTable() throws IOException, InterruptedException {
+    HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
+        Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+    Random rand = new Random(19387129L);
+    for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
+      for (int iRow = 0; iRow < 100; ++iRow) {
+        final byte[] row = Bytes.toBytes("row" + iRow);
+        Put put = new Put(row);
+        Delete del = new Delete(row);
+        for (int iCol = 0; iCol < 10; ++iCol) {
+          final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
+          final long ts = rand.nextInt();
+          final byte[] qual = Bytes.toBytes("col" + iCol);
+          if (rand.nextBoolean()) {
+            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
+                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
+                ts + "_random_" + rand.nextLong());
+            put.add(cf, qual, ts, value);
+          } else if (rand.nextDouble() < 0.8) {
+            del.deleteColumn(cf, qual, ts);
+          } else {
+            del.deleteColumns(cf, qual, ts);
+          }
+        }
+        table.put(put);
+        table.delete(del);
+        table.flushCommits();
+      }
+    }
+    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
+  }
+
+  @Test
+  public void testFlushedSequenceIdsSentToHMaster()
+  throws IOException, InterruptedException, ServiceException {
+    SortedMap<byte[], Long> allFlushedSequenceIds =
+        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    for (int i = 0; i < NUM_RS; ++i) {
+      flushAllRegions(i);
+    }
+    Thread.sleep(10000);
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    for (int i = 0; i < NUM_RS; ++i) {
+      for (byte[] regionName : getRegionsByServer(i)) {
+        if (allFlushedSequenceIds.containsKey(regionName)) {
+          GetLastFlushedSequenceIdRequest req =
+            RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
+
+          assertEquals((long)allFlushedSequenceIds.get(regionName),
+              master.getLastFlushedSequenceId(null, req).getLastFlushedSequenceId());
+        }
+      }
+    }
+  }
+
+  private List<byte[]> getRegionsByServer(int rsId) throws IOException {
+    List<byte[]> regionNames = Lists.newArrayList();
+    HRegionServer hrs = getRegionServer(rsId);
+    for (HRegion r : hrs.getOnlineRegions(TABLE_NAME)) {
+      regionNames.add(r.getRegionName());
+    }
+    return regionNames;
+  }
+
+  private HRegionServer getRegionServer(int rsId) {
+    return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
+  }
+
+  private void flushAllRegions(int rsId)
+  throws ServiceException, IOException {
+    HRegionServer hrs = getRegionServer(rsId);
+    for (byte[] regionName : getRegionsByServer(rsId)) {
+      FlushRegionRequest request =
+        RequestConverter.buildFlushRegionRequest(regionName);
+      hrs.flushRegion(null, request);
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1381684&r1=1381683&r2=1381684&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Thu Sep  6 17:22:01 2012
@@ -665,13 +665,12 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+        conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
         Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
         return mockWriter;
-
       }
     };
     try {
@@ -698,7 +697,7 @@ public class TestHLogSplit {
         when(spiedFs).append(Mockito.<Path>any());
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, hbaseDir, hlogDir, oldLogDir, spiedFs);
+        conf, hbaseDir, hlogDir, oldLogDir, spiedFs, null);
 
     try {
       logSplitter.splitLog();
@@ -756,7 +755,7 @@ public class TestHLogSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, hbaseDir, hlogDir, oldLogDir, fs) {
+        localConf, hbaseDir, hlogDir, oldLogDir, fs, null) {
 
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1015,7 +1014,7 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
     HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
         .toString(), conf);
 
@@ -1122,7 +1121,7 @@ public class TestHLogSplit {
     generateHLogs(-1);
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+        conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLog.createWriter(fs, logfile, conf);