You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/10/23 01:43:54 UTC
svn commit: r1534854 -
/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
Author: stack
Date: Tue Oct 22 23:43:54 2013
New Revision: 1534854
URL: http://svn.apache.org/r1534854
Log:
HBASE-9737 Corrupt HFile cause resource leak leading to Region Server OOM
Modified:
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1534854&r1=1534853&r2=1534854&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Oct 22 23:43:54 2013
@@ -193,18 +193,18 @@ public class HFile {
static final AtomicLong checksumFailures = new AtomicLong();
// For getting more detailed stats on FS latencies
- // If, for some reason, the metrics subsystem stops polling for latencies,
+ // If, for some reason, the metrics subsystem stops polling for latencies,
// I don't want data to pile up in a memory leak
// so, after LATENCY_BUFFER_SIZE items have been enqueued for processing,
// fs latency stats will be dropped (and this behavior will be logged)
private static final int LATENCY_BUFFER_SIZE = 5000;
- private static final BlockingQueue<Long> fsReadLatenciesNanos =
+ private static final BlockingQueue<Long> fsReadLatenciesNanos =
new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
- private static final BlockingQueue<Long> fsWriteLatenciesNanos =
+ private static final BlockingQueue<Long> fsWriteLatenciesNanos =
new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
- private static final BlockingQueue<Long> fsPreadLatenciesNanos =
+ private static final BlockingQueue<Long> fsPreadLatenciesNanos =
new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
-
+
public static final void offerReadLatency(long latencyNanos, boolean pread) {
if (pread) {
fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
@@ -216,30 +216,30 @@ public class HFile {
readOps.incrementAndGet();
}
}
-
+
public static final void offerWriteLatency(long latencyNanos) {
fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
-
+
writeTimeNano.addAndGet(latencyNanos);
writeOps.incrementAndGet();
}
-
+
public static final Collection<Long> getReadLatenciesNanos() {
- final List<Long> latencies =
+ final List<Long> latencies =
Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
fsReadLatenciesNanos.drainTo(latencies);
return latencies;
}
public static final Collection<Long> getPreadLatenciesNanos() {
- final List<Long> latencies =
+ final List<Long> latencies =
Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
fsPreadLatenciesNanos.drainTo(latencies);
return latencies;
}
-
+
public static final Collection<Long> getWriteLatenciesNanos() {
- final List<Long> latencies =
+ final List<Long> latencies =
Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
fsWriteLatenciesNanos.drainTo(latencies);
return latencies;
@@ -568,15 +568,20 @@ public class HFile {
boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
assert !isHBaseChecksum; // Initially we must read with FS checksum.
trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
- } catch (IllegalArgumentException iae) {
- throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
- }
- switch (trailer.getMajorVersion()) {
- case 2:
- return new HFileReaderV2(
+ switch (trailer.getMajorVersion()) {
+ case 2:
+ return new HFileReaderV2(
path, trailer, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
- default:
- throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
+ default:
+ throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
+ }
+ } catch (Throwable t) {
+ try {
+ fsdis.close();
+ } catch (Throwable t2) {
+ LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
+ }
+ throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
}
}
@@ -823,7 +828,7 @@ public class HFile {
/** Now parse the old Writable format. It was a list of Map entries. Each map entry was a key and a value of
* a byte []. The old map format had a byte before each entry that held a code which was short for the key or
* value type. We know it was a byte [] so in below we just read and dump it.
- * @throws IOException
+ * @throws IOException
*/
void parseWritable(final DataInputStream in) throws IOException {
// First clear the map. Otherwise we will just accumulate entries every time this method is called.