You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/08/09 06:05:25 UTC
svn commit: r1512133 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
Author: tedyu
Date: Fri Aug 9 04:05:24 2013
New Revision: 1512133
URL: http://svn.apache.org/r1512133
Log:
HBASE-8615 HLog Compression may fail due to Hadoop fs input stream returning partial bytes
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java?rev=1512133&r1=1512132&r2=1512133&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java Fri Aug 9 04:05:24 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.codec.Cod
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@@ -219,7 +220,7 @@ public class WALCellCodec implements Cod
pos += elemLen;
// the rest
- in.read(backingArray, pos, length - pos);
+ IOUtils.readFully(in, backingArray, pos, length - pos);
return new KeyValue(backingArray);
}
@@ -229,7 +230,7 @@ public class WALCellCodec implements Cod
// status byte indicating that data to be read is not in dictionary.
// if this isn't in the dictionary, we need to add to the dictionary.
int length = StreamUtils.readRawVarint32(in);
- in.read(to, offset, length);
+ IOUtils.readFully(in, to, offset, length);
dict.addEntry(to, offset, length);
return length;
} else {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java?rev=1512133&r1=1512132&r2=1512133&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java Fri Aug 9 04:05:24 2013
@@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,7 +26,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -44,18 +41,23 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.*;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-@Category(MediumTests.class)
+@Category(LargeTests.class)
+@RunWith(Parameterized.class)
public class TestReplicationHLogReaderManager {
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static HBaseTestingUtility TEST_UTIL;
private static Configuration conf;
private static Path hbaseDir;
private static FileSystem fs;
@@ -70,13 +72,44 @@ public class TestReplicationHLogReaderMa
private HLog log;
private ReplicationHLogReaderManager logManager;
private PathWatcher pathWatcher;
+ private int nbRows;
+ private int walEditKVs;
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ // Try out different combinations of row count and KeyValue count
+ int[] NB_ROWS = { 1500, 60000 };
+ int[] NB_KVS = { 1, 100 };
+ // whether compression is used
+ Boolean[] BOOL_VALS = { false, true };
+ List<Object[]> parameters = new ArrayList<Object[]>();
+ for (int nbRows : NB_ROWS) {
+ for (int walEditKVs : NB_KVS) {
+ for (boolean b : BOOL_VALS) {
+ Object[] arr = new Object[3];
+ arr[0] = nbRows;
+ arr[1] = walEditKVs;
+ arr[2] = b;
+ parameters.add(arr);
+ }
+ }
+ }
+ return parameters;
+ }
+ public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
+ this.nbRows = nbRows;
+ this.walEditKVs = walEditKVs;
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
+ enableCompression);
+ }
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(3);
- conf = TEST_UTIL.getConfiguration();
hbaseDir = TEST_UTIL.createRootDir();
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
@@ -156,16 +189,32 @@ public class TestReplicationHLogReaderMa
fail();
} catch (EOFException ex) {}
+ for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
+ log.rollWriter();
+ logManager.openReader(path);
+ logManager.seek();
+ for (int i = 0; i < nbRows; i++) {
+ HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0);
+ if (e == null) {
+ fail("Should have enough entries");
+ }
+ }
}
private void appendToLog() throws IOException {
- log.append(info, tableName, getWALEdit(), System.currentTimeMillis(), htd);
+ appendToLogPlus(1);
}
- private WALEdit getWALEdit() {
+ private void appendToLogPlus(int count) throws IOException {
+ log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd);
+ }
+
+ private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
- edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
+ for (int i = 0; i < count; i++) {
+ edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
System.currentTimeMillis(), qualifier));
+ }
return edit;
}