You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/12/19 23:23:28 UTC
svn commit: r1424174 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/replication/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/
test/java/org/apache/hadoop/hbase/replication/
Author: jdcryans
Date: Wed Dec 19 22:23:27 2012
New Revision: 1424174
URL: http://svn.apache.org/viewvc?rev=1424174&view=rev
Log:
HBASE-5778 Fix HLog compression's incompatibilities
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Wed Dec 19 22:23:27 2012
@@ -144,7 +144,12 @@ public class Compressor {
// the status byte also acts as the higher order byte of the dictionary
// entry
short dictIdx = toShort(status, in.readByte());
- byte[] entry = dict.getEntry(dictIdx);
+ byte[] entry;
+ try {
+ entry = dict.getEntry(dictIdx);
+ } catch (Exception ex) {
+ throw new IOException("Unable to uncompress the log entry", ex);
+ }
if (entry == null) {
throw new IOException("Missing dictionary entry for index "
+ dictIdx);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Dec 19 22:23:27 2012
@@ -71,6 +71,7 @@ public interface HLog {
void seek(long pos) throws IOException;
long getPosition() throws IOException;
+ void reset() throws IOException;
}
public interface Writer {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Wed Dec 19 22:23:27 2012
@@ -62,7 +62,9 @@ public class HLogFactory {
}
/**
- * Create a reader for the WAL.
+ * Create a reader for the WAL. If you are reading from a file that's being written to
+ * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method,
+ * then seek back to the last known good position.
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Wed Dec 19 22:23:27 2012
@@ -140,15 +140,17 @@ public class SequenceFileLogReader imple
Configuration conf;
WALReader reader;
+ FileSystem fs;
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
+ boolean emptyCompressionContext = true;
/**
* Compression context to use reading. Can be null if no compression.
*/
- private CompressionContext compressionContext = null;
+ protected CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@@ -174,6 +176,7 @@ public class SequenceFileLogReader imple
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
+ this.fs = fs;
// If compression is enabled, new dictionaries are created here.
boolean compression = reader.isWALCompressionEnabled();
@@ -238,11 +241,22 @@ public class SequenceFileLogReader imple
throw addFileInfoToException(ioe);
}
edit++;
+ if (compressionContext != null && emptyCompressionContext) {
+ emptyCompressionContext = false;
+ }
return b? e: null;
}
@Override
public void seek(long pos) throws IOException {
+ if (compressionContext != null && emptyCompressionContext) {
+ while (next() != null) {
+ if (getPosition() == pos) {
+ emptyCompressionContext = false;
+ break;
+ }
+ }
+ }
try {
reader.seek(pos);
} catch (IOException ioe) {
@@ -287,4 +301,11 @@ public class SequenceFileLogReader imple
return ioe;
}
+
+ @Override
+ public void reset() throws IOException {
+ // Resetting the reader lets us see newly added data if the file is being written to
+ // We also keep the same compressionContext which was previously populated for this file
+ reader = new WALReader(fs, path, conf);
+ }
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java?rev=1424174&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java Wed Dec 19 22:23:27 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class around an HLog.Reader that tracks the current read position and helps
+ * manage implementation details such as compression.
+ */
+@InterfaceAudience.Private
+public class ReplicationHLogReaderManager {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
+ private final FileSystem fs;
+ private final Configuration conf;
+ private long position = 0; // last position read by readNextAndSetPosition, or set via setPosition
+ private HLog.Reader reader; // null until openReader is called, and again after finishCurrentFile
+ private Path lastPath; // path passed to the most recent reader creation in openReader
+
+ /**
+ * Creates the helper but doesn't open any file.
+ * Use {@link #setPosition(long)} after the constructor if some content needs to be skipped.
+ * @param fs file system handed to HLogFactory.createReader when a reader is created
+ * @param conf configuration handed to HLogFactory.createReader when a reader is created
+ */
+ public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
+ this.fs = fs;
+ this.conf = conf;
+ }
+
+ /**
+ * Opens a reader on the given file; this does not seek, call {@link #seek()} afterwards.
+ * @param path file to read; a new reader is created unless it equals the last opened path
+ * @return the newly created reader, or the existing reader after reset() for the same path
+ * @throws IOException if creating or resetting the reader fails
+ */
+ public HLog.Reader openReader(Path path) throws IOException {
+ // Detect if this is a new file, if so get a new reader else
+ // reset the current reader so that we see the new data
+ if (this.reader == null || !this.lastPath.equals(path)) {
+ this.reader = HLogFactory.createReader(this.fs, path, this.conf);
+ this.lastPath = path;
+ } else {
+ this.reader.reset();
+ }
+ return this.reader;
+ }
+
+ /**
+ * Reads the next entry from the current reader and records the reader's new position.
+ * @param entriesArray array whose element at currentNbEntries is passed to the reader's next()
+ * @param currentNbEntries index into entriesArray of the element to pass to the reader
+ * @return a new entry or null
+ * @throws IOException if reading fails; the stored position is then left unchanged
+ */
+ public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
+ int currentNbEntries) throws IOException {
+ HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+ // Store the position so that in the future the reader can start
+ // reading from here. If the above call to next() throws an
+ // exception, the position won't be changed and retry will happen
+ // from the last known good position
+ this.position = this.reader.getPosition();
+ // We need to set the compression context to null else it will be compressed when sent to the sink
+ if (entry != null) {
+ entry.setCompressionContext(null);
+ }
+ return entry;
+ }
+
+ /**
+ * Advance the reader to the stored position; does nothing when the position is 0
+ * @throws IOException if the underlying reader's seek fails
+ */
+ public void seek() throws IOException {
+ if (this.position != 0) {
+ this.reader.seek(this.position);
+ }
+ }
+
+ /**
+ * Get the position that we stopped reading at
+ * @return current position; not validated here, setPosition stores any value it is given
+ */
+ public long getPosition() {
+ return this.position;
+ }
+
+ public void setPosition(long pos) {
+ this.position = pos;
+ }
+
+ /**
+ * Close the current reader if there is one; the reference is kept so openReader can reset it
+ * @throws IOException if closing the reader fails
+ */
+ public void closeReader() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ }
+
+ /**
+ * Reset internal state: position goes back to 0 and the reader is dropped
+ */
+ public void finishCurrentFile() {
+ this.position = 0;
+ this.reader = null;
+ }
+
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Dec 19 22:23:27 2012
@@ -107,8 +107,6 @@ public class ReplicationSource extends T
private int replicationQueueNbCapacity;
// Our reader for the current log
private HLog.Reader reader;
- // Current position in the log
- private long position = 0;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
@@ -134,10 +132,14 @@ public class ReplicationSource extends T
private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
+ // Current size of data we need to replicate
+ private int currentSize = 0;
// Indicates if this particular source is running
private volatile boolean running = true;
// Metrics for this source
private MetricsSource metrics;
+ // Handle on the log reader helper
+ private ReplicationHLogReaderManager repLogReader;
/**
* Instantiation method used by region servers
@@ -185,7 +187,7 @@ public class ReplicationSource extends T
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.metrics = new MetricsSource(peerClusterZnode);
-
+ this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try {
this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
@@ -266,8 +268,8 @@ public class ReplicationSource extends T
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
try {
- this.position = this.zkHelper.getHLogRepPosition(
- this.peerClusterZnode, this.queue.peek().getName());
+ this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
+ this.peerClusterZnode, this.queue.peek().getName()));
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
@@ -325,6 +327,7 @@ public class ReplicationSource extends T
boolean gotIOE = false;
currentNbEntries = 0;
+ currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
continue;
@@ -360,9 +363,7 @@ public class ReplicationSource extends T
}
} finally {
try {
- if (this.reader != null) {
- this.reader.close();
- }
+ this.repLogReader.closeReader();
} catch (IOException e) {
gotIOE = true;
LOG.warn("Unable to finalize the tailing of a file", e);
@@ -373,10 +374,11 @@ public class ReplicationSource extends T
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
- if (this.lastLoggedPosition != this.position) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.position;
+ this.peerClusterZnode, this.repLogReader.getPosition(),
+ queueRecovered, currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
@@ -409,11 +411,9 @@ public class ReplicationSource extends T
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
- if (this.position != 0) {
- this.reader.seek(this.position);
- }
- long startPosition = this.position;
- HLog.Entry entry = readNextAndSetPosition();
+ this.repLogReader.seek();
+ HLog.Entry entry =
+ this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead();
@@ -437,18 +437,18 @@ public class ReplicationSource extends T
}
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
+ currentSize += entry.getEdit().size();
} else {
this.metrics.incrLogEditsFiltered();
}
}
// Stop if too many entries or too big
- if ((this.reader.getPosition() - startPosition)
- >= this.replicationQueueSizeCapacity ||
+ if (currentSize >= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) {
break;
}
try {
- entry = readNextAndSetPosition();
+ entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
break;
@@ -462,16 +462,6 @@ public class ReplicationSource extends T
return seenEntries == 0 && processEndOfFile();
}
- private HLog.Entry readNextAndSetPosition() throws IOException {
- HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
- // Store the position so that in the future the reader can start
- // reading from here. If the above call to next() throws an
- // exception, the position won't be changed and retry will happen
- // from the last known good position
- this.position = this.reader.getPosition();
- return entry;
- }
-
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) {
@@ -510,9 +500,7 @@ public class ReplicationSource extends T
protected boolean openReader(int sleepMultiplier) {
try {
try {
- this.reader = null;
- this.reader = HLogFactory.createReader(this.fs,
- this.currentPath, this.conf);
+ this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.queueRecovered) {
// We didn't find the log in the archive directory, look if it still
@@ -648,10 +636,11 @@ public class ReplicationSource extends T
AdminProtocol rrs = getRS();
ProtobufUtil.replicateWALEntry(rrs,
Arrays.copyOf(this.entriesArray, currentNbEntries));
- if (this.lastLoggedPosition != this.position) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.position;
+ this.peerClusterZnode, this.repLogReader.getPosition(),
+ queueRecovered, currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
}
this.totalReplicatedEdits += currentNbEntries;
this.metrics.shipBatch(this.currentNbOperations);
@@ -718,7 +707,8 @@ public class ReplicationSource extends T
protected boolean processEndOfFile() {
if (this.queue.size() != 0) {
this.currentPath = null;
- this.position = 0;
+ this.repLogReader.finishCurrentFile();
+ this.reader = null;
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
@@ -842,8 +832,14 @@ public class ReplicationSource extends T
@Override
public String getStats() {
+ String position;
+ try {
+ position = this.reader.getPosition()+"";
+ } catch (IOException ioe) {
+ position = "N/A";
+ }
return "Total replicated edits: " + totalReplicatedEdits +
", currently replicating from: " + this.currentPath +
- " at position: " + this.position;
+ " at position: " + position;
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java Wed Dec 19 22:23:27 2012
@@ -49,6 +49,9 @@ public class FaultySequenceFileLogReader
HLogKey key = HLogUtil.newKey(conf);
WALEdit val = new WALEdit();
HLog.Entry e = new HLog.Entry(key, val);
+ if (compressionContext != null) {
+ e.setCompressionContext(compressionContext);
+ }
b = this.reader.next(e.getKey(), e.getEdit());
nextQueue.offer(e);
numberOfFileEntries++;
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Wed Dec 19 22:23:27 2012
@@ -477,15 +477,13 @@ public class TestHLog {
throw t.exception;
// Make sure you can read all the content
- SequenceFile.Reader reader
- = new SequenceFile.Reader(this.fs, walPath, this.conf);
+ HLog.Reader reader = HLogFactory.createReader(this.fs, walPath, this.conf);
int count = 0;
- HLogKey key = HLogUtil.newKey(conf);
- WALEdit val = new WALEdit();
- while (reader.next(key, val)) {
+ HLog.Entry entry = new HLog.Entry();
+ while (reader.next(entry) != null) {
count++;
assertTrue("Should be one KeyValue per WALEdit",
- val.getKeyValues().size() == 1);
+ entry.getEdit().getKeyValues().size() == 1);
}
assertEquals(total, count);
reader.close();
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Dec 19 22:23:27 2012
@@ -100,7 +100,7 @@ public class TestHLogSplit {
private Configuration conf;
private FileSystem fs;
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Path HBASEDIR = new Path("/hbase");
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java?rev=1424174&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java Wed Dec 19 22:23:27 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHLogSplitCompressed extends TestHLogSplit { // TestHLogSplit with WAL compression on
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestHLogSplit.setUpBeforeClass(); // NOTE(review): parent setup runs before the flag is set - confirm intended
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Wed Dec 19 22:23:27 2012
@@ -60,7 +60,7 @@ public class TestReplication {
private static final Log LOG = LogFactory.getLog(TestReplication.class);
- private static Configuration conf1;
+ protected static Configuration conf1 = HBaseConfiguration.create();
private static Configuration conf2;
private static Configuration CONF_WITH_LOCALFS;
@@ -90,7 +90,6 @@ public class TestReplication {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
@@ -520,7 +519,7 @@ public class TestReplication {
// disable and start the peer
admin.disablePeer("2");
- utility2.startMiniHBaseCluster(1, 1);
+ utility2.startMiniHBaseCluster(1, 2);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
@@ -760,7 +759,8 @@ public class TestReplication {
int lastCount = 0;
final long start = System.currentTimeMillis();
- for (int i = 0; i < NB_RETRIES; i++) {
+ int i = 0;
+ while (true) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for queueFailover replication. " +
"Waited "+(System.currentTimeMillis() - start)+"ms.");
@@ -772,6 +772,8 @@ public class TestReplication {
if (res2.length < initialCount) {
if (lastCount < res2.length) {
i--; // Don't increment timeout if we make progress
+ } else {
+ i++;
}
lastCount = res2.length;
LOG.info("Only got " + lastCount + " rows instead of " +
@@ -791,7 +793,7 @@ public class TestReplication {
Thread.sleep(timeout);
utility.expireRegionServerSession(rs);
} catch (Exception e) {
- LOG.error(e);
+ LOG.error("Couldn't kill a region server", e);
}
}
};
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java?rev=1424174&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java Wed Dec 19 22:23:27 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the same tests as TestReplication but with HLog compression enabled on conf1
+ */
+@Category(LargeTests.class)
+public class TestReplicationWithCompression extends TestReplication {
+
+ /**
+ * @throws java.lang.Exception if the inherited TestReplication setup fails
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); // set before the parent setup uses conf1
+ TestReplication.setUpBeforeClass();
+ }
+}