Posted to commits@hbase.apache.org by te...@apache.org on 2013/05/17 23:07:51 UTC

svn commit: r1483992 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: tedyu
Date: Fri May 17 21:07:51 2013
New Revision: 1483992

URL: http://svn.apache.org/r1483992
Log:
HBASE-7210 Backport HBASE-6059 to 0.94 (Ted Yu)
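
In short: the old replay path compared every edit against a single region-wide
minimum sequence id, so an edit that a store had already persisted (or that a
major compaction had made obsolete) could be re-applied, resurrecting deleted
rows; the new TestWALReplay test below exercises exactly that scenario. With
this patch, replay keeps a per-store map of max sequence ids and decides per
column family whether an edit still needs to be applied. A minimal,
self-contained sketch of that decision (hypothetical class, String keys for
brevity; the patch keys the real map by byte[] with Bytes.BYTES_COMPARATOR):

import java.util.HashMap;
import java.util.Map;

public class PerStoreReplaySketch {

  // Region-wide minimum across the per-store maxima; recovered-edits files
  // whose highest sequence id is <= this value are skipped outright.
  static long minSeqIdForRegion(Map<String, Long> maxSeqIdInStores) {
    long min = -1;
    for (long storeMax : maxSeqIdInStores.values()) {
      if (min == -1 || storeMax < min) {
        min = storeMax;
      }
    }
    return min;
  }

  // Per-edit decision: replay only if the edit's sequence id exceeds the
  // max already persisted for that edit's column family.
  static boolean shouldReplay(long editSeqId, String family,
      Map<String, Long> maxSeqIdInStores) {
    Long storeMax = maxSeqIdInStores.get(family);
    return storeMax == null || editSeqId > storeMax;
  }

  public static void main(String[] args) {
    Map<String, Long> maxByStore = new HashMap<String, Long>();
    maxByStore.put("cf1", 100L); // cf1 persisted through seq id 100
    maxByStore.put("cf2", 80L);  // cf2 lags behind
    System.out.println(minSeqIdForRegion(maxByStore));        // 80
    System.out.println(shouldReplay(90L, "cf1", maxByStore)); // false
    System.out.println(shouldReplay(90L, "cf2", maxByStore)); // true
  }
}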


Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Fri May 17 21:07:51 2013
@@ -191,7 +191,10 @@ class Compactor extends Configured {
         boolean hasMore;
         do {
           hasMore = scanner.next(kvs, compactionKVMax);
-          if (writer == null && !kvs.isEmpty()) {
+          // Create the writer even if there are no KVs (an empty store file
+          // is also ok), because we need to record the max seq id for the
+          // store file; see HBASE-6059
+          if (writer == null) {
             writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
                 maxMVCCReadpoint >= smallestReadPoint);
           }
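
Why create a writer when there is nothing to write: the store file's metadata
records the max sequence id of the files it replaces, and after HBASE-6059
that bookkeeping is what lets replay skip edits the compaction already
covered. A toy, self-contained model of the control-flow change (not HBase
code; the real code uses a StoreFile.Writer):

import java.util.ArrayList;
import java.util.List;

public class AlwaysCreateWriterSketch {

  // The output "file" is created on the first pass even when the scanner
  // yields nothing, so an empty output still exists to carry metadata.
  static List<String> compact(List<List<String>> batches, long maxSeqId) {
    List<String> writer = null;
    for (List<String> kvs : batches) {
      if (writer == null) { // before the patch: also required !kvs.isEmpty()
        writer = new ArrayList<String>();
      }
      writer.addAll(kvs);
    }
    if (writer != null) {
      writer.add("MAX_SEQ_ID=" + maxSeqId); // survives even if no KVs remain
    }
    return writer;
  }

  public static void main(String[] args) {
    // Every input KV expired or deleted: the old code produced no file at
    // all, and the sequence-id bookkeeping was lost with it.
    List<List<String>> batches = new ArrayList<List<String>>();
    batches.add(new ArrayList<String>());
    System.out.println(compact(batches, 1234L)); // [MAX_SEQ_ID=1234]
  }
}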

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri May 17 21:07:51 2013
@@ -572,22 +572,13 @@ public class HRegion implements HeapSize
     cleanupTmpDir();
 
     // Load in all the HStores.
-    // Get minimum of the maxSeqId across all the store.
     //
     // Context: During replay we want to ensure that we do not lose any data. So, we
     // have to be conservative in how we replay logs. For each store, we calculate
-    // the maxSeqId up to which the store was flushed. But, since different stores
-    // could have a different maxSeqId, we choose the
-    // minimum across all the stores.
-    // This could potentially result in duplication of data for stores that are ahead
-    // of others. ColumnTrackers in the ScanQueryMatchers do the de-duplication, so we
-    // do not have to worry.
-    // TODO: If there is a store that was never flushed in a long time, we could replay
-    // a lot of data. Currently, this is not a problem because we flush all the stores at
-    // the same time. If we move to per-cf flushing, we might want to revisit this and send
-    // in a vector of maxSeqIds instead of sending in a single number, which has to be the
-    // min across all the max.
-    long minSeqId = -1;
+    // the maxSeqId up to which the store was flushed, and skip any edit whose
+    // sequence id is equal to or lower than that store's maxSeqId.
+    Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+        Bytes.BYTES_COMPARATOR);
     long maxSeqId = -1;
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
@@ -617,9 +608,8 @@ public class HRegion implements HeapSize
 
           this.stores.put(store.getColumnFamilyName().getBytes(), store);
           long storeSeqId = store.getMaxSequenceId();
-          if (minSeqId == -1 || storeSeqId < minSeqId) {
-            minSeqId = storeSeqId;
-          }
+          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+              storeSeqId);
           if (maxSeqId == -1 || storeSeqId > maxSeqId) {
             maxSeqId = storeSeqId;
           }
@@ -639,7 +629,7 @@ public class HRegion implements HeapSize
     mvcc.initialize(maxMemstoreTS + 1);
     // Recover any edits if available.
     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
-        this.regiondir, minSeqId, reporter, status));
+        this.regiondir, maxSeqIdInStores, reporter, status));
 
     status.setStatus("Cleaning up detritus from prior splits");
     // Get rid of any splits or merges that were lost in-progress.  Clean out
@@ -3094,8 +3084,8 @@ public class HRegion implements HeapSize
    * make sense in a this single region context only -- until we online.
    *
    * @param regiondir
-   * @param minSeqId Any edit found in split editlogs needs to be in excess of
-   * this minSeqId to be applied, else its skipped.
+   * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
+   * the maxSeqId for its store to be applied; otherwise it is skipped.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
@@ -3103,12 +3093,19 @@ public class HRegion implements HeapSize
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
-      final long minSeqId, final CancelableProgressable reporter,
-      final MonitoredTask status)
+      Map<byte[], Long> maxSeqIdInStores,
+      final CancelableProgressable reporter, final MonitoredTask status)
       throws UnsupportedEncodingException, IOException {
-    long seqid = minSeqId;
+    long minSeqIdForTheRegion = -1;
+    for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
+      if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
+        minSeqIdForTheRegion = maxSeqIdInStore;
+      }
+    }
+    long seqid = minSeqIdForTheRegion;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
+
     for (Path edits: files) {
       if (edits == null || !this.fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
@@ -3119,16 +3116,16 @@ public class HRegion implements HeapSize
       long maxSeqId = Long.MAX_VALUE;
       String fileName = edits.getName();
       maxSeqId = Math.abs(Long.parseLong(fileName));
-      if (maxSeqId <= minSeqId) {
+      if (maxSeqId <= minSeqIdForTheRegion) {
         String msg = "Maximum sequenceid for this log is " + maxSeqId
-            + " and minimum sequenceid for the region is " + minSeqId
+            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
             + ", skipped the whole file, path=" + edits;
         LOG.debug(msg);
         continue;
       }
 
       try {
-        seqid = replayRecoveredEdits(edits, seqid, reporter);
+        seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
         if (skipErrors) {
@@ -3145,7 +3142,7 @@ public class HRegion implements HeapSize
         this.rsAccounting.clearRegionReplayEditsSize(this.regionInfo.getRegionName());
       }
     }
-    if (seqid > minSeqId) {
+    if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
       internalFlushcache(null, seqid, status);
     }
@@ -3162,18 +3159,17 @@ public class HRegion implements HeapSize
 
   /*
    * @param edits File of recovered edits.
-   * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
-   * must be larger than this to be replayed.
+   * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
+   * must be larger than this to be replayed for each store.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
-      final long minSeqId, final CancelableProgressable reporter)
+      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
     throws IOException {
-    String msg = "Replaying edits from " + edits + "; minSequenceid=" +
-      minSeqId + "; path=" + edits;
+    String msg = "Replaying edits from " + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
 
@@ -3181,7 +3177,7 @@ public class HRegion implements HeapSize
     HLog.Reader reader = null;
     try {
       reader = HLog.getReader(this.fs, edits, conf);
-      long currentEditSeqId = minSeqId;
+      long currentEditSeqId = -1;
       long firstSeqIdInLog = -1;
       long skippedEdits = 0;
       long editsCount = 0;
@@ -3240,12 +3236,6 @@ public class HRegion implements HeapSize
           if (firstSeqIdInLog == -1) {
             firstSeqIdInLog = key.getLogSeqNum();
           }
-          // Now, figure if we should skip this edit.
-          if (key.getLogSeqNum() <= currentEditSeqId) {
-            skippedEdits++;
-            continue;
-          }
-          currentEditSeqId = key.getLogSeqNum();
           boolean flush = false;
           for (KeyValue kv: val.getKeyValues()) {
             // Check this edit is for me. Also, guard against writing the special
@@ -3266,6 +3256,13 @@ public class HRegion implements HeapSize
               skippedEdits++;
               continue;
             }
+            // Now, figure if we should skip this edit.
+            if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
+                .getName())) {
+              skippedEdits++;
+              continue;
+            }
+            currentEditSeqId = key.getLogSeqNum();
             // Once we are over the limit, restoreEdit will keep returning true to
             // flush -- but don't flush until we've played all the kvs that make up
             // the WALEdit.
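
Note the file-level fast path is unchanged in spirit: recovered-edits files
are named for the highest sequence id they contain, so a whole file below the
region-wide minimum is still skipped without being opened
(Math.abs(Long.parseLong(fileName)) in the hunk above). A self-contained
illustration, using the same %019d naming the tests below create (the
constants are made up):

public class SkipWholeFileSketch {
  public static void main(String[] args) {
    long minSeqIdForTheRegion = 1029L; // min of the per-store maxima
    String fileName = String.format("%019d", 1000L); // "0000000000000001000"
    long maxSeqIdInFile = Math.abs(Long.parseLong(fileName));
    if (maxSeqIdInFile <= minSeqIdForTheRegion) {
      System.out.println("Maximum sequenceid for this log is " + maxSeqIdInFile
          + ", skipped the whole file: " + fileName);
    }
  }
}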

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri May 17 21:07:51 2013
@@ -1864,6 +1864,10 @@ public class Store extends SchemaConfigu
       LOG.warn("StoreFile " + f + " has a null Reader");
       return;
     }
+    if (r.getEntries() == 0) {
+      LOG.warn("StoreFile " + f + " is a empty store file");
+      return;
+    }
     // TODO: Cache these keys rather than make each time?
     byte [] fk = r.getFirstKey();
     if (fk == null) return;
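
This guard matters because, after the Compactor change above, an empty store
file is a legitimate compaction output, and such a file has no first or last
key for this computation to use. A toy sketch of the failure mode being
avoided (hypothetical code, not the Store internals):

public class EmptyFileGuardSketch {

  // Any per-file key computation must skip files with zero entries, whose
  // first/last keys come back null.
  static String firstKeyOrSkip(String firstKey, long entries) {
    if (entries == 0) {
      return null; // empty store file: skip it, as the patched code now does
    }
    return firstKey.substring(0); // would NPE on an unguarded empty file
  }

  public static void main(String[] args) {
    System.out.println(firstKeyOrSkip(null, 0));    // null, no exception
    System.out.println(firstKeyOrSkip("row-5", 3)); // row-5
  }
}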

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri May 17 21:07:51 2013
@@ -268,7 +268,13 @@ public class TestHRegion extends HBaseTe
         writer.close();
       }
       MonitoredTask status = TaskMonitor.get().createStatus(method);
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status);
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+            minSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
       Get get = new Get(row);
       Result result = region.get(get, null);
@@ -314,7 +320,13 @@ public class TestHRegion extends HBaseTe
       }
       long recoverSeqId = 1030;
       MonitoredTask status = TaskMonitor.get().createStatus(method);
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status);
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+            recoverSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
       Get get = new Get(row);
       Result result = region.get(get, null);
@@ -355,7 +367,14 @@ public class TestHRegion extends HBaseTe
           recoveredEditsDir, String.format("%019d", minSeqId-1));
       FSDataOutputStream dos=  fs.create(recoveredEdits);
       dos.close();
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null);
+      
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+        Bytes.BYTES_COMPARATOR);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir,
+          maxSeqIdInStores, null, null);
       assertEquals(minSeqId, seqId);
     } finally {
       HRegion.closeHRegion(this.region);

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Fri May 17 21:07:51 2013
@@ -183,12 +183,18 @@ public class TestStore extends TestCase 
     for (int i = 1; i <= storeFileNum; i++) {
       // verify the expired store file.
       CompactionRequest cr = this.store.requestCompaction();
-      assertEquals(1, cr.getFiles().size());
-      assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() < 
-          (System.currentTimeMillis() - this.store.scanInfo.getTtl()));
-      // Verify that the expired the store has been deleted.
+      // The first file is expired normally; on later compactions the selection
+      // also includes the empty store file left by the previous compaction.
+      assertEquals(Math.min(i, 2), cr.getFiles().size());
+      for (int j = 0; j < cr.getFiles().size(); j++) {
+        assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (System
+            .currentTimeMillis() - this.store.scanInfo.getTtl()));
+      }
+      // Verify that the expired store file is compacted to an empty store file.
       this.store.compact(cr);
-      assertEquals(storeFileNum - i, this.store.getStorefiles().size());
+      // The resulting store file has zero entries.
+      assertEquals(0, this.store.getStorefiles().get(0).getReader()
+          .getEntries());
 
       // Let the next store file expired.
       Thread.sleep(sleepTime);
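
Why Math.min(i, 2): each compaction of an expired file now leaves behind an
empty store file, so from the second iteration on the selection holds the
newly expired file plus that leftover. Worked out (illustrative only):

public class ExpectedSelectionSketch {
  public static void main(String[] args) {
    for (int i = 1; i <= 4; i++) {
      int expired = 1;                     // one file expires per iteration
      int leftoverEmpty = (i > 1) ? 1 : 0; // empty output of the previous run
      System.out.println("iteration " + i + ": " + (expired + leftoverEmpty)
          + " file(s), Math.min(i, 2) = " + Math.min(i, 2));
    }
  }
}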

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri May 17 21:07:51 2013
@@ -45,15 +45,24 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -91,7 +100,7 @@ public class TestWALReplay {
     conf.setBoolean("dfs.support.append", true);
     // The below config supported by 0.20-append and CDH3b2
     conf.setInt("dfs.client.block.recovery.retries", 2);
-    TEST_UTIL.startMiniDFSCluster(3);
+    TEST_UTIL.startMiniCluster(3);
     Path hbaseRootDir =
       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
     LOG.info("hbase.rootdir=" + hbaseRootDir);
@@ -100,7 +109,7 @@ public class TestWALReplay {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniDFSCluster();
+    TEST_UTIL.shutdownMiniCluster();
   }
 
   @Before
@@ -132,6 +141,100 @@ public class TestWALReplay {
   }
 
   /**
+   * Tests replaying edits for a region with multiple column families (HBASE-6059).
+   * @throws Exception
+   */
+  @Test
+  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
+    final byte[] tableName = Bytes
+        .toBytes("testReplayEditsAfterRegionMovedWithMultiCF");
+    byte[] family1 = Bytes.toBytes("cf1");
+    byte[] family2 = Bytes.toBytes("cf2");
+    byte[] qualifier = Bytes.toBytes("q");
+    byte[] value = Bytes.toBytes("testV");
+    byte[][] familys = { family1, family2 };
+    TEST_UTIL.createTable(tableName, familys);
+    HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.add(family1, qualifier, value);
+    htable.put(put);
+    ResultScanner resultScanner = htable.getScanner(new Scan());
+    int count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(1, count);
+
+    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
+    List<HRegion> regions = hbaseCluster.getRegions(tableName);
+    assertEquals(1, regions.size());
+
+    // move region to another regionserver
+    HRegion destRegion = regions.get(0);
+    int originServerNum = hbaseCluster
+        .getServerWith(destRegion.getRegionName());
+    assertTrue("Please start more than 1 regionserver", hbaseCluster
+        .getRegionServerThreads().size() > 1);
+    int destServerNum = 0;
+    while (destServerNum == originServerNum) {
+      destServerNum++;
+    }
+    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
+    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
+    // move region to destination regionserver
+    moveRegionAndWait(destRegion, destServer);
+
+    // delete the row
+    Delete del = new Delete(Bytes.toBytes("r1"));
+    htable.delete(del);
+    resultScanner = htable.getScanner(new Scan());
+    count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(0, count);
+
+    // flush the region and run a major compaction
+    destServer.getOnlineRegion(destRegion.getRegionName()).flushcache();
+    // mark every store for major compaction, then compact the stores
+    for (Store store : destServer.getOnlineRegion(destRegion.getRegionName())
+        .getStores().values()) {
+      store.triggerMajorCompaction();
+    }
+    destServer.getOnlineRegion(destRegion.getRegionName()).compactStores();
+
+    // move region to origin regionserver
+    moveRegionAndWait(destRegion, originServer);
+    // abort the origin regionserver
+    originServer.abort("testing");
+
+    // see what we get
+    Result result = htable.get(new Get(Bytes.toBytes("r1")));
+    if (result != null) {
+      assertTrue("Row is deleted, but we get" + result.toString(),
+          (result == null) || result.isEmpty());
+    }
+    resultScanner.close();
+  }
+
+  private void moveRegionAndWait(HRegion destRegion, HRegionServer destServer)
+      throws InterruptedException, MasterNotRunningException,
+      ZooKeeperConnectionException, IOException {
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getHBaseAdmin().move(
+        destRegion.getRegionInfo().getEncodedNameAsBytes(),
+        Bytes.toBytes(destServer.getServerName().getServerName()));
+    while (true) {
+      ServerName serverName = master.getAssignmentManager()
+          .getRegionServerOfRegion(destRegion.getRegionInfo());
+      if (serverName != null && serverName.equals(destServer.getServerName())) break;
+      Thread.sleep(10);
+    }
+  }
+
+  /**
    * Tests for hbase-2727.
    * @throws Exception
    * @see https://issues.apache.org/jira/browse/HBASE-2727