You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/11/12 23:13:01 UTC

svn commit: r1541265 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/replication/regionserver/

Author: larsh
Date: Tue Nov 12 22:13:01 2013
New Revision: 1541265

URL: http://svn.apache.org/r1541265
Log:
HBASE-9865 Reused WALEdits in replication may cause RegionServers to go OOM

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1541265&r1=1541264&r2=1541265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Tue Nov 12 22:13:01 2013
@@ -85,7 +85,7 @@ public class WALEdit implements Writable
   private final int VERSION_2 = -1;
   private final boolean isReplay;
 
-  private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+  private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);
 
   // Only here for legacy writable deserialization
   @Deprecated
@@ -134,7 +134,7 @@ public class WALEdit implements Writable
     return kvs.size();
   }
 
-  public List<KeyValue> getKeyValues() {
+  public ArrayList<KeyValue> getKeyValues() {
     return kvs;
   }
 
@@ -210,6 +210,7 @@ public class WALEdit implements Writable
    */
   public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
     kvs.clear();
+    kvs.ensureCapacity(expectedCount);
     while (kvs.size() < expectedCount && cellDecoder.advance()) {
       Cell cell = cellDecoder.current();
       if (!(cell instanceof KeyValue)) {
@@ -221,7 +222,7 @@ public class WALEdit implements Writable
   }
 
   public long heapSize() {
-    long ret = 0;
+    long ret = ClassSize.ARRAYLIST;
     for (KeyValue kv : kvs) {
       ret += kv.heapSize();
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java?rev=1541265&r1=1541264&r2=1541265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java Tue Nov 12 22:13:01 2013
@@ -79,14 +79,11 @@ public class ReplicationHLogReaderManage
 
   /**
    * Get the next entry, returned and also added in the array
-   * @param entriesArray
-   * @param currentNbEntries
    * @return a new entry or null
    * @throws IOException
    */
-  public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
-                                           int currentNbEntries) throws IOException {
-    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+  public HLog.Entry readNextAndSetPosition() throws IOException {
+    HLog.Entry entry = this.reader.next();
     // Store the position so that in the future the reader can start
     // reading from here. If the above call to next() throws an
     // exception, the position won't be changed and retry will happen

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1541265&r1=1541264&r2=1541265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Nov 12 22:13:01 2013
@@ -23,7 +23,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NavigableMap;
@@ -76,8 +76,6 @@ public class ReplicationSource extends T
   public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queue of logs to process
   private PriorityBlockingQueue<Path> queue;
-  // container of entries to replicate
-  private HLog.Entry[] entriesArray;
   private HConnection conn;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
@@ -116,8 +114,6 @@ public class ReplicationSource extends T
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
-  // Current number of entries that we need to replicate
-  private int currentNbEntries = 0;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
   // Current size of data we need to replicate
@@ -153,10 +149,6 @@ public class ReplicationSource extends T
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
     this.replicationQueueNbCapacity =
         this.conf.getInt("replication.source.nb.capacity", 25000);
-    this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
-    for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
-      this.entriesArray[i] = new HLog.Entry();
-    }
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
         maxRetriesMultiplier * maxRetriesMultiplier);
@@ -289,10 +281,10 @@ public class ReplicationSource extends T
 
       boolean gotIOE = false;
       currentNbOperations = 0;
-      currentNbEntries = 0;
+      List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
       currentSize = 0;
       try {
-        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
+        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
           continue;
         }
       } catch (IOException ioe) {
@@ -311,11 +303,6 @@ public class ReplicationSource extends T
             } catch (IOException e) {
               LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
             }
-          } else if (currentNbEntries != 0) {
-            LOG.warn(this.peerClusterZnode +
-                " Got EOF while reading, " + "looks like this file is broken? " + currentPath);
-            considerDumping = true;
-            currentNbEntries = 0;
           }
 
           if (considerDumping &&
@@ -337,7 +324,7 @@ public class ReplicationSource extends T
       // If we didn't get anything to replicate, or if we hit a IOE,
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
-      if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
+      if (this.isActive() && (gotIOE || entries.isEmpty())) {
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.repLogReader.getPosition(),
@@ -354,8 +341,7 @@ public class ReplicationSource extends T
         continue;
       }
       sleepMultiplier = 1;
-      shipEdits(currentWALisBeingWrittenTo);
-
+      shipEdits(currentWALisBeingWrittenTo, entries);
     }
     if (this.conn != null) {
       try {
@@ -372,11 +358,12 @@ public class ReplicationSource extends T
    * Read all the entries from the current log files and retain those
    * that need to be replicated. Else, process the end of the current file.
    * @param currentWALisBeingWrittenTo is the current WAL being written to
+   * @param entries resulting entries to be replicated
    * @return true if we got nothing and went to the next file, false if we got
    * entries
    * @throws IOException
    */
-  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
       throws IOException{
     long seenEntries = 0;
     if (LOG.isTraceEnabled()) {
@@ -385,7 +372,7 @@ public class ReplicationSource extends T
     }
     this.repLogReader.seek();
     HLog.Entry entry =
-        this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
+        this.repLogReader.readNextAndSetPosition();
     while (entry != null) {
       WALEdit edit = entry.getEdit();
       this.metrics.incrLogEditsRead();
@@ -402,7 +389,7 @@ public class ReplicationSource extends T
           //Mark that the current cluster has the change
           logKey.addClusterId(clusterId);
           currentNbOperations += countDistinctRowKeys(edit);
-          currentNbEntries++;
+          entries.add(entry);
           currentSize += entry.getEdit().heapSize();
         } else {
           this.metrics.incrLogEditsFiltered();
@@ -410,11 +397,11 @@ public class ReplicationSource extends T
       }
       // Stop if too many entries or too big
       if (currentSize >= this.replicationQueueSizeCapacity ||
-          currentNbEntries >= this.replicationQueueNbCapacity) {
+          entries.size() >= this.replicationQueueNbCapacity) {
         break;
       }
       try {
-        entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
+        entry = this.repLogReader.readNextAndSetPosition();
       } catch (IOException ie) {
         LOG.debug("Break on IOE: " + ie.getMessage());
         break;
@@ -583,8 +570,9 @@ public class ReplicationSource extends T
    */
   protected void removeNonReplicableEdits(HLog.Entry entry) {
     NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
-    List<KeyValue> kvs = entry.getEdit().getKeyValues();
-    for (int i = kvs.size()-1; i >= 0; i--) {
+    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+    int size = kvs.size();
+    for (int i = size-1; i >= 0; i--) {
       KeyValue kv = kvs.get(i);
       // The scope will be null or empty if
       // there's nothing to replicate in that WALEdit
@@ -592,6 +580,9 @@ public class ReplicationSource extends T
         kvs.remove(i);
       }
     }
+    if (kvs.size() < size/2) {
+      kvs.trimToSize();
+    }
   }
 
   /**
@@ -617,9 +608,9 @@ public class ReplicationSource extends T
    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
    * written to when this method was called
    */
-  protected void shipEdits(boolean currentWALisBeingWrittenTo) {
+  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
     int sleepMultiplier = 1;
-    if (this.currentNbEntries == 0) {
+    if (entries.isEmpty()) {
       LOG.warn("Was given 0 edits to ship");
       return;
     }
@@ -635,22 +626,21 @@ public class ReplicationSource extends T
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + this.currentNbEntries +
+          LOG.trace("Replicating " + entries.size() +
               " entries of total size " + currentSize);
         }
         ReplicationProtbufUtil.replicateWALEntry(rrs,
-            Arrays.copyOf(this.entriesArray, currentNbEntries));
+            entries.toArray(new HLog.Entry[entries.size()]));
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.repLogReader.getPosition(),
               this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.repLogReader.getPosition();
         }
-        this.totalReplicatedEdits += currentNbEntries;
+        this.totalReplicatedEdits += entries.size();
         this.totalReplicatedOperations += currentNbOperations;
         this.metrics.shipBatch(this.currentNbOperations);
-        this.metrics.setAgeOfLastShippedOp(
-            this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
+        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
               + this.totalReplicatedOperations + " operations");

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java?rev=1541265&r1=1541264&r2=1541265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java Tue Nov 12 22:13:01 2013
@@ -143,7 +143,7 @@ public class TestWALPlayer {
     when(context.getConfiguration()).thenReturn(configuration);
 
     WALEdit value = mock(WALEdit.class);
-    List<KeyValue> values = new ArrayList<KeyValue>();
+    ArrayList<KeyValue> values = new ArrayList<KeyValue>();
     KeyValue kv1 = mock(KeyValue.class);
     when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
     when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java?rev=1541265&r1=1541264&r2=1541265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java Tue Nov 12 22:13:01 2013
@@ -154,10 +154,9 @@ public class TestReplicationHLogReaderMa
     // There's one edit in the log, read it. Reading past it needs to return nulls
     assertNotNull(logManager.openReader(path));
     logManager.seek();
-    HLog.Entry[] entriesArray = new HLog.Entry[1];
-    HLog.Entry entry = logManager.readNextAndSetPosition(entriesArray, 0);
+    HLog.Entry entry = logManager.readNextAndSetPosition();
     assertNotNull(entry);
-    entry = logManager.readNextAndSetPosition(entriesArray, 0);
+    entry = logManager.readNextAndSetPosition();
     assertNull(entry);
     logManager.closeReader();
     long oldPos = logManager.getPosition();
@@ -167,7 +166,7 @@ public class TestReplicationHLogReaderMa
     // Read the newly added entry, make sure we made progress
     assertNotNull(logManager.openReader(path));
     logManager.seek();
-    entry = logManager.readNextAndSetPosition(entriesArray, 0);
+    entry = logManager.readNextAndSetPosition();
     assertNotEquals(oldPos, logManager.getPosition());
     assertNotNull(entry);
     logManager.closeReader();
@@ -178,7 +177,7 @@ public class TestReplicationHLogReaderMa
     // We rolled but we still should see the end of the first log and not get data
     assertNotNull(logManager.openReader(path));
     logManager.seek();
-    entry = logManager.readNextAndSetPosition(entriesArray, 0);
+    entry = logManager.readNextAndSetPosition();
     assertEquals(oldPos, logManager.getPosition());
     assertNull(entry);
     logManager.finishCurrentFile();
@@ -196,7 +195,7 @@ public class TestReplicationHLogReaderMa
     logManager.openReader(path);
     logManager.seek();
     for (int i = 0; i < nbRows; i++) {
-      HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0);
+      HLog.Entry e = logManager.readNextAndSetPosition();
       if (e == null) {
         fail("Should have enough entries");
       }