You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/08/29 18:23:33 UTC

svn commit: r1378631 [2/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/...

Modified: hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto?rev=1378631&r1=1378630&r2=1378631&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/RegionServerStatus.proto Wed Aug 29 16:23:32 2012
@@ -67,6 +67,16 @@ message ReportRSFatalErrorRequest {
 message ReportRSFatalErrorResponse {
 }
 
+message GetLastFlushedSequenceIdRequest {
+  /** region name */
+  required bytes regionName = 1;
+}
+
+message GetLastFlushedSequenceIdResponse {
+  /** the last HLog sequence id flushed from MemStore to HFile for the region */
+  required uint64 lastFlushedSequenceId = 1;
+}
+
 service RegionServerStatusService {
   /** Called when a region server first starts. */
   rpc regionServerStartup(RegionServerStartupRequest)
@@ -82,4 +92,10 @@ service RegionServerStatusService {
    */
   rpc reportRSFatalError(ReportRSFatalErrorRequest)
     returns(ReportRSFatalErrorResponse);
+
+  /** Called to get the sequence id of the last MemStore entry flushed to an
+   * HFile for a specified region. Used by the region server to speed up
+   * log splitting. */
+  rpc getLastFlushedSequenceId(GetLastFlushedSequenceIdRequest)
+    returns(GetLastFlushedSequenceIdResponse);
 }

Modified: hbase/trunk/hbase-server/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/hbase.proto?rev=1378631&r1=1378630&r2=1378631&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/hbase.proto Wed Aug 29 16:23:32 2012
@@ -132,6 +132,9 @@ message RegionLoad {
 
   /** Region-level coprocessors. */
   repeated Coprocessor coprocessors = 15;
+
+  /** the most recent sequence Id from cache flush */
+  optional uint64 completeSequenceId = 16;
 }
 
 /* Server-level protobufs */

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1378631&r1=1378630&r2=1378631&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Wed Aug 29 16:23:32 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -175,6 +176,10 @@ public class TestCatalogJanitor {
     }
 
     @Override
+    public void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
+      // no-op
+    }
+    @Override
     public void checkTableModifiable(byte[] tableName) throws IOException {
       //no-op
     }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1378631&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Wed Aug 29 16:23:32 2012
@@ -0,0 +1,155 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestHLogFiltering {
+  private static final Log LOG = LogFactory.getLog(TestHLogFiltering.class);
+
+  private static final int NUM_MASTERS = 1;
+  private static final int NUM_RS = 4;
+
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestHLogFiltering");
+  private static final byte[] CF1 = Bytes.toBytes("MyCF1");
+  private static final byte[] CF2 = Bytes.toBytes("MyCF2");
+  private static final byte[][] FAMILIES = { CF1, CF2 };
+
+  private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+    fillTable();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void fillTable() throws IOException, InterruptedException {
+    HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
+        Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+    Random rand = new Random(19387129L);
+    for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
+      for (int iRow = 0; iRow < 100; ++iRow) {
+        final byte[] row = Bytes.toBytes("row" + iRow);
+        Put put = new Put(row);
+        Delete del = new Delete(row);
+        for (int iCol = 0; iCol < 10; ++iCol) {
+          final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
+          final long ts = rand.nextInt();
+          final byte[] qual = Bytes.toBytes("col" + iCol);
+          if (rand.nextBoolean()) {
+            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
+                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
+                ts + "_random_" + rand.nextLong());
+            put.add(cf, qual, ts, value);
+          } else if (rand.nextDouble() < 0.8) {
+            del.deleteColumn(cf, qual, ts);
+          } else {
+            del.deleteColumns(cf, qual, ts);
+          }
+        }
+        table.put(put);
+        table.delete(del);
+        table.flushCommits();
+      }
+    }
+    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
+  }
+
+  @Test
+  public void testFlushedSequenceIdsSentToHMaster()
+  throws IOException, InterruptedException, ServiceException {
+    SortedMap<byte[], Long> allFlushedSequenceIds =
+        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    for (int i = 0; i < NUM_RS; ++i) {
+      flushAllRegions(i);
+    }
+    Thread.sleep(10000);
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    for (int i = 0; i < NUM_RS; ++i) {
+      for (byte[] regionName : getRegionsByServer(i)) {
+        if (allFlushedSequenceIds.containsKey(regionName)) {
+          GetLastFlushedSequenceIdRequest req =
+            RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
+
+          assertEquals((long)allFlushedSequenceIds.get(regionName),
+              master.getLastFlushedSequenceId(null, req).getLastFlushedSequenceId());
+        }
+      }
+    }
+  }
+
+  private List<byte[]> getRegionsByServer(int rsId) throws IOException {
+    List<byte[]> regionNames = Lists.newArrayList();
+    HRegionServer hrs = getRegionServer(rsId);
+    for (HRegion r : hrs.getOnlineRegions(TABLE_NAME)) {
+      regionNames.add(r.getRegionName());
+    }
+    return regionNames;
+  }
+
+  private HRegionServer getRegionServer(int rsId) {
+    return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
+  }
+
+  private void flushAllRegions(int rsId)
+  throws IOException {
+    HRegionServer hrs = getRegionServer(rsId);
+    for (byte[] regionName : getRegionsByServer(rsId)) {
+      hrs.flushRegion(regionName);
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1378631&r1=1378630&r2=1378631&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Aug 29 16:23:32 2012
@@ -666,13 +666,12 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+        conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
         Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
         return mockWriter;
-
       }
     };
     try {
@@ -699,7 +698,7 @@ public class TestHLogSplit {
         when(spiedFs).append(Mockito.<Path>any());
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, hbaseDir, hlogDir, oldLogDir, spiedFs);
+        conf, hbaseDir, hlogDir, oldLogDir, spiedFs, null);
 
     try {
       logSplitter.splitLog();
@@ -757,7 +756,7 @@ public class TestHLogSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, hbaseDir, hlogDir, oldLogDir, fs) {
+        localConf, hbaseDir, hlogDir, oldLogDir, fs, null) {
 
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1016,7 +1015,7 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
     HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
         .toString(), conf);
 
@@ -1045,7 +1044,7 @@ public class TestHLogSplit {
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
     
-    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
     HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
         .toString(), conf);
     
@@ -1063,7 +1062,7 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
     HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
         .toString(), conf);
     Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
@@ -1079,7 +1078,7 @@ public class TestHLogSplit {
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
     HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
         .toString(), conf);
     for (String region : regions) {
@@ -1099,7 +1098,7 @@ public class TestHLogSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter, null);
     HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
         .toString(), conf);
 
@@ -1123,7 +1122,7 @@ public class TestHLogSplit {
     generateHLogs(-1);
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+        conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLog.createWriter(fs, logfile, conf);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1378631&r1=1378630&r2=1378631&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Wed Aug 29 16:23:32 2012
@@ -664,7 +664,7 @@ public class TestWALReplay {
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
-        null);
+        null, null);
     FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
         + tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
     int editCount = 0;