You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@hbase.apache.org by la...@apache.org on 2013/11/12 23:14:33 UTC
svn commit: r1541269 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase:
regionserver/wal/WALEdit.java
replication/regionserver/ReplicationHLogReaderManager.java
replication/regionserver/ReplicationSource.java
Author: larsh
Date: Tue Nov 12 22:14:32 2013
New Revision: 1541269
URL: http://svn.apache.org/r1541269
Log:
HBASE-9865 Reused WALEdits in replication may cause RegionServers to go OOM
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1541269&r1=1541268&r2=1541269&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Tue Nov 12 22:14:32 2013
@@ -84,7 +84,7 @@ public class WALEdit implements Writable
private static final String PREFIX_CLUSTER_KEY = ".";
private final int VERSION_2 = -1;
- private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+ private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);
/**
* This variable contains the information of the column family replication settings and contains
@@ -132,7 +132,7 @@ public class WALEdit implements Writable
return kvs.size();
}
- public List<KeyValue> getKeyValues() {
+ public ArrayList<KeyValue> getKeyValues() {
return kvs;
}
@@ -218,6 +218,7 @@ public class WALEdit implements Writable
}
// read in all the key values
+ kvs.ensureCapacity(length);
for(int i=0; i< length && decoder.advance(); i++) {
kvs.add(decoder.current());
}
@@ -254,7 +255,7 @@ public class WALEdit implements Writable
}
public long heapSize() {
- long ret = 0;
+ long ret = ClassSize.ARRAYLIST;
for (KeyValue kv : kvs) {
ret += kv.heapSize();
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java?rev=1541269&r1=1541268&r2=1541269&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java Tue Nov 12 22:14:32 2013
@@ -80,14 +80,11 @@ public class ReplicationHLogReaderManage
/**
* Get the next entry, returned and also added in the array
- * @param entriesArray
- * @param currentNbEntries
* @return a new entry or null
* @throws IOException
*/
- public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
- int currentNbEntries) throws IOException {
- HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
+ public HLog.Entry readNextAndSetPosition() throws IOException {
+ HLog.Entry entry = this.reader.next();
// Store the position so that in the future the reader can start
// reading from here. If the above call to next() throws an
// exception, the position won't be changed and retry will happen
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1541269&r1=1541268&r2=1541269&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Nov 12 22:14:32 2013
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -79,8 +78,6 @@ public class ReplicationSource extends T
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
- // container of entries to replicate
- private HLog.Entry[] entriesArray;
private HConnection conn;
// Helper class for zookeeper
private ReplicationZookeeper zkHelper;
@@ -127,8 +124,6 @@ public class ReplicationSource extends T
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
- // Current number of entries that we need to replicate
- private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Current size of data we need to replicate
@@ -165,10 +160,6 @@ public class ReplicationSource extends T
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000);
- this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
- for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
- this.entriesArray[i] = new HLog.Entry();
- }
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier * maxRetriesMultiplier);
@@ -382,10 +373,10 @@ public class ReplicationSource extends T
boolean gotIOE = false;
currentNbOperations = 0;
- currentNbEntries = 0;
+ List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
currentSize = 0;
try {
- if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
+ if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
continue;
}
} catch (IOException ioe) {
@@ -404,11 +395,6 @@ public class ReplicationSource extends T
} catch (IOException e) {
LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
}
- } else if (currentNbEntries != 0) {
- LOG.warn(peerClusterZnode + " Got EOF while reading, " +
- "looks like this file is broken? " + currentPath);
- considerDumping = true;
- currentNbEntries = 0;
}
if (considerDumping &&
@@ -430,7 +416,7 @@ public class ReplicationSource extends T
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
- if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
+ if (this.isActive() && (gotIOE || entries.isEmpty())) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
@@ -442,8 +428,7 @@ public class ReplicationSource extends T
continue;
}
sleepMultiplier = 1;
- shipEdits(currentWALisBeingWrittenTo);
-
+ shipEdits(currentWALisBeingWrittenTo, entries);
}
if (this.conn != null) {
try {
@@ -459,16 +444,17 @@ public class ReplicationSource extends T
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to
+ * @param entries resulting entries to be replicated
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
- protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
throws IOException{
long seenEntries = 0;
this.repLogReader.seek();
HLog.Entry entry =
- this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
+ this.repLogReader.readNextAndSetPosition();
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.logEditsReadRate.inc(1);
@@ -499,7 +485,7 @@ public class ReplicationSource extends T
edit.addClusterId(clusterId);
}
currentNbOperations += countDistinctRowKeys(edit);
- currentNbEntries++;
+ entries.add(entry);
currentSize += entry.getEdit().heapSize();
} else {
this.metrics.logEditsFilteredRate.inc(1);
@@ -507,11 +493,11 @@ public class ReplicationSource extends T
}
// Stop if too many entries or too big
if (currentSize >= this.replicationQueueSizeCapacity ||
- currentNbEntries >= this.replicationQueueNbCapacity) {
+ entries.size() >= this.replicationQueueNbCapacity) {
break;
}
try {
- entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
+ entry = this.repLogReader.readNextAndSetPosition();
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
break;
@@ -670,8 +656,9 @@ public class ReplicationSource extends T
* @param edit The KV to check for replication
*/
protected void removeNonReplicableEdits(WALEdit edit) {
- List<KeyValue> kvs = edit.getKeyValues();
- for (int i = edit.size()-1; i >= 0; i--) {
+ ArrayList<KeyValue> kvs = edit.getKeyValues();
+ int size = edit.size();
+ for (int i = size-1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
@@ -679,6 +666,9 @@ public class ReplicationSource extends T
kvs.remove(i);
}
}
+ if (edit.size() < size/2) {
+ kvs.trimToSize();
+ }
}
/**
@@ -704,9 +694,9 @@ public class ReplicationSource extends T
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
*/
- protected void shipEdits(boolean currentWALisBeingWrittenTo) {
+ protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
int sleepMultiplier = 1;
- if (this.currentNbEntries == 0) {
+ if (entries.isEmpty()) {
LOG.warn("Was given 0 edits to ship");
return;
}
@@ -719,19 +709,20 @@ public class ReplicationSource extends T
}
try {
HRegionInterface rrs = getRS();
- LOG.debug("Replicating " + currentNbEntries);
- rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+ LOG.debug("Replicating " + entries.size());
+ // can't avoid the copy here, the replicateLogEntries RPC require an HLog.Entry[]
+ rrs.replicateLogEntries(entries.toArray(new HLog.Entry[entries.size()]));
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
- this.totalReplicatedEdits += currentNbEntries;
+ this.totalReplicatedEdits += entries.size();
this.metrics.shippedBatchesRate.inc(1);
this.metrics.shippedOpsRate.inc(
this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp(
- this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
+ entries.get(entries.size()-1).getKey().getWriteTime());
LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
break;