You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/21 06:59:06 UTC
svn commit: r1424810 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/replication/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/
test/java/org/apache/hadoop/hbase/replication/
Author: larsh
Date: Fri Dec 21 05:59:06 2012
New Revision: 1424810
URL: http://svn.apache.org/viewvc?rev=1424810&view=rev
Log:
HBASE-5778 Reapply, Test failures not caused by this. Sorry for the noise.
Added:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Fri Dec 21 05:59:06 2012
@@ -143,7 +143,12 @@ public class Compressor {
// the status byte also acts as the higher order byte of the dictionary
// entry
short dictIdx = toShort(status, in.readByte());
- byte[] entry = dict.getEntry(dictIdx);
+ byte[] entry;
+ try {
+ entry = dict.getEntry(dictIdx);
+ } catch (Exception ex) {
+ throw new IOException("Unable to uncompress the log entry", ex);
+ }
if (entry == null) {
throw new IOException("Missing dictionary entry for index "
+ dictIdx);
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Dec 21 05:59:06 2012
@@ -167,6 +167,7 @@ public class HLog implements Syncable {
Entry next(Entry reuse) throws IOException;
void seek(long pos) throws IOException;
long getPosition() throws IOException;
+ void reset() throws IOException;
}
public interface Writer {
@@ -695,15 +696,18 @@ public class HLog implements Syncable {
/**
* Get a reader for the WAL.
+ * The proper way to tail a log that can be under construction is to first use this method
+ * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
+ * take care of keeping implementation-specific context (like compression).
* @param fs
* @param path
* @param conf
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
- public static Reader getReader(final FileSystem fs,
- final Path path, Configuration conf)
- throws IOException {
+ public static Reader getReader(final FileSystem fs, final Path path,
+ Configuration conf)
+ throws IOException {
try {
if (logReaderClass == null) {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Fri Dec 21 05:59:06 2012
@@ -139,15 +139,17 @@ public class SequenceFileLogReader imple
Configuration conf;
WALReader reader;
+ FileSystem fs;
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
+ boolean emptyCompressionContext = true;
/**
* Compression context to use reading. Can be null if no compression.
*/
- private CompressionContext compressionContext = null;
+ protected CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@@ -173,6 +175,7 @@ public class SequenceFileLogReader imple
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
+ this.fs = fs;
// If compression is enabled, new dictionaries are created here.
boolean compression = reader.isWALCompressionEnabled();
@@ -237,11 +240,22 @@ public class SequenceFileLogReader imple
throw addFileInfoToException(ioe);
}
edit++;
+ if (compressionContext != null && emptyCompressionContext) {
+ emptyCompressionContext = false;
+ }
return b? e: null;
}
@Override
public void seek(long pos) throws IOException {
+ if (compressionContext != null && emptyCompressionContext) {
+ while (next() != null) {
+ if (getPosition() == pos) {
+ emptyCompressionContext = false;
+ break;
+ }
+ }
+ }
try {
reader.seek(pos);
} catch (IOException ioe) {
@@ -286,4 +300,11 @@ public class SequenceFileLogReader imple
return ioe;
}
+
+ @Override
+ public void reset() throws IOException {
+ // Resetting the reader lets us see newly added data if the file is being written to
+ // We also keep the same compressionContext which was previously populated for this file
+ reader = new WALReader(fs, path, conf);
+ }
}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java?rev=1424810&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java Fri Dec 21 05:59:06 2012
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class around HLog to help manage the implementation details
+ * such as compression.
+ */
+@InterfaceAudience.Private
+public class ReplicationHLogReaderManager {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
+ private final FileSystem fs;
+ private final Configuration conf;
+ private long position = 0;
+ private HLog.Reader reader;
+ private Path lastPath;
+
+ /**
+ * Creates the helper but doesn't open any file
+ * Use setInitialPosition after using the constructor if some content needs to be skipped
+ * @param fs
+ * @param conf
+ */
+ public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
+ this.fs = fs;
+ this.conf = conf;
+ }
+
+ /**
+ * Opens the file at the current position
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ public HLog.Reader openReader(Path path) throws IOException {
+ // Detect if this is a new file, if so get a new reader else
+ // reset the current reader so that we see the new data
+ if (this.reader == null || !this.lastPath.equals(path)) {
+ this.reader = HLog.getReader(this.fs, path, this.conf);
+ this.lastPath = path;
+ } else {
+ this.reader.reset();
+ }
+ return this.reader;
+ }
+
+ /**
+ * Get the next entry, returned and also added in the array
+ * @param entriesArray
+ * @param currentNbEntries
+ * @return a new entry or null
+ * @throws IOException
+ */
+ public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
+ int currentNbEntries) throws IOException {
+ HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+ // Store the position so that in the future the reader can start
+ // reading from here. If the above call to next() throws an
+ // exception, the position won't be changed and retry will happen
+ // from the last known good position
+ this.position = this.reader.getPosition();
+ // We need to set the CC to null else it will be compressed when sent to the sink
+ if (entry != null) {
+ entry.setCompressionContext(null);
+ }
+ return entry;
+ }
+
+ /**
+ * Advance the reader to the current position
+ * @throws IOException
+ */
+ public void seek() throws IOException {
+ if (this.position != 0) {
+ this.reader.seek(this.position);
+ }
+ }
+
+ /**
+ * Get the position that we stopped reading at
+ * @return current position, cannot be negative
+ */
+ public long getPosition() {
+ return this.position;
+ }
+
+ public void setPosition(long pos) {
+ this.position = pos;
+ }
+
+ /**
+ * Close the current reader
+ * @throws IOException
+ */
+ public void closeReader() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ }
+
+ /**
+ * Tell the helper to reset internal state
+ */
+ public void finishCurrentFile() {
+ this.position = 0;
+ this.reader = null;
+ }
+
+}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Dec 21 05:59:06 2012
@@ -105,8 +105,6 @@ public class ReplicationSource extends T
private int replicationQueueNbCapacity;
// Our reader for the current log
private HLog.Reader reader;
- // Current position in the log
- private long position = 0;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
@@ -132,10 +130,15 @@ public class ReplicationSource extends T
private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
+ // Current size of data we need to replicate
+ private int currentSize = 0;
// Indicates if this particular source is running
private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
+ // Handle on the log reader helper
+ private ReplicationHLogReaderManager repLogReader;
+
/**
* Instantiation method used by region servers
@@ -183,7 +186,7 @@ public class ReplicationSource extends T
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
-
+ this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try {
this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
@@ -263,8 +266,8 @@ public class ReplicationSource extends T
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
try {
- this.position = this.zkHelper.getHLogRepPosition(
- this.peerClusterZnode, this.queue.peek().getName());
+ this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
+ this.peerClusterZnode, this.queue.peek().getName()));
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
@@ -322,6 +325,7 @@ public class ReplicationSource extends T
boolean gotIOE = false;
currentNbEntries = 0;
+ currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
continue;
@@ -357,9 +361,7 @@ public class ReplicationSource extends T
}
} finally {
try {
- if (this.reader != null) {
- this.reader.close();
- }
+ this.repLogReader.closeReader();
} catch (IOException e) {
gotIOE = true;
LOG.warn("Unable to finalize the tailing of a file", e);
@@ -370,10 +372,10 @@ public class ReplicationSource extends T
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
- if (this.lastLoggedPosition != this.position) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.position;
+ this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
@@ -405,11 +407,9 @@ public class ReplicationSource extends T
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
- if (this.position != 0) {
- this.reader.seek(this.position);
- }
- long startPosition = this.position;
- HLog.Entry entry = readNextAndSetPosition();
+ this.repLogReader.seek();
+ HLog.Entry entry =
+ this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.logEditsReadRate.inc(1);
@@ -433,18 +433,18 @@ public class ReplicationSource extends T
}
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
+ currentSize += entry.getEdit().heapSize();
} else {
this.metrics.logEditsFilteredRate.inc(1);
}
}
// Stop if too many entries or too big
- if ((this.reader.getPosition() - startPosition)
- >= this.replicationQueueSizeCapacity ||
+ if (currentSize >= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) {
break;
}
try {
- entry = readNextAndSetPosition();
+ entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
break;
@@ -452,7 +452,7 @@ public class ReplicationSource extends T
}
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
- " and size: " + (this.reader.getPosition() - startPosition));
+ " and size: " + this.currentSize);
if (currentWALisBeingWrittenTo) {
return false;
}
@@ -461,16 +461,6 @@ public class ReplicationSource extends T
return seenEntries == 0 && processEndOfFile();
}
- private HLog.Entry readNextAndSetPosition() throws IOException {
- HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
- // Store the position so that in the future the reader can start
- // reading from here. If the above call to next() throws an
- // exception, the position won't be changed and retry will happen
- // from the last known good position
- this.position = this.reader.getPosition();
- return entry;
- }
-
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) {
@@ -509,10 +499,9 @@ public class ReplicationSource extends T
protected boolean openReader(int sleepMultiplier) {
try {
LOG.debug("Opening log for replication " + this.currentPath.getName() +
- " at " + this.position);
+ " at " + this.repLogReader.getPosition());
try {
- this.reader = null;
- this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+ this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.queueRecovered) {
// We didn't find the log in the archive directory, look if it still
@@ -648,10 +637,10 @@ public class ReplicationSource extends T
HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
- if (this.lastLoggedPosition != this.position) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.position;
+ this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
}
this.totalReplicatedEdits += currentNbEntries;
this.metrics.shippedBatchesRate.inc(1);
@@ -721,7 +710,8 @@ public class ReplicationSource extends T
protected boolean processEndOfFile() {
if (this.queue.size() != 0) {
this.currentPath = null;
- this.position = 0;
+ this.repLogReader.finishCurrentFile();
+ this.reader = null;
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java Fri Dec 21 05:59:06 2012
@@ -49,6 +49,9 @@ public class FaultySequenceFileLogReader
HLogKey key = HLog.newKey(conf);
WALEdit val = new WALEdit();
HLog.Entry e = new HLog.Entry(key, val);
+ if (compressionContext != null) {
+ e.setCompressionContext(compressionContext);
+ }
b = this.reader.next(e.getKey(), e.getEdit());
nextQueue.offer(e);
numberOfFileEntries++;
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Fri Dec 21 05:59:06 2012
@@ -475,15 +475,13 @@ public class TestHLog {
throw t.exception;
// Make sure you can read all the content
- SequenceFile.Reader reader
- = new SequenceFile.Reader(this.fs, walPath, this.conf);
+ HLog.Reader reader = HLog.getReader(this.fs, walPath, this.conf);
int count = 0;
- HLogKey key = HLog.newKey(conf);
- WALEdit val = new WALEdit();
- while (reader.next(key, val)) {
+ HLog.Entry entry = new HLog.Entry();
+ while (reader.next(entry) != null) {
count++;
assertTrue("Should be one KeyValue per WALEdit",
- val.getKeyValues().size() == 1);
+ entry.getEdit().getKeyValues().size() == 1);
}
assertEquals(total, count);
reader.close();
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Fri Dec 21 05:59:06 2012
@@ -86,7 +86,7 @@ public class TestHLogSplit {
private Configuration conf;
private FileSystem fs;
- private final static HBaseTestingUtility
+ protected final static HBaseTestingUtility
TEST_UTIL = new HBaseTestingUtility();
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java?rev=1424810&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java Fri Dec 21 05:59:06 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHLogSplitCompressed extends TestHLogSplit {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestHLogSplit.setUpBeforeClass();
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ }
+}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1424810&r1=1424809&r2=1424810&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Fri Dec 21 05:59:06 2012
@@ -61,7 +61,7 @@ public class TestReplication {
private static final Log LOG = LogFactory.getLog(TestReplication.class);
- private static Configuration conf1;
+ protected static Configuration conf1 = HBaseConfiguration.create();
private static Configuration conf2;
private static Configuration CONF_WITH_LOCALFS;
@@ -91,7 +91,6 @@ public class TestReplication {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
@@ -520,7 +519,7 @@ public class TestReplication {
// disable and start the peer
admin.disablePeer("2");
- utility2.startMiniHBaseCluster(1, 1);
+ utility2.startMiniHBaseCluster(1, 2);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
@@ -760,7 +759,8 @@ public class TestReplication {
int lastCount = 0;
final long start = System.currentTimeMillis();
- for (int i = 0; i < NB_RETRIES; i++) {
+ int i = 0;
+ while (true) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for queueFailover replication. " +
"Waited "+(System.currentTimeMillis() - start)+"ms.");
@@ -772,6 +772,8 @@ public class TestReplication {
if (res2.length < initialCount) {
if (lastCount < res2.length) {
i--; // Don't increment timeout if we make progress
+ } else {
+ i++;
}
lastCount = res2.length;
LOG.info("Only got " + lastCount + " rows instead of " +
@@ -791,7 +793,7 @@ public class TestReplication {
Thread.sleep(timeout);
utility.expireRegionServerSession(rs);
} catch (Exception e) {
- LOG.error(e);
+ LOG.error("Couldn't kill a region server", e);
}
}
};
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java?rev=1424810&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java Fri Dec 21 05:59:06 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Run the same test as TestReplication but with HLog compression enabled
+ */
+@Category(LargeTests.class)
+public class TestReplicationWithCompression extends TestReplication {
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ TestReplication.setUpBeforeClass();
+ }
+}