You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/11/14 20:18:24 UTC

svn commit: r1542033 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Thu Nov 14 19:18:23 2013
New Revision: 1542033

URL: http://svn.apache.org/r1542033
Log:
[0.89-fb] [HBASE-9967] delete store files when a flush aborts

Author: pervyshev

Test Plan: unit test

Reviewers: aaiyer, liyintang, manukranthk, rshroff

Reviewed By: aaiyer

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1049445

Task ID: 3111537

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 14 19:18:23 2013
@@ -177,10 +177,10 @@ public class HRegion implements HeapSize
   // private byte [] name = null;
 
   protected final AtomicLong memstoreSize = new AtomicLong(0);
-  
+
   // The number of rows are read
   protected final AtomicInteger rowReadCnt = new AtomicInteger(0);
-  
+
   // The number of rows are updated
   protected final AtomicInteger rowUpdateCnt = new AtomicInteger(0);
 
@@ -1576,6 +1576,9 @@ public class HRegion implements HeapSize
       status.abort("Flush failed: " + StringUtils.stringifyException(ioe));
       // The caller can recover from this IOException. No harm done if
       // memstore flush fails.
+      for (StoreFlusher flusher : storeFlushers) {
+        flusher.cancel();
+      }
       throw ioe;
     }
 
@@ -1917,7 +1920,7 @@ public class HRegion implements HeapSize
   public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
   throws IOException {
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    
+
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
@@ -2484,9 +2487,9 @@ public class HRegion implements HeapSize
                  long seqNum) {
     // Increment the rowUpdatedCnt
     this.rowUpdateCnt.incrementAndGet();
-    
+
     long start = EnvironmentEdgeManager.currentTimeMillis();
-    
+
     MultiVersionConsistencyControl.WriteEntry w = null;
     long size = 0;
     try {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Nov 14 19:18:23 2013
@@ -722,12 +722,13 @@ public class Store extends SchemaConfigu
   private StoreFile flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status,
+      CacheFlushInfo info) throws IOException { // out-param: paths and stats of this flush
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
     return internalFlushCache(snapshot, logCacheFlushId,
-        snapshotTimeRangeTracker, status);
+        snapshotTimeRangeTracker, status, info);
   }
 
   /*
@@ -741,7 +742,8 @@ public class Store extends SchemaConfigu
   private StoreFile internalFlushCache(final SortedSet<KeyValue> snapshot,
       final long logCacheFlushId,
       TimeRangeTracker snapshotTimeRangeTracker,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status,
+      CacheFlushInfo info) throws IOException {
     StoreFile.Writer writer;
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = region.getSmallestReadPoint();
@@ -770,6 +772,7 @@ public class Store extends SchemaConfigu
         status.setStatus("Flushing " + this + ": creating writer");
         // A. Write the map out to the disk
         writer = createWriterInTmp(snapshot.size(), this.compression, false);
+        info.tmpPath = writer.getPath();
         writer.setTimeRangeTracker(snapshotTimeRangeTracker);
         fileName = writer.getPath().getName();
         try {
@@ -801,6 +804,7 @@ public class Store extends SchemaConfigu
           writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false);
           status.setStatus("Flushing " + this + ": closing flushed file");
           writer.close();
+          InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_WRITE_CLOSE, writer.getPath());
         }
       }
     } finally {
@@ -814,28 +818,58 @@ public class Store extends SchemaConfigu
     // Write-out finished successfully, move into the right spot
     LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
     fs.rename(writer.getPath(), dstPath);
+    info.dstPath = dstPath; // record before any later step can fail, so cancel() removes it
+    InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_RENAME, writer.getPath(), dstPath);
 
     StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), this.dataBlockEncoder);
     passSchemaMetricsTo(sf);
 
     StoreFile.Reader r = sf.createReader();
-    this.storeSize += r.length();
+    info.sequenceId = logCacheFlushId;
+    info.entries = r.getEntries();
+    info.memSize = flushed;
+    info.fileSize = r.length();
+    return sf;
+  }
+
+  private boolean commit(StoreFile storeFile, SortedSet<KeyValue> snapshot, CacheFlushInfo info)
+    throws IOException {
+    this.storeSize += info.fileSize;
     // This increments the metrics associated with total flushed bytes for this
     // family. The overall flush count is stored in the static metrics and
     // retrieved from HRegion.recentFlushes, which is set within
     // HRegion.internalFlushcache, which indirectly calls this to actually do
     // the flushing through the StoreFlusherImpl class
     getSchemaMetrics().updatePersistentStoreMetric(
-        SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushed);
+        SchemaMetrics.StoreMetricType.FLUSH_SIZE, info.memSize);
     if (LOG.isInfoEnabled()) {
-      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
-        ", sequenceid=" + logCacheFlushId +
-        ", memsize=" + StringUtils.humanReadableInt(flushed) +
-        ", filesize=" + StringUtils.humanReadableInt(r.length()) +
+      LOG.info("Added " + storeFile + ", entries=" + info.entries +
+        ", sequenceid=" + info.sequenceId +
+        ", memsize=" + StringUtils.humanReadableInt(info.memSize) +
+        ", filesize=" + StringUtils.humanReadableInt(info.fileSize) +
         " to " + this.region.regionInfo.getRegionNameAsString());
     }
-    return sf;
+    // Add new file to store files.  Clear snapshot too while we have
+    // the Store write lock.
+    return updateStorefiles(storeFile, snapshot);
+  }
+
+  /** Best-effort removal of the files written by a flush that is being aborted. */
+  private void cancel(CacheFlushInfo info) {
+    deleteFlushFile(info.tmpPath);
+    deleteFlushFile(info.dstPath);
+  }
+
+  /** Deletes {@code path} (non-recursively) if non-null; a failed delete is logged, not thrown. */
+  private void deleteFlushFile(Path path) {
+    if (path != null) {
+      try {
+        fs.delete(path, false);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete " + path + " of an aborted flush", e);
+      }
+    }
   }
 
   /*
@@ -1949,6 +1983,16 @@ public class Store extends SchemaConfigu
     return new StoreFlusherImpl(cacheFlushId);
   }
 
+  private static class CacheFlushInfo {
+    Path tmpPath;     // flush output in the tmp dir; set once the writer is created
+    Path dstPath;     // final store file location; set after the rename
+
+    long entries;     // number of entries in the flushed store file
+    long sequenceId;  // log cache flush id this flush was performed for
+    long memSize;     // memstore bytes flushed (logged as "memsize")
+    long fileSize;    // length of the flushed store file
+  }
+
   private class StoreFlusherImpl implements StoreFlusher {
 
     private long cacheFlushId;
@@ -1956,6 +2000,8 @@ public class Store extends SchemaConfigu
     private StoreFile storeFile;
     private TimeRangeTracker snapshotTimeRangeTracker;
 
+    CacheFlushInfo cacheFlushInfo;
+
     private StoreFlusherImpl(long cacheFlushId) {
       this.cacheFlushId = cacheFlushId;
     }
@@ -1965,12 +2011,13 @@ public class Store extends SchemaConfigu
       memstore.snapshot();
       this.snapshot = memstore.getSnapshot();
       this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
+      this.cacheFlushInfo = new CacheFlushInfo();
     }
 
     @Override
     public void flushCache(MonitoredTask status) throws IOException {
       storeFile = Store.this.flushCache(cacheFlushId, snapshot,
-          snapshotTimeRangeTracker, status);
+          snapshotTimeRangeTracker, status, cacheFlushInfo);
     }
 
     @Override
@@ -1978,9 +2025,12 @@ public class Store extends SchemaConfigu
       if (storeFile == null) {
         return false;
       }
-      // Add new file to store files.  Clear snapshot too while we have
-      // the Store write lock.
-      return Store.this.updateStorefiles(storeFile, snapshot);
+      return Store.this.commit(storeFile, snapshot, cacheFlushInfo);
+    }
+
+    @Override
+    public void cancel() {
+      Store.this.cancel(cacheFlushInfo);
     }
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Thu Nov 14 19:18:23 2013
@@ -63,4 +63,9 @@ interface StoreFlusher {
    */
   boolean commit() throws IOException;
 
+  /**
+   * Cancel an aborted flush, deleting any files it has written to the file system.
+   */
+  void cancel();
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Thu Nov 14 19:18:23 2013
@@ -39,5 +39,7 @@ public enum InjectionEvent {
 
   // Injection into Store.java
   READONLYSTORE_COMPACTION_WHILE_SNAPSHOTTING,
-  STORESCANNER_COMPACTION_RACE
+  STORESCANNER_COMPACTION_RACE,
+  STOREFILE_AFTER_WRITE_CLOSE,
+  STOREFILE_AFTER_RENAME
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Nov 14 19:18:23 2013
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -66,6 +68,8 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1468,7 +1472,7 @@ public class TestHRegion extends HBaseTe
     // Setting up region
     String method = this.getName();
     byte[] tableName = Bytes.toBytes("testtableNextRows");
-    byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"), 
+    byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"),
                     Bytes.toBytes("rows3")};
     byte[][] families = { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"),
         Bytes.toBytes("fam3"), Bytes.toBytes("fam4") };
@@ -1478,8 +1482,8 @@ public class TestHRegion extends HBaseTe
     List<KeyValue> expected = new ArrayList<KeyValue>();
     fillTable(rows, families, 2, expected);
     /**
-     * in this case we know kv size = 28 
-     * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE 
+     * in this case we know kv size = 28
+     * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE
      * --4-|--4-|--2---|---4---|--1--|--4---|-8-|--1-- ===> 28 bytes
      */
     Scan scan = new Scan();
@@ -1488,23 +1492,23 @@ public class TestHRegion extends HBaseTe
     scan.addFamily(families[3]);
 
     // fetch one kv even when responseSize = 0, oh well, this's the semantic
-    // that users should be aware of  
+    // that users should be aware of
     compareNextRows(scan, 0, true, Integer.MAX_VALUE, expected.subList(0, 1));
     // fetch the last kv pair if the responseSize is not big enough
     compareNextRows(scan, 1, true, Integer.MAX_VALUE, expected.subList(0, 1));
-    // maxResponseSize perfectly fits one kv 
+    // maxResponseSize perfectly fits one kv
     compareNextRows(scan, 28, true, Integer.MAX_VALUE, expected.subList(0, 1));
 
-    // if partialRow == true, fetch as much as  maxResponseSize allows 
+    // if partialRow == true, fetch as much as  maxResponseSize allows
     compareNextRows(scan, 29, true, Integer.MAX_VALUE, expected.subList(0, 2));
-    // if partialRow == false, fetch the entire row  
+    // if partialRow == false, fetch the entire row
     compareNextRows(scan, 29, false, Integer.MAX_VALUE, expected.subList(0, 6));
-    
+
     // fetch everything in the table as long as responseSize is big enough
     compareNextRows(scan, 10000, true, Integer.MAX_VALUE, expected);
     compareNextRows(scan, 10000, false, Integer.MAX_VALUE, expected);
-   
-    // check nbRows 
+
+    // check nbRows
     // fetch two rows, each has two columns and each column has 3 kvs
     compareNextRows(scan, 10000, true, 2, expected.subList(0, 12));
     compareNextRows(scan, 10000, false, 2, expected.subList(0, 12));
@@ -2977,6 +2981,81 @@ public class TestHRegion extends HBaseTe
     assertTrue(keyValues.length == 0);
   }
 
+  /**
+   * Verifies that when a flush fails part-way through, every store file it
+   * wrote (still in the tmp dir or already renamed into place) is removed.
+   */
+  public void testRemoveStoreFilesOnWriteFailure()
+    throws IOException {
+    byte[] table = Bytes.toBytes("table");
+    byte[][] families = new byte[][] {
+        Bytes.toBytes("family1"),
+        Bytes.toBytes("family2"),
+        Bytes.toBytes("family3")
+    };
+    initHRegion(table, getName(), families);
+
+    Put put = new Put(Bytes.toBytes("row"));
+    put.add(families[0], null, null);
+    put.add(families[1], null, null);
+    put.add(families[2], null, null);
+    region.put(put);
+
+    // Records every flushed file path and fails the second store file write.
+    class InjectionHandlerImpl extends InjectionHandler {
+
+      private Set<Path> paths = new HashSet<Path>();
+
+      private int writeCount = 0;
+
+      @Override
+      protected void _processEventIO(InjectionEvent event, Object... args)
+        throws IOException {
+        switch (event) {
+          case STOREFILE_AFTER_WRITE_CLOSE:
+          {
+            paths.add((Path) args[0]);
+            if (++writeCount == 2) {
+              throw new IOException("injected failure on second store file write");
+            }
+            break;
+          }
+          case STOREFILE_AFTER_RENAME:
+          {
+            paths.add((Path) args[1]);
+            break;
+          }
+          default:
+            break;
+        }
+      }
+
+      public void validate()
+        throws IOException {
+        for (Path path : paths) {
+          assertFalse("file should not exist: " + path, region.fs.exists(path));
+        }
+      }
+
+    }
+
+    InjectionHandlerImpl ih = new InjectionHandlerImpl();
+    InjectionHandler.set(ih);
+    try {
+      try {
+        region.flushcache();
+        fail("flush should have failed on the injected write error");
+      } catch (IOException e) {
+        // expected: the injected failure aborts the flush
+      }
+      // every file the aborted flush wrote must have been removed
+      ih.validate();
+    } finally {
+      // Always restore the default handler so later tests are unaffected.
+      InjectionHandler.clear();
+    }
+  }
+
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {