You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:22:16 UTC
[09/20] hbase git commit: HBASE-20167 Optimize the implementation of
ReplicationSourceWALReader
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b
Branch: refs/heads/branch-2
Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d
Parents: cea5199
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 12 12:21:44 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800
----------------------------------------------------------------------
.../RecoveredReplicationSource.java | 67 +++++------
.../RecoveredReplicationSourceShipper.java | 48 ++------
.../RecoveredReplicationSourceWALReader.java | 56 ----------
.../regionserver/ReplicationSource.java | 36 +++---
.../regionserver/ReplicationSourceShipper.java | 27 +++--
.../ReplicationSourceWALReader.java | 101 +++++------------
.../SerialReplicationSourceWALReader.java | 112 +++++++++++++++++++
7 files changed, 218 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d9506c0..169b469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
@Override
- protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
- final RecoveredReplicationSourceShipper worker =
- new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
- this.queueStorage);
- ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
- if (extant != null) {
- LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
- } else {
- LOG.debug("Starting up worker for wal group " + walGroupId);
- worker.startup(this::uncaughtException);
- worker.setWALReader(
- startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
- workerThreads.put(walGroupId, worker);
- }
+ protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
+ PriorityBlockingQueue<Path> queue) {
+ return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
+ }
+
+ private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+ BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
+ LOG.trace("Didn't read any new entries from WAL");
+ // we're done with queue recovery, shut ourselves down
+ reader.setReaderRunning(false);
+ // shuts down shipper thread immediately
+ entryBatchQueue.put(new WALEntryBatch(0, currentPath));
}
@Override
- protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+ protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
- ReplicationSourceWALReader walReader =
- new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
- Threads.setDaemonThreadRunning(walReader,
- threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
- this::uncaughtException);
- return walReader;
+ if (replicationPeer.getPeerConfig().isSerial()) {
+ return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
+ this) {
+
+ @Override
+ protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+ handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+ }
+ };
+ } else {
+ return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
+
+ @Override
+ protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+ handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+ }
+ };
+ }
}
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
@@ -166,21 +176,14 @@ public class RecoveredReplicationSource extends ReplicationSource {
return path;
}
- public void tryFinish() {
+ void tryFinish() {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
- boolean allTasksDone = true;
- for (ReplicationSourceShipper worker : workerThreads.values()) {
- if (!worker.isFinished()) {
- allTasksDone = false;
- break;
- }
- }
+ boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
if (allTasksDone) {
manager.removeRecoveredSource(this);
- LOG.info("Finished recovering queue " + queueId + " with the following stats: "
- + getStats());
+ LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 9c36497..1ae5cb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,46 +48,18 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
- public void run() {
- setWorkerState(WorkerState.RUNNING);
- // Loop until we close down
- while (isActive()) {
- int sleepMultiplier = 1;
- // Sleep until replication is enabled again
- if (!source.isPeerEnabled()) {
- if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
-
- while (entryReader == null) {
- if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
- sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
-
- try {
- WALEntryBatch entryBatch = entryReader.take();
- shipEdits(entryBatch);
- if (entryBatch.getWalEntries().isEmpty()) {
- LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + source.getQueueId());
- source.getSourceMetrics().incrCompletedRecoveryQueue();
- setWorkerState(WorkerState.FINISHED);
- continue;
- }
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while waiting for next replication entry batch", e);
- Thread.currentThread().interrupt();
- }
+ protected void postShipEdits(WALEntryBatch entryBatch) {
+ if (entryBatch.getWalEntries().isEmpty()) {
+ LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ + source.getQueueId());
+ source.getSourceMetrics().incrCompletedRecoveryQueue();
+ setWorkerState(WorkerState.FINISHED);
}
+ }
+
+ @Override
+ protected void postFinish() {
source.tryFinish();
- // If the worker exits run loop without finishing its task, mark it as stopped.
- if (!isFinished()) {
- setWorkerState(WorkerState.STOPPED);
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
deleted file mode 100644
index 114f139..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.concurrent.PriorityBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-
-/**
- * Used by a {@link RecoveredReplicationSourceShipper}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
-
- public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
- PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
- ReplicationSource source) {
- super(fs, conf, logQueue, startPosition, filter, source);
- }
-
- @Override
- protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
- LOG.trace("Didn't read any new entries from WAL");
- // we're done with queue recovery, shut ourself down
- setReaderRunning(false);
- // shuts down shipper thread immediately
- entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f5e4185..3480919 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -85,7 +85,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
// per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup;
protected ReplicationQueueStorage queueStorage;
- private ReplicationPeer replicationPeer;
+ protected ReplicationPeer replicationPeer;
protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo;
@@ -294,26 +294,32 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.walEntryFilter = new ChainWALEntryFilter(filters);
}
- protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
- ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this);
+ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+ ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
- LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+ LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId);
} else {
- LOG.debug("Starting up worker for wal group " + walGroupId);
+ LOG.debug("Starting up worker for wal group {}", walGroupId);
+ ReplicationSourceWALReader walReader =
+ createNewWALReader(walGroupId, queue, worker.getStartPosition());
+ Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
+ worker.setWALReader(walReader);
worker.startup(this::uncaughtException);
- worker.setWALReader(
- startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
}
}
- protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+ protected ReplicationSourceShipper createNewShipper(String walGroupId,
+ PriorityBlockingQueue<Path> queue) {
+ return new ReplicationSourceShipper(conf, walGroupId, queue, this);
+ }
+
+ protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
- ReplicationSourceWALReader walReader =
- new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
- return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
- threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
- this::uncaughtException);
+ return replicationPeer.getPeerConfig().isSerial()
+ ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
+ : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
}
protected final void uncaughtException(Thread t, Throwable e) {
@@ -392,10 +398,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return replicationPeer.isPeerEnabled();
}
- public boolean isSerial() {
- return replicationPeer.getPeerConfig().isSerial();
- }
-
private void initialize() {
int sleepMultiplier = 1;
while (this.isSourceActive()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 50aaf95..aa5251e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -83,7 +83,7 @@ public class ReplicationSourceShipper extends Thread {
}
@Override
- public void run() {
+ public final void run() {
setWorkerState(WorkerState.RUNNING);
// Loop until we close down
while (isActive()) {
@@ -95,28 +95,31 @@ public class ReplicationSourceShipper extends Thread {
}
continue;
}
-
- while (entryReader == null) {
- if (sleepForRetries("Replication WAL entry reader thread not initialized",
- sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
-
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
+ postShipEdits(entryBatch);
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}
// If the worker exits run loop without finishing its task, mark it as stopped.
- if (state != WorkerState.FINISHED) {
+ if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
+ } else {
+ postFinish();
}
}
+ // To be implemented by recovered shipper
+ protected void postShipEdits(WALEntryBatch entryBatch) {
+ }
+
+ // To be implemented by recovered shipper
+ protected void postFinish() {
+ }
+
/**
* Do the shipping logic
*/
@@ -229,8 +232,8 @@ public class ReplicationSourceShipper extends Thread {
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
- Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
- + source.getQueueId(), handler);
+ Threads.setDaemonThreadRunning(this,
+ name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
}
public PriorityBlockingQueue<Path> getLogQueue() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index da92a09..b125133 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -51,7 +50,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class ReplicationSourceWALReader extends Thread {
+class ReplicationSourceWALReader extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
private final PriorityBlockingQueue<Path> logQueue;
@@ -64,28 +63,19 @@ public class ReplicationSourceWALReader extends Thread {
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
- protected final int replicationBatchCountCapacity;
+ private final int replicationBatchCountCapacity;
// position in the WAL to start reading at
private long currentPosition;
private final long sleepForRetries;
private final int maxRetriesMultiplier;
private final boolean eofAutoRecovery;
- // used to store the first cell in an entry before filtering. This is because that if serial
- // replication is enabled, we may find out that an entry can not be pushed after filtering. And
- // when we try the next time, the cells maybe null since the entry has already been filtered,
- // especially for region event wal entries. And this can also used to determine whether we can
- // skip filtering.
- private Cell firstCellInEntryBeforeFiltering;
-
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
- private final SerialReplicationChecker serialReplicationChecker;
-
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@@ -120,7 +110,6 @@ public class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
- this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -169,75 +158,35 @@ public class ReplicationSourceWALReader extends Thread {
}
}
- private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
- throws IOException {
- entryStream.next();
- firstCellInEntryBeforeFiltering = null;
- batch.setLastWalPosition(entryStream.getPosition());
+ // returns true if we reach the size limit for batch, i.e., we need to finish the batch and return.
+ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
+ WALEdit edit = entry.getEdit();
+ if (edit == null || edit.isEmpty()) {
+ return false;
+ }
+ long entrySize = getEntrySize(entry);
+ batch.addEntry(entry);
+ updateBatchStats(batch, entry, entrySize);
+ boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+ // Stop if too many entries or too big
+ return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
+ batch.getNbEntries() >= replicationBatchCountCapacity;
}
- private WALEntryBatch readWALEntries(WALEntryStream entryStream)
+ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
if (!entryStream.hasNext()) {
return null;
}
- long positionBefore = entryStream.getPosition();
- WALEntryBatch batch =
- new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+ WALEntryBatch batch = createBatch(entryStream);
do {
- Entry entry = entryStream.peek();
- boolean isSerial = source.isSerial();
- boolean doFiltering = true;
- if (isSerial) {
- if (firstCellInEntryBeforeFiltering == null) {
- assert !entry.getEdit().isEmpty() : "should not write empty edits";
- // Used to locate the region record in meta table. In WAL we only have the table name and
- // encoded region name which can not be mapping to region name without scanning all the
- // records for a table, so we need a start key, just like what we have done at client side
- // when locating a region. For the markers, we will use the start key of the region as the
- // row key for the edit. And we need to do this before filtering since all the cells may
- // be filtered out, especially that for the markers.
- firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
- } else {
- // if this is not null then we know that the entry has already been filtered.
- doFiltering = false;
- }
- }
-
- if (doFiltering) {
- entry = filterEntry(entry);
- }
+ Entry entry = entryStream.next();
+ batch.setLastWalPosition(entryStream.getPosition());
+ entry = filterEntry(entry);
if (entry != null) {
- if (isSerial) {
- if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
- if (batch.getLastWalPosition() > positionBefore) {
- // we have something that can push, break
- break;
- } else {
- serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
- }
- }
- // arrive here means we can push the entry, record the last sequence id
- batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
- entry.getKey().getSequenceId());
+ if (addEntryToBatch(batch, entry)) {
+ break;
}
- // actually remove the entry.
- removeEntryFromStream(entryStream, batch);
- WALEdit edit = entry.getEdit();
- if (edit != null && !edit.isEmpty()) {
- long entrySize = getEntrySize(entry);
- batch.addEntry(entry);
- updateBatchStats(batch, entry, entrySize);
- boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
- // Stop if too many entries or too big
- if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
- batch.getNbEntries() >= replicationBatchCountCapacity) {
- break;
- }
- }
- } else {
- // actually remove the entry.
- removeEntryFromStream(entryStream, batch);
}
} while (entryStream.hasNext());
return batch;
@@ -286,7 +235,11 @@ public class ReplicationSourceWALReader extends Thread {
return true;
}
- private Entry filterEntry(Entry entry) {
+ protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
+ return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+ }
+
+ protected final Entry filterEntry(Entry entry) {
Entry filtered = filter.filter(entry);
if (entry != null && filtered == null) {
source.getSourceMetrics().incrLogEditsFiltered();
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
new file mode 100644
index 0000000..5e9a9f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * WAL reader for a serial replication peer.
+ */
+@InterfaceAudience.Private
+public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
+
+ // used to store the first cell in an entry before filtering. This is because that if serial
+ // replication is enabled, we may find out that an entry can not be pushed after filtering. And
+ // when we try the next time, the cells maybe null since the entry has already been filtered,
+ // especially for region event wal entries. And this can also be used to determine whether we can
+ // skip filtering.
+ private Cell firstCellInEntryBeforeFiltering;
+
+ private final SerialReplicationChecker checker;
+
+ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
+ PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+ ReplicationSource source) {
+ super(fs, conf, logQueue, startPosition, filter, source);
+ checker = new SerialReplicationChecker(conf, source);
+ }
+
+ @Override
+ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+ throws IOException, InterruptedException {
+ if (!entryStream.hasNext()) {
+ return null;
+ }
+ long positionBefore = entryStream.getPosition();
+ WALEntryBatch batch = createBatch(entryStream);
+ do {
+ Entry entry = entryStream.peek();
+ boolean doFiltering = true;
+ if (firstCellInEntryBeforeFiltering == null) {
+ assert !entry.getEdit().isEmpty() : "should not write empty edits";
+ // Used to locate the region record in meta table. In WAL we only have the table name and
+ // encoded region name which cannot be mapped to a region name without scanning all the
+ // records for a table, so we need a start key, just like what we have done at client side
+ // when locating a region. For the markers, we will use the start key of the region as the
+ // row key for the edit. And we need to do this before filtering since all the cells may
+ // be filtered out, especially that for the markers.
+ firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
+ } else {
+ // if this is not null then we know that the entry has already been filtered.
+ doFiltering = false;
+ }
+
+ if (doFiltering) {
+ entry = filterEntry(entry);
+ }
+ if (entry != null) {
+ if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+ if (batch.getLastWalPosition() > positionBefore) {
+ // we have something that can push, break
+ break;
+ } else {
+ checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
+ }
+ }
+ // arrive here means we can push the entry, record the last sequence id
+ batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+ entry.getKey().getSequenceId());
+ // actually remove the entry.
+ removeEntryFromStream(entryStream, batch);
+ if (addEntryToBatch(batch, entry)) {
+ break;
+ }
+ } else {
+ // actually remove the entry.
+ removeEntryFromStream(entryStream, batch);
+ }
+ } while (entryStream.hasNext());
+ return batch;
+ }
+
+ private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
+ throws IOException {
+ entryStream.next();
+ firstCellInEntryBeforeFiltering = null;
+ batch.setLastWalPosition(entryStream.getPosition());
+ }
+}