Posted to commits@hbase.apache.org by ap...@apache.org on 2018/09/21 23:17:52 UTC

[3/7] hbase git commit: HBASE-20704 Sometimes some compacted storefiles are not archived on region close, branch-1 backport

HBASE-20704 Sometimes some compacted storefiles are not archived on region close, branch-1 backport

Signed-off-by: Andrew Purtell <ap...@apache.org>
Amending-Author: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/465c96e2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/465c96e2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/465c96e2

Branch: refs/heads/branch-1.3
Commit: 465c96e24a77d5fde59b8e83942f783e498444d4
Parents: 508b145
Author: Francis Liu <to...@apache.org>
Authored: Tue Sep 18 19:17:20 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Sep 21 16:12:18 2018 -0700

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      |  17 +-
 .../hadoop/hbase/regionserver/HStore.java       |  49 ++++-
 .../hadoop/hbase/regionserver/StoreFile.java    |   8 +
 .../TestCleanupCompactedFileOnRegionClose.java  | 192 +++++++++++++++++++
 4 files changed, 246 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/465c96e2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index ad749f3..241bd9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -215,18 +216,12 @@ public class FSDataInputStreamWrapper {
 
   /** Close stream(s) if necessary. */
   public void close() throws IOException {
-    if (!doCloseStreams) return;
-    try {
-      if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
-        streamNoFsChecksum.close();
-        streamNoFsChecksum = null;
-      }
-    } finally {
-      if (stream != null) {
-        stream.close();
-        stream = null;
-      }
+    if (!doCloseStreams) {
+      return;
     }
+    // we do not care about close exceptions here: the streams are read-only, so no data loss issue.
+    IOUtils.closeQuietly(streamNoFsChecksum);
+    IOUtils.closeQuietly(stream);
   }
 
   public HFileSystem getHfs() {

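The rewritten close() above swaps the nested try/finally for Commons IO's null-safe
closeQuietly. A minimal standalone sketch of the pattern, using an illustrative
CloseSketch class rather than the real wrapper:

    import java.io.Closeable;

    import org.apache.commons.io.IOUtils;

    class CloseSketch {
      private Closeable streamNoFsChecksum;
      private Closeable stream;

      void close() {
        // closeQuietly is null-safe and swallows IOException, which is acceptable
        // here because the streams are read-only: a failed close cannot lose data.
        // It also tolerates both fields referring to the same stream, so the old
        // stream != streamNoFsChecksum guard is no longer needed.
        IOUtils.closeQuietly(streamNoFsChecksum);
        IOUtils.closeQuietly(stream);
      }
    }
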
http://git-wip-us.apache.org/repos/asf/hbase/blob/465c96e2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0f6536f..03dad03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -857,7 +857,7 @@ public class HStore implements Store {
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
       if (compactedfiles != null && !compactedfiles.isEmpty()) {
-        removeCompactedfiles(compactedfiles);
+        removeCompactedfiles(compactedfiles, true);
       }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -2709,6 +2709,11 @@ public class HStore implements Store {
 
   @Override
   public synchronized void closeAndArchiveCompactedFiles() throws IOException {
+    closeAndArchiveCompactedFiles(false);
+  }
+
+  @VisibleForTesting
+  public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
@@ -2730,7 +2735,7 @@ public class HStore implements Store {
         lock.readLock().unlock();
       }
       if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
-        removeCompactedfiles(copyCompactedfiles);
+        removeCompactedfiles(copyCompactedfiles, storeClosing);
       }
     } finally {
       archiveLock.unlock();
@@ -2742,20 +2747,38 @@ public class HStore implements Store {
    * @param compactedfiles The compacted files in this store that are not active in reads
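+   * @param storeClosing true if this is called while the store (and its region) is closing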
    * @throws IOException
    */
-  private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
+  private void removeCompactedfiles(Collection<StoreFile> compactedfiles, boolean storeClosing)
       throws IOException {
     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
     for (final StoreFile file : compactedfiles) {
       synchronized (file) {
         try {
           StoreFile.Reader r = file.getReader();
+
+          //Compacted files in the list should always be marked compacted away. If a file
+          //contradicts that invariant while the store is closing, fail fast rather than
+          //guess which state to trust and risk data inconsistency.
+          if (storeClosing && r != null && !r.isCompactedAway()) {
+            String msg =
+                "Region closing, but a StoreFile in the compacted list is not compacted away: " +
+                    file.getPath();
+            throw new IllegalStateException(msg);
+          }
+
           if (r == null) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("The file " + file + " was closed but still not archived.");
             }
             filesToRemove.add(file);
           }
-          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+
+          //If the store is closing, ignore any remaining read references so that compacted
+          //storefiles are still removed from the region directory
+          if (r != null && file.isCompactedAway() && (!r.isReferencedInReads() || storeClosing)) {
+            if (storeClosing && r.isReferencedInReads()) {
+              LOG.warn("Region closing but StoreFile still has references: file=" +
+                  file.getPath() + ", refCount=" + r.getRefCount());
+            }
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             if (LOG.isTraceEnabled()) {
@@ -2766,13 +2789,21 @@ public class HStore implements Store {
             filesToRemove.add(file);
          } else if (r != null) {
             LOG.info("Can't archive compacted file " + file.getPath()
-                + " because of either isCompactedAway = " + r.isCompactedAway()
-                + " or file has reference, isReferencedInReads = " + r.isReferencedInReads()
-                + ", skipping for now.");
+                + " because of either isCompactedAway=" + r.isCompactedAway()
+                + " or file has reference, isReferencedInReads=" + r.isReferencedInReads()
+                + ", refCount=" + r.getRefCount() + ", skipping for now.");
           }
         } catch (Exception e) {
-          LOG.error(
-            "Exception while trying to close the compacted store file " + file.getPath().getName());
+          String msg = "Exception while trying to close the compacted store file " +
+              file.getPath();
+          if (storeClosing) {
+            msg = "Store is closing. " + msg;
+          }
+          LOG.error(msg, e);
+          //if we get an exception while closing, rethrow so the caller can abort the server
+          if (storeClosing) {
+            throw new IOException(msg, e);
+          }
         }
       }
     }

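Summarizing the new decision in removeCompactedfiles: while the store is open,
files still referenced by scanners are skipped and retried on the next cleaner
pass; on close there is no later pass, so references are ignored (with a warning),
a file not marked compacted away fails fast, and close failures propagate so the
caller can abort the server. A compilable sketch of that decision, using
hypothetical names (CompactedFileInfo, selectForArchival) rather than HBase types:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    class ArchivalSketch {
      static class CompactedFileInfo {
        final String path;
        final boolean compactedAway; // marked obsolete by a finished compaction
        final int refCount;          // open scanners still reading the file

        CompactedFileInfo(String path, boolean compactedAway, int refCount) {
          this.path = path;
          this.compactedAway = compactedAway;
          this.refCount = refCount;
        }
      }

      /** Mirrors the storeClosing decision made in removeCompactedfiles. */
      static List<CompactedFileInfo> selectForArchival(
          Collection<CompactedFileInfo> compacted, boolean storeClosing) {
        List<CompactedFileInfo> toArchive = new ArrayList<CompactedFileInfo>();
        for (CompactedFileInfo f : compacted) {
          if (storeClosing && !f.compactedAway) {
            // Files in the compacted list must be compacted away; on close,
            // fail fast instead of guessing which state to trust.
            throw new IllegalStateException("Not compacted away: " + f.path);
          }
          // Open store: referenced files are skipped and retried later.
          // Closing store: references are ignored so the file still leaves
          // the region directory (the real code logs a warning).
          if (f.compactedAway && (f.refCount == 0 || storeClosing)) {
            toArchive.add(f);
          }
        }
        return toArchive;
      }
    }
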
http://git-wip-us.apache.org/repos/asf/hbase/blob/465c96e2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 013573d..a7f55f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -1220,6 +1220,14 @@ public class StoreFile {
     }
 
     /**
+     * Return the current ref count associated with the reader. The count is incremented
+     * whenever a scanner backed by the reader is opened, and decremented when it is closed.
+     */
+    int getRefCount() {
+      return refCount.get();
+    }
+
+    /**
     * Increment the ref count associated with the reader whenever a scanner associated with the
      * reader is opened
      */

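The new getRefCount() accessor simply exposes the reader's existing AtomicInteger
so the log messages above can report the live count. A minimal sketch of the
ref-counting protocol it reads, with an illustrative ReaderRefCountSketch class:

    import java.util.concurrent.atomic.AtomicInteger;

    class ReaderRefCountSketch {
      private final AtomicInteger refCount = new AtomicInteger(0);

      void incrementRefCount() { refCount.incrementAndGet(); }     // scanner opened
      void decrementRefCount() { refCount.decrementAndGet(); }     // scanner closed
      boolean isReferencedInReads() { return refCount.get() > 0; } // any open scanner?
      int getRefCount() { return refCount.get(); }                 // snapshot for logging
    }
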
http://git-wip-us.apache.org/repos/asf/hbase/blob/465c96e2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
new file mode 100644
index 0000000..dfc9209
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.TestCase.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestCleanupCompactedFileOnRegionClose {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    util.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    util.getConfiguration().set("dfs.blocksize", "64000");
+    util.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
+    util.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
+    util.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void afterclass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCleanupOnClose() throws Exception {
+    TableName tableName = TableName.valueOf("testCleanupOnClose");
+    String familyName = "f";
+    byte[] familyNameBytes = Bytes.toBytes(familyName);
+    util.createTable(tableName, familyName);
+
+    HBaseAdmin hBaseAdmin = util.getHBaseAdmin();
+    Table table = util.getConnection().getTable(tableName);
+
+    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
+    Region region = rs.getOnlineRegions(tableName).get(0);
+
+    int refSFCount = 4;
+    for (int i = 0; i < refSFCount; i++) {
+      for (int j = 0; j < refSFCount; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        put.addColumn(familyNameBytes, Bytes.toBytes(i), Bytes.toBytes(j));
+        table.put(put);
+      }
+      util.flush(tableName);
+    }
+    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+
+    //add a delete to test whether we end up with an inconsistency post region close
+    Delete delete = new Delete(Bytes.toBytes(refSFCount-1));
+    table.delete(delete);
+    util.flush(tableName);
+    assertFalse(table.exists(new Get(Bytes.toBytes(refSFCount-1))));
+
+    //Create a scanner and keep it open to add references to StoreFileReaders
+    Scan scan = new Scan();
+    scan.setStopRow(Bytes.toBytes(refSFCount-2));
+    scan.setCaching(1);
+    ResultScanner scanner = table.getScanner(scan);
+    Result res = scanner.next();
+    assertNotNull(res);
+    assertEquals(refSFCount, res.getFamilyMap(familyNameBytes).size());
+
+
+    //Verify the references
+    int count = 0;
+    for (StoreFile sf : (Collection<StoreFile>)region.getStore(familyNameBytes).getStorefiles()) {
+      synchronized (sf) {
+        if (count < refSFCount) {
+          assertTrue(sf.getReader().isReferencedInReads());
+        } else {
+          assertFalse(sf.getReader().isReferencedInReads());
+        }
+      }
+      count++;
+    }
+
+    //Major compact to produce compacted storefiles that need to be cleaned up
+    util.compact(tableName, true);
+    assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+    assertEquals(refSFCount+1,
+      ((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
+          .getCompactedfiles().size());
+
+    //close then open the region to determine whether compacted storefiles get cleaned up on close
+    hBaseAdmin.unassign(region.getRegionInfo().getRegionName(), false);
+    hBaseAdmin.assign(region.getRegionInfo().getRegionName());
+    util.waitUntilNoRegionsInTransition(10000);
+
+
+    assertFalse("Deleted row should not exist",
+        table.exists(new Get(Bytes.toBytes(refSFCount-1))));
+
+    rs = util.getRSForFirstRegionInTable(tableName);
+    region = rs.getOnlineRegions(tableName).get(0);
+    assertEquals(1, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+    assertNull(((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
+        .getCompactedfiles());
+  }
+
+  @Test
+  public void testIOExceptionThrownOnClose() throws Exception {
+    byte[] filler = new byte[128000];
+    TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
+    String familyName = "f";
+    byte[] familyNameBytes = Bytes.toBytes(familyName);
+    util.createTable(tableName, familyName);
+
+    Table table = util.getConnection().getTable(tableName);
+
+    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
+    Region region = rs.getOnlineRegions(tableName).get(0);
+
+    int refSFCount = 4;
+    for (int i = 0; i < refSFCount; i++) {
+      for (int j = 0; j < refSFCount; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
+        table.put(put);
+      }
+      util.flush(tableName);
+    }
+    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
+
+    HStore store = (HStore)region.getStore(familyNameBytes);
+    StoreFile hsf = region.getStore(familyNameBytes).getStorefiles().iterator().next();
+    long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
+    StoreFileScanner preadScanner = hsf.getReader().getStoreFileScanner(
+        false, true, false, readPt, 0, false);
+    preadScanner.seek(KeyValue.LOWESTKEY);
+
+    //Major compact to produce compacted storefiles that need to be cleaned up
+    util.compact(tableName, true);
+    assertNotNull(preadScanner.next());
+    store.closeAndArchiveCompactedFiles(true);
+
+    try {
+      assertNotNull(preadScanner.next());
+      fail("Expected IOException");
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+  }
+}