Posted to commits@hbase.apache.org by li...@apache.org on 2013/03/28 19:18:40 UTC

svn commit: r1462244 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/io/hfile/Compression.java test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java

Author: liyin
Date: Thu Mar 28 18:18:40 2013
New Revision: 1462244

URL: http://svn.apache.org/r1462244
Log:
[HBASE-8206] Thread safety issues with Compression.Algorithm.GZ

Author: manukranthk

Summary: Compression.Algorithm was not fully thread safe, so this change makes it thread safe, following the same approach used for the fix in HBase trunk (HBASE-5458).
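
For readers unfamiliar with the underlying race: the original GZ implementation assigned the shared codec field before calling setConf(), and the field was not volatile, so a second thread could observe a non-null but not-yet-configured (or unsafely published) codec. The patch below applies the standard double-checked locking idiom; the following annotated sketch mirrors the patched GZ path and is meant only to illustrate why each piece is there, not to restate the exact committed source:

    private volatile transient GzipCodec codec;           // volatile: safe publication
    private transient Object lock = new Object();

    DefaultCodec getCodec(Configuration conf) {
      if (codec == null) {                                 // fast path once initialized
        synchronized (lock) {
          if (codec == null) {                             // re-check under the lock
            GzipCodec tmpCodec = new ReusableStreamGzipCodec();
            tmpCodec.setConf(new Configuration(conf));     // configure fully first
            codec = tmpCodec;                              // publish only the finished codec
          }
        }
      }
      return codec;
    }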

Test Plan: Tested by running ClientSideScan (which opens store files in parallel to perform a client-side merge) on one of the region servers in the MORSE002-FRC3 cluster. Will try to write a test case that simulates exactly the same scenario. Also let me know if there is a better way to test this.
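
(A more direct, unit-level way to provoke the race, sketched here as a hypothetical fragment rather than the committed TestCompression below: clear the cached codec, then invoke getCodec() concurrently from many threads. It assumes a JUnit test class in the org.apache.hadoop.hbase.io.hfile package, since getCodec()/deleteCodec() are not public, plus the usual java.util.concurrent imports.)

    public void testGzCodecRace() throws Exception {
      final Configuration conf = new Configuration();
      Compression.Algorithm.GZ.deleteCodec();              // drop any cached instance
      ExecutorService pool = Executors.newFixedThreadPool(16);
      List<Future<CompressionCodec>> results =
          new ArrayList<Future<CompressionCodec>>();
      for (int i = 0; i < 16; i++) {
        results.add(pool.submit(new Callable<CompressionCodec>() {
          public CompressionCodec call() {
            // Each thread races through codec initialization; with the fix
            // every call must return a fully configured codec.
            return Compression.Algorithm.GZ.getCodec(conf);
          }
        }));
      }
      for (Future<CompressionCodec> f : results) {
        assertNotNull(f.get());                            // ExecutionException here means a thread crashed
      }
      pool.shutdownNow();
    }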

Reviewers: aaiyer, liyintang, nspiegelberg, kannan

Reviewed By: liyintang

CC: hbase-eng@, sdong

Differential Revision: https://phabricator.fb.com/D752641

Task ID: 2225566

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=1462244&r1=1462243&r2=1462244&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Thu Mar 28 18:18:40 2013
@@ -22,11 +22,15 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -78,8 +82,11 @@ public final class Compression {
   public static enum Algorithm {
     LZO("lzo") {
       // Use base type to avoid compile-time dependencies.
-      private transient CompressionCodec lzoCodec;
-
+      private volatile transient CompressionCodec lzoCodec;
+      @Override
+      protected void deleteCodec() {
+        lzoCodec = null;
+      }
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (lzoCodec == null) {
@@ -95,24 +102,36 @@ public final class Compression {
       }
     },
     GZ("gz") {
-      private transient GzipCodec codec;
+      private volatile transient GzipCodec codec;
+      private transient Object lock = new Object();
 
       @Override
       DefaultCodec getCodec(Configuration conf) {
         if (codec == null) {
-          codec = new ReusableStreamGzipCodec();
-          codec.setConf(new Configuration(conf));
-
+          synchronized (lock) {
+            if (codec == null) {
+              GzipCodec tmpCodec = new ReusableStreamGzipCodec();
+              tmpCodec.setConf(new Configuration(conf));
+              codec = tmpCodec;
+            }
+          }
         }
-
         return codec;
       }
+      @Override
+      protected void deleteCodec() {
+        codec = null;
+      }
     },
     NONE("none") {
       @Override
       DefaultCodec getCodec(Configuration conf) {
         return null;
       }
+      @Override
+      protected void deleteCodec() {
+        return;
+      }
 
       @Override
       public synchronized InputStream createDecompressionStream(
@@ -141,28 +160,39 @@ public final class Compression {
       }
     },
     SNAPPY("snappy") {
-      private transient CompressionCodec snappyCodec;
+      private volatile transient CompressionCodec snappyCodec;
+      private transient Object lock = new Object();
 
+      @Override
+      protected void deleteCodec() {
+        snappyCodec = null;
+      }
       @SuppressWarnings("unchecked")
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (snappyCodec == null) {
-          try {
-            Class<? extends CompressionCodec> snappyCodecClass = 
-                (Class<? extends CompressionCodec>) 
-                Class.forName(CompressionCodec.class.getPackage().getName() + ".SnappyCodec");
-            snappyCodec = snappyCodecClass.newInstance();
-          } catch (InstantiationException e) {
-            LOG.error(e);
-            throw new RuntimeException(e);
-          } catch (IllegalAccessException e) {
-            LOG.error(e);
-            throw new RuntimeException(e);
-          } catch (ClassNotFoundException e) {
-            LOG.error(e);
-            throw new RuntimeException(e);
+          synchronized (lock) {
+            if (snappyCodec == null) {
+              CompressionCodec tmpCodec;
+              try {
+                Class<? extends CompressionCodec> snappyCodecClass =
+                    (Class<? extends CompressionCodec>)
+                    Class.forName(CompressionCodec.class.getPackage().getName() + ".SnappyCodec");
+                tmpCodec = snappyCodecClass.newInstance();
+              } catch (InstantiationException e) {
+                LOG.error(e);
+                throw new RuntimeException(e);
+              } catch (IllegalAccessException e) {
+                LOG.error(e);
+                throw new RuntimeException(e);
+              } catch (ClassNotFoundException e) {
+                LOG.error(e);
+                throw new RuntimeException(e);
+              }
+              ((Configurable) tmpCodec).setConf(new Configuration(conf));
+              snappyCodec = tmpCodec;
+            }
           }
-          ((Configurable) snappyCodec).setConf(new Configuration(conf));
         }
         return (CompressionCodec) snappyCodec;
       }
@@ -170,9 +200,9 @@ public final class Compression {
 
     private final Configuration conf;
     private final String compressName;
-	// data input buffer size to absorb small reads from application.
+    // data input buffer size to absorb small reads from application.
     private static final int DATA_IBUF_SIZE = 1 * 1024;
-	// data output buffer size to absorb small writes from application.
+    // data output buffer size to absorb small writes from application.
     private static final int DATA_OBUF_SIZE = 4 * 1024;
 
     Algorithm(String name) {
@@ -284,6 +314,8 @@ public final class Compression {
     public String getName() {
       return compressName;
     }
+
+    protected abstract void deleteCodec();
   }
 
   public static Algorithm getCompressionAlgorithmByName(String compressName) {

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java?rev=1462244&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java Thu Mar 28 18:18:40 2013
@@ -0,0 +1,162 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.TestHFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+public class TestCompression extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestHFile.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static String ROOT_DIR =
+      TEST_UTIL.getTestDir("TestHFile").toString();
+  private final int minBlockSize = 512;
+  private static String localFormatter = "%010d";
+  private static CacheConfig cacheConf = null;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  private void writeRecords(Writer writer) throws IOException {
+    writeSomeRecords(writer, 0, 100);
+    writer.close();
+  }
+
+  // write some records into the tfile
+  // write them twice
+  private int writeSomeRecords(Writer writer, int start, int n)
+      throws IOException {
+    String value = "value";
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, Integer.valueOf(i));
+      writer.append(Bytes.toBytes(key), Bytes.toBytes(value + key));
+    }
+    return (start + n);
+  }
+
+  public void testCompressionAlgorithmThreadSafety() throws IOException{
+    // Create some HFiles
+    fs.mkdirs(new Path(ROOT_DIR));
+    int numFiles = 100;
+    String codec = "gz";
+    if (cacheConf == null) cacheConf = new CacheConfig(conf);
+    List<Path> createdHFiles = new ArrayList<Path>();
+    for (int i=0; i<numFiles; i++) {
+      Path p = StoreFile.getUniqueFile(fs, new Path(ROOT_DIR));
+      createdHFiles.add(p);
+      FSDataOutputStream fout = createFSOutput(p);
+      conf.setInt(HFile.FORMAT_VERSION_KEY, 2);
+      Writer writer = HFile.getWriterFactory(conf, cacheConf)
+          .withOutputStream(fout)
+          .withBlockSize(minBlockSize)
+          .withCompression(codec)
+          .create();
+      LOG.info(writer);
+      writeRecords(writer);
+      fout.close();
+    }
+    LOG.debug("Clearing the codecs");
+    for (Algorithm a : Algorithm.values()) {
+      a.deleteCodec();
+    }
+    try {
+      List<Reader> lst = loadHFileReaders(createdHFiles);
+    } catch (IOException e) {
+      throw e;
+    } catch (RuntimeException e) {
+      // Unsafe execution of the threads might cause RuntimeException.
+      assertTrue(false);
+    }
+  }
+
+  private FSDataOutputStream createFSOutput(Path name) throws IOException {
+    if (fs.exists(name)) fs.delete(name, true);
+    FSDataOutputStream fout = fs.create(name);
+    return fout;
+  }
+
+  public static ThreadPoolExecutor getReaderCreatorThreadPool(int maxThreads,
+      final String threadNamePrefix) {
+    ThreadPoolExecutor readerCreatorThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                t.setDaemon(true);
+                return t;
+              }
+            });
+    return readerCreatorThreadPool;
+  }
+
+  private List<Reader> loadHFileReaders(List<Path> files) throws IOException {
+    // initialize the thread pool for opening store files in parallel..
+    ThreadPoolExecutor ReaderCreatorThreadPool =
+        getReaderCreatorThreadPool(files.size(), "ReaderCreatorPool");
+    CompletionService<Reader> completionService =
+      new ExecutorCompletionService<Reader>(ReaderCreatorThreadPool);
+
+    int totalValidHFile = 0;
+    for (int i = 0; files != null && i < files.size(); i++) {
+      final Path p = files.get(i);
+      completionService.submit(new Callable<Reader>() {
+        public Reader call() throws IOException {
+          return HFile.createReader(fs, p, cacheConf);
+        }
+      });
+      totalValidHFile++;
+    }
+    List<Reader> ret = new ArrayList<Reader>();
+    try {
+      for (int i = 0; i < totalValidHFile; i++) {
+        Future<Reader> future = completionService.take();
+        Reader hfileReader = future.get();
+        hfileReader.loadFileInfo();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created HFileReader for " + hfileReader.getName());
+        }
+        ret.add(hfileReader);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      ReaderCreatorThreadPool.shutdownNow();
+    }
+    return ret;
+  }
+}