Posted to commits@hbase.apache.org by li...@apache.org on 2013/03/28 19:18:40 UTC
svn commit: r1462244 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java
Author: liyin
Date: Thu Mar 28 18:18:40 2013
New Revision: 1462244
URL: http://svn.apache.org/r1462244
Log:
[HBASE-8206] Thread safety issues with Compression.Algorithm.GZ
Author: manukranthk
Summary: Compression.Algorithm was not completely thread-safe, so this change makes it thread-safe, fixing it the same way it was fixed in HBase trunk (HBASE-5458). A minimal sketch of the locking pattern follows the review metadata below.
Test Plan: Tested by running ClientSideScan (which opens store files in parallel to perform a client-side merge) on one of the region servers in the MORSE002-FRC3 cluster. Will try to write a test case that simulates exactly the same scenario. Also let me know if there is a better way to test this.
Reviewers: aaiyer, liyintang, nspiegelberg, kannan
Reviewed By: liyintang
CC: hbase-eng@, sdong
Differential Revision: https://phabricator.fb.com/D752641
Task ID: 2225566
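For reference, the fix applies the classic double-checked locking idiom: a
volatile codec field plus a second null check under a private lock, with the
codec fully configured before the reference is published. The previous code
assigned the field first and called setConf() afterwards without volatile, so
a concurrent reader could observe a non-null but not-yet-configured codec.
The sketch below only illustrates the pattern; the class name
LazyGzipCodecHolder is hypothetical and not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;

public final class LazyGzipCodecHolder {
  // volatile guarantees that once another thread sees a non-null reference,
  // it also sees the fully configured codec that reference points at.
  private volatile CompressionCodec codec;
  private final Object lock = new Object();

  CompressionCodec getCodec(Configuration conf) {
    if (codec == null) {                 // first check: lock-free fast path
      synchronized (lock) {
        if (codec == null) {             // second check: one thread constructs
          GzipCodec tmp = new GzipCodec();
          tmp.setConf(new Configuration(conf)); // configure before publishing
          codec = tmp;                   // single write publishes the object
        }
      }
    }
    return codec;
  }
}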
Added:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=1462244&r1=1462243&r2=1462244&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Thu Mar 28 18:18:40 2013
@@ -22,11 +22,15 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -78,8 +82,11 @@ public final class Compression {
public static enum Algorithm {
LZO("lzo") {
// Use base type to avoid compile-time dependencies.
- private transient CompressionCodec lzoCodec;
-
+ private volatile transient CompressionCodec lzoCodec;
+ @Override
+ protected void deleteCodec() {
+ lzoCodec = null;
+ }
@Override
CompressionCodec getCodec(Configuration conf) {
if (lzoCodec == null) {
@@ -95,24 +102,36 @@ public final class Compression {
}
},
GZ("gz") {
- private transient GzipCodec codec;
+ private volatile transient GzipCodec codec;
+ private transient Object lock = new Object();
@Override
DefaultCodec getCodec(Configuration conf) {
if (codec == null) {
- codec = new ReusableStreamGzipCodec();
- codec.setConf(new Configuration(conf));
-
+ synchronized (lock) {
+ if (codec == null) {
+ GzipCodec tmpCodec = new ReusableStreamGzipCodec();
+ tmpCodec.setConf(new Configuration(conf));
+ codec = tmpCodec;
+ }
+ }
}
-
return codec;
}
+ @Override
+ protected void deleteCodec() {
+ codec = null;
+ }
},
NONE("none") {
@Override
DefaultCodec getCodec(Configuration conf) {
return null;
}
+ @Override
+ protected void deleteCodec() {
+ return;
+ }
@Override
public synchronized InputStream createDecompressionStream(
@@ -141,28 +160,39 @@ public final class Compression {
}
},
SNAPPY("snappy") {
- private transient CompressionCodec snappyCodec;
+ private volatile transient CompressionCodec snappyCodec;
+ private transient Object lock = new Object();
+ @Override
+ protected void deleteCodec() {
+ snappyCodec = null;
+ }
@SuppressWarnings("unchecked")
@Override
CompressionCodec getCodec(Configuration conf) {
if (snappyCodec == null) {
- try {
- Class<? extends CompressionCodec> snappyCodecClass =
- (Class<? extends CompressionCodec>)
- Class.forName(CompressionCodec.class.getPackage().getName() + ".SnappyCodec");
- snappyCodec = snappyCodecClass.newInstance();
- } catch (InstantiationException e) {
- LOG.error(e);
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- LOG.error(e);
- throw new RuntimeException(e);
- } catch (ClassNotFoundException e) {
- LOG.error(e);
- throw new RuntimeException(e);
+ synchronized (lock) {
+ if (snappyCodec == null) {
+ CompressionCodec tmpCodec;
+ try {
+ Class<? extends CompressionCodec> snappyCodecClass =
+ (Class<? extends CompressionCodec>)
+ Class.forName(CompressionCodec.class.getPackage().getName() + ".SnappyCodec");
+ tmpCodec = snappyCodecClass.newInstance();
+ } catch (InstantiationException e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ }
+ ((Configurable) tmpCodec).setConf(new Configuration(conf));
+ snappyCodec = tmpCodec;
+ }
}
- ((Configurable) snappyCodec).setConf(new Configuration(conf));
}
return (CompressionCodec) snappyCodec;
}
@@ -170,9 +200,9 @@ public final class Compression {
private final Configuration conf;
private final String compressName;
- // data input buffer size to absorb small reads from application.
+ // data input buffer size to absorb small reads from application.
private static final int DATA_IBUF_SIZE = 1 * 1024;
- // data output buffer size to absorb small writes from application.
+ // data output buffer size to absorb small writes from application.
private static final int DATA_OBUF_SIZE = 4 * 1024;
Algorithm(String name) {
@@ -284,6 +314,8 @@ public final class Compression {
public String getName() {
return compressName;
}
+
+ protected abstract void deleteCodec();
}
public static Algorithm getCompressionAlgorithmByName(String compressName) {
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java?rev=1462244&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCompression.java Thu Mar 28 18:18:40 2013
@@ -0,0 +1,162 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.TestHFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+public class TestCompression extends HBaseTestCase {
+ static final Log LOG = LogFactory.getLog(TestCompression.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static String ROOT_DIR =
+ TEST_UTIL.getTestDir("TestCompression").toString();
+ private final int minBlockSize = 512;
+ private static String localFormatter = "%010d";
+ private static CacheConfig cacheConf = null;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ private void writeRecords(Writer writer) throws IOException {
+ writeSomeRecords(writer, 0, 100);
+ writer.close();
+ }
+
+ // Write a run of sequential key/value records into the HFile.
+ private int writeSomeRecords(Writer writer, int start, int n)
+ throws IOException {
+ String value = "value";
+ for (int i = start; i < (start + n); i++) {
+ String key = String.format(localFormatter, Integer.valueOf(i));
+ writer.append(Bytes.toBytes(key), Bytes.toBytes(value + key));
+ }
+ return (start + n);
+ }
+
+ public void testCompressionAlgorithmThreadSafety() throws IOException {
+ // Create some HFiles
+ fs.mkdirs(new Path(ROOT_DIR));
+ int numFiles = 100;
+ String codec = "gz";
+ if (cacheConf == null) cacheConf = new CacheConfig(conf);
+ List<Path> createdHFiles = new ArrayList<Path>();
+ for (int i=0; i<numFiles; i++) {
+ Path p = StoreFile.getUniqueFile(fs, new Path(ROOT_DIR));
+ createdHFiles.add(p);
+ FSDataOutputStream fout = createFSOutput(p);
+ conf.setInt(HFile.FORMAT_VERSION_KEY, 2);
+ Writer writer = HFile.getWriterFactory(conf, cacheConf)
+ .withOutputStream(fout)
+ .withBlockSize(minBlockSize)
+ .withCompression(codec)
+ .create();
+ LOG.info(writer);
+ writeRecords(writer);
+ fout.close();
+ }
+ LOG.debug("Clearing the codecs");
+ for (Algorithm a : Algorithm.values()) {
+ a.deleteCodec();
+ }
+ try {
+ // Opening all the readers in parallel exercises the lazy codec initialization.
+ loadHFileReaders(createdHFiles);
+ } catch (RuntimeException e) {
+ // Thread-unsafe codec initialization can surface as a RuntimeException here.
+ fail("Parallel HFile reader creation threw: " + e);
+ }
+ }
+
+ private FSDataOutputStream createFSOutput(Path name) throws IOException {
+ if (fs.exists(name)) fs.delete(name, true);
+ FSDataOutputStream fout = fs.create(name);
+ return fout;
+ }
+
+ public static ThreadPoolExecutor getReaderCreatorThreadPool(int maxThreads,
+ final String threadNamePrefix) {
+ ThreadPoolExecutor readerCreatorThreadPool = Threads
+ .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+ new ThreadFactory() {
+ private int count = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ return readerCreatorThreadPool;
+ }
+
+ private List<Reader> loadHFileReaders(List<Path> files) throws IOException {
+ // Initialize the thread pool for opening store files in parallel.
+ ThreadPoolExecutor readerCreatorThreadPool =
+ getReaderCreatorThreadPool(files.size(), "ReaderCreatorPool");
+ CompletionService<Reader> completionService =
+ new ExecutorCompletionService<Reader>(readerCreatorThreadPool);
+
+ int totalValidHFile = 0;
+ for (int i = 0; files != null && i < files.size(); i++) {
+ final Path p = files.get(i);
+ completionService.submit(new Callable<Reader>() {
+ public Reader call() throws IOException {
+ return HFile.createReader(fs, p, cacheConf);
+ }
+ });
+ totalValidHFile++;
+ }
+ List<Reader> ret = new ArrayList<Reader>();
+ try {
+ for (int i = 0; i < totalValidHFile; i++) {
+ Future<Reader> future = completionService.take();
+ Reader hfileReader = future.get();
+ hfileReader.loadFileInfo();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created HFileReader for " + hfileReader.getName());
+ }
+ ret.add(hfileReader);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ readerCreatorThreadPool.shutdownNow();
+ }
+ return ret;
+ }
+}
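To run the new test by itself, the standard Surefire invocation should work
(this command is an assumption about the local build setup, not part of the
commit):

mvn test -Dtest=TestCompression

Because HBaseTestCase is JUnit 3 style, the test can also be driven directly
by junit.textui.TestRunner if a Maven run is inconvenient.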