You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/12/19 23:23:28 UTC

svn commit: r1424174 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/replication/

Author: jdcryans
Date: Wed Dec 19 22:23:27 2012
New Revision: 1424174

URL: http://svn.apache.org/viewvc?rev=1424174&view=rev
Log:
HBASE-5778  Fix HLog compression's incompatibilities

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Wed Dec 19 22:23:27 2012
@@ -144,7 +144,12 @@ public class Compressor {
       // the status byte also acts as the higher order byte of the dictionary
       // entry
       short dictIdx = toShort(status, in.readByte());
-      byte[] entry = dict.getEntry(dictIdx);
+      byte[] entry;
+      try {
+        entry = dict.getEntry(dictIdx);
+      } catch (Exception ex) {
+        throw new IOException("Unable to uncompress the log entry", ex);
+      }
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index "
             + dictIdx);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Dec 19 22:23:27 2012
@@ -71,6 +71,7 @@ public interface HLog {
     void seek(long pos) throws IOException;
 
     long getPosition() throws IOException;
+    void reset() throws IOException;
   }
 
   public interface Writer {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java Wed Dec 19 22:23:27 2012
@@ -62,7 +62,9 @@ public class HLogFactory {
     }
     
     /**
-     * Create a reader for the WAL.
+     * Create a reader for the WAL. If you are reading from a file that's being written to
+     * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of calling
+     * this method again, then seek back to the last known good position.
      * @return A WAL reader.  Close when done with it.
      * @throws IOException
      */

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Wed Dec 19 22:23:27 2012
@@ -140,15 +140,17 @@ public class SequenceFileLogReader imple
 
   Configuration conf;
   WALReader reader;
+  FileSystem fs;
 
   // Needed logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
+  boolean emptyCompressionContext = true;
   /**
    * Compression context to use reading.  Can be null if no compression.
    */
-  private CompressionContext compressionContext = null;
+  protected CompressionContext compressionContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
@@ -174,6 +176,7 @@ public class SequenceFileLogReader imple
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
+    this.fs = fs;
 
     // If compression is enabled, new dictionaries are created here.
     boolean compression = reader.isWALCompressionEnabled();
@@ -238,11 +241,22 @@ public class SequenceFileLogReader imple
       throw addFileInfoToException(ioe);
     }
     edit++;
+    if (compressionContext != null && emptyCompressionContext) {
+      emptyCompressionContext = false;
+    }
     return b? e: null;
   }
 
   @Override
   public void seek(long pos) throws IOException {
+    if (compressionContext != null && emptyCompressionContext) {
+      while (next() != null) {
+        if (getPosition() == pos) {
+          emptyCompressionContext = false;
+          break;
+        }
+      }
+    }
     try {
       reader.seek(pos);
     } catch (IOException ioe) {
@@ -287,4 +301,11 @@ public class SequenceFileLogReader imple
 
     return ioe;
   }
+
+  @Override
+  public void reset() throws IOException {
+    // Resetting the reader lets us see newly added data if the file is being written to
+    // We also keep the same compressionContext which was previously populated for this file
+    reader = new WALReader(fs, path, conf);
+  }
 }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java?rev=1424174&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java Wed Dec 19 22:23:27 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class around HLog.Reader to help manage implementation details
+ * such as compression and the current read position.
+ */
+@InterfaceAudience.Private
+public class ReplicationHLogReaderManager {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
+  private final FileSystem fs;
+  private final Configuration conf;
+  private long position = 0;
+  private HLog.Reader reader;
+  private Path lastPath;
+
+  /**
+   * Creates the helper but doesn't open any file.
+   * Use {@link #setPosition(long)} after the constructor if some content needs to be skipped.
+   * @param fs
+   * @param conf
+   */
+  public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
+    this.fs = fs;
+    this.conf = conf;
+  }
+
+  /**
+   * Opens a reader on the given file, or resets the current reader if the path is unchanged.
+   * @param path the log file to read
+   * @return the reader for that file
+   * @throws IOException
+   */
+  public HLog.Reader openReader(Path path) throws IOException {
+    // Detect if this is a new file, if so get a new reader else
+    // reset the current reader so that we see the new data
+    if (this.reader == null || !this.lastPath.equals(path)) {
+      this.reader = HLogFactory.createReader(this.fs, path, this.conf);
+      this.lastPath = path;
+    } else {
+      this.reader.reset();
+    }
+    return this.reader;
+  }
+
+  /**
+   * Read the next entry and remember the reader's position afterwards
+   * @param entriesArray array whose element at currentNbEntries is passed to the reader
+   * @param currentNbEntries index into entriesArray
+   * @return a new entry or null
+   * @throws IOException
+   */
+  public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
+                                           int currentNbEntries) throws IOException {
+    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+    // Store the position so that in the future the reader can start
+    // reading from here. If the above call to next() throws an
+    // exception, the position won't be changed and retry will happen
+    // from the last known good position
+    this.position = this.reader.getPosition();
+    // Clear the compression context, otherwise the entry will be compressed when sent to the sink
+    if (entry != null) {
+      entry.setCompressionContext(null);
+    }
+    return entry;
+  }
+
+  /**
+   * Advance the reader to the current position
+   * @throws IOException
+   */
+  public void seek() throws IOException {
+    if (this.position != 0) {
+      this.reader.seek(this.position);
+    }
+  }
+
+  /**
+   * Get the position that we stopped reading at
+   * @return current position, cannot be negative
+   */
+  public long getPosition() {
+    return this.position;
+  }
+
+  public void setPosition(long pos) {
+    this.position = pos;
+  }
+
+  /**
+   * Close the current reader
+   * @throws IOException
+   */
+  public void closeReader() throws IOException {
+    if (this.reader != null) {
+      this.reader.close();
+    }
+  }
+
+  /**
+   * Tell the helper to reset internal state
+   */
+  public void finishCurrentFile() {
+    this.position = 0;
+    this.reader = null;
+  }
+
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Dec 19 22:23:27 2012
@@ -107,8 +107,6 @@ public class ReplicationSource extends T
   private int replicationQueueNbCapacity;
   // Our reader for the current log
   private HLog.Reader reader;
-  // Current position in the log
-  private long position = 0;
   // Last position in the log that we sent to ZooKeeper
   private long lastLoggedPosition = -1;
   // Path of the current log
@@ -134,10 +132,14 @@ public class ReplicationSource extends T
   private int currentNbEntries = 0;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
+  // Current size of data we need to replicate
+  private int currentSize = 0;
   // Indicates if this particular source is running
   private volatile boolean running = true;
   // Metrics for this source
   private MetricsSource metrics;
+  // Handle on the log reader helper
+  private ReplicationHLogReaderManager repLogReader;
 
   /**
    * Instantiation method used by region servers
@@ -185,7 +187,7 @@ public class ReplicationSource extends T
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
     this.metrics = new MetricsSource(peerClusterZnode);
-
+    this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
     try {
       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
     } catch (KeeperException ke) {
@@ -266,8 +268,8 @@ public class ReplicationSource extends T
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
       try {
-        this.position = this.zkHelper.getHLogRepPosition(
-            this.peerClusterZnode, this.queue.peek().getName());
+        this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
+            this.peerClusterZnode, this.queue.peek().getName()));
       } catch (KeeperException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
             peerClusterZnode, e);
@@ -325,6 +327,7 @@ public class ReplicationSource extends T
 
       boolean gotIOE = false;
       currentNbEntries = 0;
+      currentSize = 0;
       try {
         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
           continue;
@@ -360,9 +363,7 @@ public class ReplicationSource extends T
         }
       } finally {
         try {
-          if (this.reader != null) {
-            this.reader.close();
-          }
+          this.repLogReader.closeReader();
         } catch (IOException e) {
           gotIOE = true;
           LOG.warn("Unable to finalize the tailing of a file", e);
@@ -373,10 +374,11 @@ public class ReplicationSource extends T
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
-        if (this.lastLoggedPosition != this.position) {
+        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.position;
+              this.peerClusterZnode, this.repLogReader.getPosition(),
+              queueRecovered, currentWALisBeingWrittenTo);
+          this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
           sleepMultiplier++;
@@ -409,11 +411,9 @@ public class ReplicationSource extends T
   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
       throws IOException{
     long seenEntries = 0;
-    if (this.position != 0) {
-      this.reader.seek(this.position);
-    }
-    long startPosition = this.position;
-    HLog.Entry entry = readNextAndSetPosition(); 
+    this.repLogReader.seek();
+    HLog.Entry entry =
+        this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
     while (entry != null) {
       WALEdit edit = entry.getEdit();
       this.metrics.incrLogEditsRead();
@@ -437,18 +437,18 @@ public class ReplicationSource extends T
           }
           currentNbOperations += countDistinctRowKeys(edit);
           currentNbEntries++;
+          currentSize += entry.getEdit().size();
         } else {
           this.metrics.incrLogEditsFiltered();
         }
       }
       // Stop if too many entries or too big
-      if ((this.reader.getPosition() - startPosition)
-          >= this.replicationQueueSizeCapacity ||
+      if (currentSize >= this.replicationQueueSizeCapacity ||
           currentNbEntries >= this.replicationQueueNbCapacity) {
         break;
       }
       try {
-        entry = readNextAndSetPosition();
+        entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
       } catch (IOException ie) {
         LOG.debug("Break on IOE: " + ie.getMessage());
         break;
@@ -462,16 +462,6 @@ public class ReplicationSource extends T
     return seenEntries == 0 && processEndOfFile();
   }
 
-  private HLog.Entry readNextAndSetPosition() throws IOException {
-    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
-    // Store the position so that in the future the reader can start
-    // reading from here. If the above call to next() throws an
-    // exception, the position won't be changed and retry will happen
-    // from the last known good position
-    this.position = this.reader.getPosition();
-    return entry;
-  } 
-
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
     while (this.isActive() && this.currentPeers.size() == 0) {
@@ -510,9 +500,7 @@ public class ReplicationSource extends T
   protected boolean openReader(int sleepMultiplier) {
     try {
       try {
-       this.reader = null;
-       this.reader = HLogFactory.createReader(this.fs, 
-           this.currentPath, this.conf);
+        this.reader = repLogReader.openReader(this.currentPath);
       } catch (FileNotFoundException fnfe) {
         if (this.queueRecovered) {
           // We didn't find the log in the archive directory, look if it still
@@ -648,10 +636,11 @@ public class ReplicationSource extends T
         AdminProtocol rrs = getRS();
         ProtobufUtil.replicateWALEntry(rrs,
           Arrays.copyOf(this.entriesArray, currentNbEntries));
-        if (this.lastLoggedPosition != this.position) {
+        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.position;
+              this.peerClusterZnode, this.repLogReader.getPosition(),
+              queueRecovered, currentWALisBeingWrittenTo);
+          this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         this.totalReplicatedEdits += currentNbEntries;
         this.metrics.shipBatch(this.currentNbOperations);
@@ -718,7 +707,8 @@ public class ReplicationSource extends T
   protected boolean processEndOfFile() {
     if (this.queue.size() != 0) {
       this.currentPath = null;
-      this.position = 0;
+      this.repLogReader.finishCurrentFile();
+      this.reader = null;
       return true;
     } else if (this.queueRecovered) {
       this.manager.closeRecoveredQueue(this);
@@ -842,8 +832,14 @@ public class ReplicationSource extends T
 
   @Override
   public String getStats() {
+    String position;
+    try {
+      position = this.reader.getPosition()+"";
+    } catch (IOException ioe) {
+      position = "N/A";
+    }
     return "Total replicated edits: " + totalReplicatedEdits +
       ", currently replicating from: " + this.currentPath +
-      " at position: " + this.position;
+      " at position: " + position;
   }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java Wed Dec 19 22:23:27 2012
@@ -49,6 +49,9 @@ public class FaultySequenceFileLogReader
         HLogKey key = HLogUtil.newKey(conf);
         WALEdit val = new WALEdit();
         HLog.Entry e = new HLog.Entry(key, val);
+        if (compressionContext != null) {
+          e.setCompressionContext(compressionContext);
+        }
         b = this.reader.next(e.getKey(), e.getEdit());
         nextQueue.offer(e);
         numberOfFileEntries++;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Wed Dec 19 22:23:27 2012
@@ -477,15 +477,13 @@ public class TestHLog  {
       throw t.exception;
 
     // Make sure you can read all the content
-    SequenceFile.Reader reader
-      = new SequenceFile.Reader(this.fs, walPath, this.conf);
+    HLog.Reader reader = HLogFactory.createReader(this.fs, walPath, this.conf);
     int count = 0;
-    HLogKey key = HLogUtil.newKey(conf);
-    WALEdit val = new WALEdit();
-    while (reader.next(key, val)) {
+    HLog.Entry entry = new HLog.Entry();
+    while (reader.next(entry) != null) {
       count++;
       assertTrue("Should be one KeyValue per WALEdit",
-                 val.getKeyValues().size() == 1);
+                  entry.getEdit().getKeyValues().size() == 1);
     }
     assertEquals(total, count);
     reader.close();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Dec 19 22:23:27 2012
@@ -100,7 +100,7 @@ public class TestHLogSplit {
   private Configuration conf;
   private FileSystem fs;
 
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
 
   private static final Path HBASEDIR = new Path("/hbase");

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java?rev=1424174&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplitCompressed.java Wed Dec 19 22:23:27 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHLogSplitCompressed extends TestHLogSplit {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestHLogSplit.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1424174&r1=1424173&r2=1424174&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Wed Dec 19 22:23:27 2012
@@ -60,7 +60,7 @@ public class TestReplication {
 
   private static final Log LOG = LogFactory.getLog(TestReplication.class);
 
-  private static Configuration conf1;
+  protected static Configuration conf1 = HBaseConfiguration.create();
   private static Configuration conf2;
   private static Configuration CONF_WITH_LOCALFS;
 
@@ -90,7 +90,6 @@ public class TestReplication {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1 = HBaseConfiguration.create();
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     // smaller block size and capacity to trigger more operations
     // and test them
@@ -520,7 +519,7 @@ public class TestReplication {
 
     // disable and start the peer
     admin.disablePeer("2");
-    utility2.startMiniHBaseCluster(1, 1);
+    utility2.startMiniHBaseCluster(1, 2);
     Get get = new Get(rowkey);
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -760,7 +759,8 @@ public class TestReplication {
     int lastCount = 0;
 
     final long start = System.currentTimeMillis();
-    for (int i = 0; i < NB_RETRIES; i++) {
+    int i = 0;
+    while (true) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for queueFailover replication. " +
           "Waited "+(System.currentTimeMillis() - start)+"ms.");
@@ -772,6 +772,8 @@ public class TestReplication {
       if (res2.length < initialCount) {
         if (lastCount < res2.length) {
           i--; // Don't increment timeout if we make progress
+        } else {
+          i++;
         }
         lastCount = res2.length;
         LOG.info("Only got " + lastCount + " rows instead of " +
@@ -791,7 +793,7 @@ public class TestReplication {
           Thread.sleep(timeout);
           utility.expireRegionServerSession(rs);
         } catch (Exception e) {
-          LOG.error(e);
+          LOG.error("Couldn't kill a region server", e);
         }
       }
     };

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java?rev=1424174&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java Wed Dec 19 22:23:27 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Run the same test as TestReplication but with HLog compression enabled
+ */
+@Category(LargeTests.class)
+public class TestReplicationWithCompression extends TestReplication {
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TestReplication.setUpBeforeClass();
+  }
+}