You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/06/19 01:30:07 UTC
[1/2] hbase git commit: HBASE-18170 Refactor
ReplicationSourceWALReaderThread
Repository: hbase
Updated Branches:
refs/heads/master d49208b05 -> c6e71f159
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
deleted file mode 100644
index c1af6e6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-
-/**
- * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class ReplicationSourceWALReaderThread extends Thread {
- private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
-
- private PriorityBlockingQueue<Path> logQueue;
- private FileSystem fs;
- private Configuration conf;
- private BlockingQueue<WALEntryBatch> entryBatchQueue;
- // max (heap) size of each batch - multiply by number of batches in queue to get total
- private long replicationBatchSizeCapacity;
- // max count of each batch - multiply by number of batches in queue to get total
- private int replicationBatchCountCapacity;
- // position in the WAL to start reading at
- private long currentPosition;
- private WALEntryFilter filter;
- private long sleepForRetries;
- //Indicates whether this particular worker is running
- private boolean isReaderRunning = true;
- private ReplicationQueueInfo replicationQueueInfo;
- private int maxRetriesMultiplier;
- private MetricsSource metrics;
-
- private AtomicLong totalBufferUsed;
- private long totalBufferQuota;
-
- /**
- * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
- * entries, and puts them on a batch queue.
- * @param manager replication manager
- * @param replicationQueueInfo
- * @param logQueue The WAL queue to read off of
- * @param startPosition position in the first WAL to start reading from
- * @param fs the files system to use
- * @param conf configuration to use
- * @param filter The filter to use while reading
- * @param metrics replication metrics
- */
- public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
- ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
- long startPosition,
- FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
- this.replicationQueueInfo = replicationQueueInfo;
- this.logQueue = logQueue;
- this.currentPosition = startPosition;
- this.fs = fs;
- this.conf = conf;
- this.filter = filter;
- this.replicationBatchSizeCapacity =
- this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
- this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
- // memory used will be batchSizeCapacity * (nb.batches + 1)
- // the +1 is for the current thread reading before placing onto the queue
- int batchCount = conf.getInt("replication.source.nb.batches", 1);
- this.totalBufferUsed = manager.getTotalBufferUsed();
- this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
- this.sleepForRetries =
- this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
- this.maxRetriesMultiplier =
- this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
- this.metrics = metrics;
- this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
- LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
- + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
- + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
- + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
- + ", replicationBatchQueueCapacity=" + batchCount);
- }
-
- @Override
- public void run() {
- int sleepMultiplier = 1;
- while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
- try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
- while (isReaderRunning()) { // loop here to keep reusing stream while we can
- if (!checkQuota()) {
- continue;
- }
- WALEntryBatch batch = null;
- while (entryStream.hasNext()) {
- if (batch == null) {
- batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
- }
- Entry entry = entryStream.next();
- if (updateSerialReplPos(batch, entry)) {
- batch.lastWalPosition = entryStream.getPosition();
- break;
- }
- entry = filterEntry(entry);
- if (entry != null) {
- WALEdit edit = entry.getEdit();
- if (edit != null && !edit.isEmpty()) {
- long entrySize = getEntrySize(entry);
- batch.addEntry(entry);
- updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
- boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
- // Stop if too many entries or too big
- if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
- || batch.getNbEntries() >= replicationBatchCountCapacity) {
- break;
- }
- }
- }
- }
- if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Read %s WAL entries eligible for replication",
- batch.getNbEntries()));
- }
- entryBatchQueue.put(batch);
- sleepMultiplier = 1;
- } else { // got no entries and didn't advance position in WAL
- LOG.trace("Didn't read any new entries from WAL");
- if (replicationQueueInfo.isQueueRecovered()) {
- // we're done with queue recovery, shut ourself down
- setReaderRunning(false);
- // shuts down shipper thread immediately
- entryBatchQueue.put(batch != null ? batch
- : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
- } else {
- Thread.sleep(sleepForRetries);
- }
- }
- currentPosition = entryStream.getPosition();
- entryStream.reset(); // reuse stream
- }
- } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
- if (sleepMultiplier < maxRetriesMultiplier) {
- LOG.debug("Failed to read stream of replication entries: " + e);
- sleepMultiplier++;
- } else {
- LOG.error("Failed to read stream of replication entries", e);
- handleEofException(e);
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
- }
- }
- }
-
- // if we get an EOF due to a zero-length log, and there are other logs in queue
- // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
- // enabled, then dump the log
- private void handleEofException(Exception e) {
- if (e.getCause() instanceof EOFException && logQueue.size() > 1
- && conf.getBoolean("replication.source.eof.autorecovery", false)) {
- try {
- if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
- logQueue.remove();
- currentPosition = 0;
- }
- } catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + logQueue.peek());
- }
- }
- }
-
- public Path getCurrentPath() {
- // if we've read some WAL entries, get the Path we read from
- WALEntryBatch batchQueueHead = entryBatchQueue.peek();
- if (batchQueueHead != null) {
- return batchQueueHead.lastWalPath;
- }
- // otherwise, we must be currently reading from the head of the log queue
- return logQueue.peek();
- }
-
- //returns false if we've already exceeded the global quota
- private boolean checkQuota() {
- // try not to go over total quota
- if (totalBufferUsed.get() > totalBufferQuota) {
- Threads.sleep(sleepForRetries);
- return false;
- }
- return true;
- }
-
- private Entry filterEntry(Entry entry) {
- Entry filtered = filter.filter(entry);
- if (entry != null && filtered == null) {
- metrics.incrLogEditsFiltered();
- }
- return filtered;
- }
-
- /**
- * @return true if we should stop reading because we're at REGION_CLOSE
- */
- private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
- if (entry.hasSerialReplicationScope()) {
- String key = Bytes.toString(entry.getKey().getEncodedRegionName());
- batch.setLastPosition(key, entry.getKey().getSequenceId());
- if (!entry.getEdit().getCells().isEmpty()) {
- WALProtos.RegionEventDescriptor maybeEvent =
- WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
- if (maybeEvent != null && maybeEvent
- .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
- // In serially replication, if we move a region to another RS and move it back, we may
- // read logs crossing two sections. We should break at REGION_CLOSE and push the first
- // section first in case of missing the middle section belonging to the other RS.
- // In a worker thread, if we can push the first log of a region, we can push all logs
- // in the same region without waiting until we read a close marker because next time
- // we read logs in this region, it must be a new section and not adjacent with this
- // region. Mark it negative.
- batch.setLastPosition(key, -entry.getKey().getSequenceId());
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
- * batch to become available
- * @return A batch of entries, along with the position in the log after reading the batch
- * @throws InterruptedException if interrupted while waiting
- */
- public WALEntryBatch take() throws InterruptedException {
- return entryBatchQueue.take();
- }
-
- private long getEntrySize(Entry entry) {
- WALEdit edit = entry.getEdit();
- return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
- }
-
- private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
- WALEdit edit = entry.getEdit();
- if (edit != null && !edit.isEmpty()) {
- batch.incrementHeapSize(entrySize);
- Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
- batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
- batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
- }
- batch.lastWalPosition = entryPosition;
- }
-
- /**
- * Count the number of different row keys in the given edit because of mini-batching. We assume
- * that there's at least one Cell in the WALEdit.
- * @param edit edit to count row keys from
- * @return number of different row keys and HFiles
- */
- private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
- List<Cell> cells = edit.getCells();
- int distinctRowKeys = 1;
- int totalHFileEntries = 0;
- Cell lastCell = cells.get(0);
-
- int totalCells = edit.size();
- for (int i = 0; i < totalCells; i++) {
- // Count HFiles to be replicated
- if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
- try {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
- List<StoreDescriptor> stores = bld.getStoresList();
- int totalStores = stores.size();
- for (int j = 0; j < totalStores; j++) {
- totalHFileEntries += stores.get(j).getStoreFileList().size();
- }
- } catch (IOException e) {
- LOG.error("Failed to deserialize bulk load entry from wal edit. "
- + "Then its hfiles count will not be added into metric.");
- }
- }
-
- if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
- distinctRowKeys++;
- }
- lastCell = cells.get(i);
- }
-
- Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
- return result;
- }
-
- /**
- * Calculate the total size of all the store files
- * @param edit edit to count row keys from
- * @return the total size of the store files
- */
- private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
- List<Cell> cells = edit.getCells();
- int totalStoreFilesSize = 0;
-
- int totalCells = edit.size();
- for (int i = 0; i < totalCells; i++) {
- if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
- try {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
- List<StoreDescriptor> stores = bld.getStoresList();
- int totalStores = stores.size();
- for (int j = 0; j < totalStores; j++) {
- totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
- }
- } catch (IOException e) {
- LOG.error("Failed to deserialize bulk load entry from wal edit. "
- + "Size of HFiles part of cell will not be considered in replication "
- + "request size calculation.",
- e);
- }
- }
- }
- return totalStoreFilesSize;
- }
-
- /**
- * @param size delta size for grown buffer
- * @return true if we should clear buffer and push all
- */
- private boolean acquireBufferQuota(long size) {
- return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
- }
-
- /**
- * @return whether the reader thread is running
- */
- public boolean isReaderRunning() {
- return isReaderRunning && !isInterrupted();
- }
-
- /**
- * @param readerRunning the readerRunning to set
- */
- public void setReaderRunning(boolean readerRunning) {
- this.isReaderRunning = readerRunning;
- }
-
- /**
- * Holds a batch of WAL entries to replicate, along with some statistics
- *
- */
- static class WALEntryBatch {
- private List<Entry> walEntries;
- // last WAL that was read
- private Path lastWalPath;
- // position in WAL of last entry in this batch
- private long lastWalPosition = 0;
- // number of distinct row keys in this batch
- private int nbRowKeys = 0;
- // number of HFiles
- private int nbHFiles = 0;
- // heap size of data we need to replicate
- private long heapSize = 0;
- // save the last sequenceid for each region if the table has serial-replication scope
- private Map<String, Long> lastSeqIds = new HashMap<>();
-
- /**
- * @param walEntries
- * @param lastWalPath Path of the WAL the last entry in this batch was read from
- * @param lastWalPosition Position in the WAL the last entry in this batch was read from
- */
- private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
- this.walEntries = new ArrayList<>(maxNbEntries);
- this.lastWalPath = lastWalPath;
- }
-
- public void addEntry(Entry entry) {
- walEntries.add(entry);
- }
-
- /**
- * @return the WAL Entries.
- */
- public List<Entry> getWalEntries() {
- return walEntries;
- }
-
- /**
- * @return the path of the last WAL that was read.
- */
- public Path getLastWalPath() {
- return lastWalPath;
- }
-
- /**
- * @return the position in the last WAL that was read.
- */
- public long getLastWalPosition() {
- return lastWalPosition;
- }
-
- public int getNbEntries() {
- return walEntries.size();
- }
-
- /**
- * @return the number of distinct row keys in this batch
- */
- public int getNbRowKeys() {
- return nbRowKeys;
- }
-
- /**
- * @return the number of HFiles in this batch
- */
- public int getNbHFiles() {
- return nbHFiles;
- }
-
- /**
- * @return total number of operations in this batch
- */
- public int getNbOperations() {
- return getNbRowKeys() + getNbHFiles();
- }
-
- /**
- * @return the heap size of this batch
- */
- public long getHeapSize() {
- return heapSize;
- }
-
- /**
- * @return the last sequenceid for each region if the table has serial-replication scope
- */
- public Map<String, Long> getLastSeqIds() {
- return lastSeqIds;
- }
-
- private void incrementNbRowKeys(int increment) {
- nbRowKeys += increment;
- }
-
- private void incrementNbHFiles(int increment) {
- nbHFiles += increment;
- }
-
- private void incrementHeapSize(long increment) {
- heapSize += increment;
- }
-
- private void setLastPosition(String region, Long sequenceId) {
- getLastSeqIds().put(region, sequenceId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 5337f38..ebbdef1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -348,8 +348,11 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
- fs, conf, getDummyFilter(), new MetricsSource("1"));
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.getSourceManager()).thenReturn(mockSourceManager);
+ when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+ ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
+ walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
[2/2] hbase git commit: HBASE-18170 Refactor
ReplicationSourceWALReaderThread
Posted by zg...@apache.org.
HBASE-18170 Refactor ReplicationSourceWALReaderThread
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6e71f15
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6e71f15
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6e71f15
Branch: refs/heads/master
Commit: c6e71f159cbd3f993b7fe361a5e5d50352efb306
Parents: d49208b
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sat Jun 17 00:45:52 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Jun 19 09:26:45 2017 +0800
----------------------------------------------------------------------
.../RecoveredReplicationSource.java | 23 +-
.../RecoveredReplicationSourceShipper.java | 151 ++++++
...RecoveredReplicationSourceShipperThread.java | 151 ------
.../RecoveredReplicationSourceWALReader.java | 55 ++
.../regionserver/ReplicationSource.java | 67 +--
.../regionserver/ReplicationSourceShipper.java | 359 +++++++++++++
.../ReplicationSourceShipperThread.java | 359 -------------
.../ReplicationSourceWALReader.java | 502 +++++++++++++++++++
.../ReplicationSourceWALReaderThread.java | 502 -------------------
.../regionserver/TestWALEntryStream.java | 9 +-
10 files changed, 1124 insertions(+), 1054 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d3bcff1..158330e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -61,22 +61,33 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
@Override
- protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
- final RecoveredReplicationSourceShipperThread worker =
- new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this,
+ protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+ final RecoveredReplicationSourceShipper worker =
+ new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
this.replicationQueues);
- ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+ ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(
- startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition()));
+ startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
}
}
+ @Override
+ protected ReplicationSourceWALReader startNewWALReader(String threadName,
+ String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
+ ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
+ conf, queue, startPosition, walEntryFilter, this);
+ Threads.setDaemonThreadRunning(walReader, threadName
+ + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+ getUncaughtExceptionHandler());
+ return walReader;
+ }
+
public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
@@ -161,7 +172,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allTasksDone = true;
- for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+ for (ReplicationSourceShipper worker : workerThreads.values()) {
if (!worker.isFinished()) {
allTasksDone = false;
break;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
new file mode 100644
index 0000000..a737910
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Used by a {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
+
+ private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipper.class);
+ protected final RecoveredReplicationSource source;
+ private final ReplicationQueues replicationQueues;
+
+ public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
+ PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+ ReplicationQueues replicationQueues) {
+ super(conf, walGroupId, queue, source);
+ this.source = source;
+ this.replicationQueues = replicationQueues;
+ }
+
+ @Override
+ public void run() {
+ setWorkerState(WorkerState.RUNNING);
+ // Loop until we close down
+ while (isActive()) {
+ int sleepMultiplier = 1;
+ // Sleep until replication is enabled again
+ if (!source.isPeerEnabled()) {
+ if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+
+ while (entryReader == null) {
+ if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
+ sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+
+ try {
+ WALEntryBatch entryBatch = entryReader.take();
+ shipEdits(entryBatch);
+ if (entryBatch.getWalEntries().isEmpty()
+ && entryBatch.getLastSeqIds().isEmpty()) {
+ LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ + source.getPeerClusterZnode());
+ source.getSourceMetrics().incrCompletedRecoveryQueue();
+ setWorkerState(WorkerState.FINISHED);
+ continue;
+ }
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while waiting for next replication entry batch", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ source.tryFinish();
+ // If the worker exits run loop without finishing its task, mark it as stopped.
+ if (!isFinished()) {
+ setWorkerState(WorkerState.STOPPED);
+ }
+ }
+
+ @Override
+ public long getStartPosition() {
+ long startPosition = getRecoveredQueueStartPos();
+ int numRetries = 0;
+ while (numRetries <= maxRetriesMultiplier) {
+ try {
+ source.locateRecoveredPaths(queue);
+ break;
+ } catch (IOException e) {
+ LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
+ numRetries++;
+ }
+ }
+ return startPosition;
+ }
+
+ // If this is a recovered queue, the queue is already full and the first log
+ // normally has a position (unless the RS failed between 2 logs)
+ private long getRecoveredQueueStartPos() {
+ long startPosition = 0;
+ String peerClusterZnode = source.getPeerClusterZnode();
+ try {
+ startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
+ this.queue.peek().getName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ + startPosition);
+ }
+ } catch (ReplicationException e) {
+ terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+ }
+ return startPosition;
+ }
+
+ @Override
+ protected void updateLogPosition(long lastReadPosition) {
+ source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
+ lastReadPosition, true, false);
+ lastLoggedPosition = lastReadPosition;
+ }
+
+ private void terminate(String reason, Exception cause) {
+ if (cause == null) {
+ LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+
+ } else {
+ LOG.error("Closing worker for wal group " + this.walGroupId
+ + " because an error occurred: " + reason, cause);
+ }
+ entryReader.interrupt();
+ Threads.shutdown(entryReader, sleepForRetries);
+ this.interrupt();
+ Threads.shutdown(this, sleepForRetries);
+ LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
deleted file mode 100644
index 65aeb2f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.concurrent.PriorityBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-import org.apache.hadoop.hbase.util.Threads;
-
-/**
- * Used by a {@link RecoveredReplicationSource}.
- */
-@InterfaceAudience.Private
-public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread {
-
- private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class);
- protected final RecoveredReplicationSource source;
- private final ReplicationQueues replicationQueues;
-
- public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
- ReplicationQueues replicationQueues) {
- super(conf, walGroupId, queue, source);
- this.source = source;
- this.replicationQueues = replicationQueues;
- }
-
- @Override
- public void run() {
- setWorkerState(WorkerState.RUNNING);
- // Loop until we close down
- while (isActive()) {
- int sleepMultiplier = 1;
- // Sleep until replication is enabled again
- if (!source.isPeerEnabled()) {
- if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
-
- while (entryReader == null) {
- if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
- sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
-
- try {
- WALEntryBatch entryBatch = entryReader.take();
- shipEdits(entryBatch);
- if (entryBatch.getWalEntries().isEmpty()
- && entryBatch.getLastSeqIds().isEmpty()) {
- LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + source.getPeerClusterZnode());
- source.getSourceMetrics().incrCompletedRecoveryQueue();
- setWorkerState(WorkerState.FINISHED);
- continue;
- }
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while waiting for next replication entry batch", e);
- Thread.currentThread().interrupt();
- }
- }
- source.tryFinish();
- // If the worker exits run loop without finishing its task, mark it as stopped.
- if (!isFinished()) {
- setWorkerState(WorkerState.STOPPED);
- }
- }
-
- @Override
- public long getStartPosition() {
- long startPosition = getRecoveredQueueStartPos();
- int numRetries = 0;
- while (numRetries <= maxRetriesMultiplier) {
- try {
- source.locateRecoveredPaths(queue);
- break;
- } catch (IOException e) {
- LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
- numRetries++;
- }
- }
- return startPosition;
- }
-
- // If this is a recovered queue, the queue is already full and the first log
- // normally has a position (unless the RS failed between 2 logs)
- private long getRecoveredQueueStartPos() {
- long startPosition = 0;
- String peerClusterZnode = source.getPeerClusterZnode();
- try {
- startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
- this.queue.peek().getName());
- if (LOG.isTraceEnabled()) {
- LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
- + startPosition);
- }
- } catch (ReplicationException e) {
- terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
- }
- return startPosition;
- }
-
- @Override
- protected void updateLogPosition(long lastReadPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
- lastReadPosition, true, false);
- lastLoggedPosition = lastReadPosition;
- }
-
- private void terminate(String reason, Exception cause) {
- if (cause == null) {
- LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
-
- } else {
- LOG.error("Closing worker for wal group " + this.walGroupId
- + " because an error occurred: " + reason, cause);
- }
- entryReader.interrupt();
- Threads.shutdown(entryReader, sleepForRetries);
- this.interrupt();
- Threads.shutdown(this, sleepForRetries);
- LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
new file mode 100644
index 0000000..6462a2a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+
+/**
+ * WAL reader used by a {@link RecoveredReplicationSourceShipper}. Differs from its parent only
+ * in how an empty batch is handled: the reader stops itself and still queues a batch.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
+  private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceWALReader.class);
+
+  public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
+      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source) {
+    super(fs, conf, logQueue, startPosition, filter, source);
+  }
+
+  /**
+   * Marks this reader as no longer running and queues a batch for the shipper: the given one,
+   * or a fresh batch for {@code currentPath} when {@code batch} is null.
+   * @param batch the batch that turned out empty, may be null
+   * @param currentPath WAL path used when a replacement batch has to be created
+   * @throws InterruptedException if interrupted while putting the batch on the queue
+   */
+  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
+      throws InterruptedException {
+    LOG.trace("Didn't read any new entries from WAL");
+    // we're done with queue recovery, shut ourself down
+    setReaderRunning(false);
+    // shuts down shipper thread immediately
+    WALEntryBatch finalBatch = batch;
+    if (finalBatch == null) {
+      finalBatch = new WALEntryBatch(replicationBatchCountCapacity, currentPath);
+    }
+    entryBatchQueue.put(finalBatch);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1dbf07f..3d4353f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -119,12 +119,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
- private WALEntryFilter walEntryFilter;
+ protected WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
- protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
+ protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
private AtomicLong totalBufferUsed;
@@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
- tryStartNewShipperThread(logPrefix, queue);
+ tryStartNewShipper(logPrefix, queue);
}
}
queue.put(log);
@@ -255,15 +255,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
throw new RuntimeException(ex);
}
- // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
- ArrayList<WALEntryFilter> filters = Lists.newArrayList(
- (WALEntryFilter)new SystemTableWALEntryFilter());
- WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
- if (filterFromEndpoint != null) {
- filters.add(filterFromEndpoint);
- }
- this.walEntryFilter = new ChainWALEntryFilter(filters);
-
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
@@ -285,40 +276,50 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+
+ initializeWALEntryFilter();
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
- tryStartNewShipperThread(walGroupId, queue);
+ tryStartNewShipper(walGroupId, queue);
}
}
- protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
- final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf,
+  /**
+   * Builds the chained WAL entry filter: the system-table filter first, then the filter
+   * supplied by the replication endpoint (if any), then the cluster-marking filter.
+   */
+  private void initializeWALEntryFilter() {
+    ArrayList<WALEntryFilter> chain = new ArrayList<>();
+    // default filter always comes first
+    chain.add(new SystemTableWALEntryFilter());
+    // the endpoint may contribute a filter of its own
+    WALEntryFilter endpointFilter = this.replicationEndpoint.getWALEntryfilter();
+    if (endpointFilter != null) {
+      chain.add(endpointFilter);
+    }
+    chain.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
+    this.walEntryFilter = new ChainWALEntryFilter(chain);
+  }
+
+ protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+ final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
walGroupId, queue, this);
- ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+ ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
- worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue,
+ worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
}
}
- protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName,
- String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
- ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
- new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
- ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
- ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
- replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
- Threads.setDaemonThreadRunning(walReader, threadName
- + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+ protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+ PriorityBlockingQueue<Path> queue, long startPosition) {
+ ReplicationSourceWALReader walReader =
+ new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+ return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
+ threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
getUncaughtExceptionHandler());
- return walReader;
}
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
@@ -446,8 +447,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
+ " because an error occurred: " + reason, cause);
}
this.sourceRunning = false;
- Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
- for (ReplicationSourceShipperThread worker : workers) {
+ Collection<ReplicationSourceShipper> workers = workerThreads.values();
+ for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
worker.entryReader.interrupt();
worker.interrupt();
@@ -457,7 +458,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
future = this.replicationEndpoint.stop();
}
if (join) {
- for (ReplicationSourceShipperThread worker : workers) {
+ for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
}
@@ -486,7 +487,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public Path getCurrentPath() {
// only for testing
- for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+ for (ReplicationSourceShipper worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
@@ -524,9 +525,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
StringBuilder sb = new StringBuilder();
sb.append("Total replicated edits: ").append(totalReplicatedEdits)
.append(", current progress: \n");
- for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
+ for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
- ReplicationSourceShipperThread worker = entry.getValue();
+ ReplicationSourceShipper worker = entry.getValue();
long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
new file mode 100644
index 0000000..3e1e50b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -0,0 +1,359 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
+ * {@link ReplicationSourceWALReader}.
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceShipper extends Thread {
+ private static final Log LOG = LogFactory.getLog(ReplicationSourceShipper.class);
+
+ // Hold the state of a replication worker thread
+ public enum WorkerState {
+ // Set at the start of run(); isActive() only returns true while in this state
+ RUNNING,
+ // Set by stopWorker(), and by run() when its loop exits in any state other than FINISHED
+ STOPPED,
+ FINISHED, // The worker is done processing a recovered queue
+ }
+
+  protected final Configuration conf;
+  protected final String walGroupId;
+  protected final PriorityBlockingQueue<Path> queue;
+  protected final ReplicationSourceInterface source;
+
+  // Last position in the log that we sent to ZooKeeper
+  protected long lastLoggedPosition = -1;
+  // Path of the current log
+  protected volatile Path currentPath;
+  // Current state of the worker thread. Volatile: other threads change it through
+  // setWorkerState()/stopWorker() while the shipper thread polls it in isActive().
+  private volatile WorkerState state;
+  // Reader that feeds this shipper. Volatile: it is handed over through setWALReader() after
+  // the thread has been started, while run() polls the field for null.
+  protected volatile ReplicationSourceWALReader entryReader;
+
+  // How long should we sleep for each retry
+  protected final long sleepForRetries;
+  // Maximum number of retries before taking bold actions
+  protected final int maxRetriesMultiplier;
+
+  // Use guava cache to set ttl for each key (entries expire one day after last access);
+  // keys that were never put load as false
+  private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
+      .expireAfterAccess(1, TimeUnit.DAYS).build(
+      new CacheLoader<String, Boolean>() {
+        @Override
+        public Boolean load(String key) throws Exception {
+          return false;
+        }
+      }
+  );
+
+  /**
+   * Creates a shipper for one WAL group. The retry sleep and the maximum retry multiplier are
+   * read from {@code conf}.
+   * @param conf configuration to read the retry settings from
+   * @param walGroupId id of the WAL group this shipper handles
+   * @param queue queue of WAL paths for the group
+   * @param source the replication source this shipper belongs to
+   */
+  public ReplicationSourceShipper(Configuration conf, String walGroupId,
+      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+    this.conf = conf;
+    this.walGroupId = walGroupId;
+    this.queue = queue;
+    this.source = source;
+    // defaults to 1 second
+    this.sleepForRetries = conf.getLong("replication.source.sleepforretries", 1000);
+    // defaults to 300, i.e. 5 minutes @ 1 sec per
+    this.maxRetriesMultiplier = conf.getInt("replication.source.maxretriesmultiplier", 300);
+  }
+
+  /**
+   * Main loop of the shipper: while the worker is active and the peer is enabled, takes the
+   * next batch from the WAL reader, waits until each region's entries may be pushed, and ships
+   * the batch. On exit the state becomes STOPPED unless it is already FINISHED.
+   */
+  @Override
+  public void run() {
+    setWorkerState(WorkerState.RUNNING);
+    // Loop until we close down
+    while (isActive()) {
+      // Sleep until replication is enabled again. The multiplier stays at 1 here: the
+      // incremented value was never carried over to the next iteration, so the increment was
+      // dropped rather than silently lengthening the pause.
+      if (!source.isPeerEnabled()) {
+        sleepForRetries("Replication is disabled", 1);
+        continue;
+      }
+
+      // Wait for the reader to be handed over, but give up once the worker is stopped or
+      // interrupted; otherwise this loop could never end (and would spin without sleeping
+      // once the interrupt flag is set).
+      int sleepMultiplier = 1;
+      while (entryReader == null && isActive()) {
+        if (sleepForRetries("Replication WAL entry reader thread not initialized",
+          sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+      ReplicationSourceWALReader reader = entryReader;
+      if (reader == null) {
+        // no longer active; the loop condition above ends the run
+        continue;
+      }
+
+      try {
+        WALEntryBatch entryBatch = reader.take();
+        for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
+          waitingUntilCanPush(entry);
+        }
+        shipEdits(entryBatch);
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while waiting for next replication entry batch", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (state != WorkerState.FINISHED) {
+      setWorkerState(WorkerState.STOPPED);
+    }
+  }
+
+  /**
+   * Ships one batch produced by the WAL reader.
+   * <p>
+   * An empty batch only advances the recorded positions (when the batch position differs from
+   * the last logged one). A non-empty batch is handed to the replication endpoint repeatedly
+   * while this worker is active, until the endpoint reports success; positions and metrics are
+   * updated afterwards.
+   * @param entryBatch the batch to ship
+   */
+  protected void shipEdits(WALEntryBatch entryBatch) {
+    final List<Entry> walEntries = entryBatch.getWalEntries();
+    final long batchEndPosition = entryBatch.getLastWalPosition();
+    currentPath = entryBatch.getLastWalPath();
+
+    if (walEntries.isEmpty()) {
+      if (lastLoggedPosition != batchEndPosition) {
+        // Save positions to meta table before zk.
+        updateSerialRepPositions(entryBatch.getLastSeqIds());
+        updateLogPosition(batchEndPosition);
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+          walGroupId);
+      }
+      return;
+    }
+
+    final int batchBytes = (int) entryBatch.getHeapSize();
+    int retryMultiplier = 0;
+    while (isActive()) {
+      try {
+        try {
+          source.tryThrottle(batchBytes);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while sleeping for throttling control");
+          Thread.currentThread().interrupt();
+          // current thread might be interrupted to terminate
+          // directly go back to while() for confirm this
+          continue;
+        }
+
+        // create replicateContext here, so the entries can be GC'd upon return from this call
+        // stack
+        ReplicationEndpoint.ReplicateContext replicateContext =
+            new ReplicationEndpoint.ReplicateContext();
+        replicateContext.setEntries(walEntries).setSize(batchBytes);
+        replicateContext.setWalGroupId(walGroupId);
+
+        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+        long startTimeNs = System.nanoTime();
+        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
+        long endTimeNs = System.nanoTime();
+
+        if (!replicated) {
+          continue;
+        }
+        retryMultiplier = Math.max(retryMultiplier - 1, 0);
+
+        if (this.lastLoggedPosition != batchEndPosition) {
+          // Clean up hfile references
+          for (Entry shipped : walEntries) {
+            cleanUpHFileRefs(shipped.getEdit());
+          }
+
+          // Save positions to meta table before zk.
+          updateSerialRepPositions(entryBatch.getLastSeqIds());
+          // Log and clean up WAL logs
+          updateLogPosition(batchEndPosition);
+        }
+
+        source.postShipEdits(walEntries, batchBytes);
+        // FIXME check relationship between wal group and overall
+        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), batchBytes,
+          entryBatch.getNbHFiles());
+        source.getSourceMetrics().setAgeOfLastShippedOp(
+          walEntries.get(walEntries.size() - 1).getKey().getWriteTime(), walGroupId);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicated " + walEntries.size() + " entries or "
+              + entryBatch.getNbOperations() + " operations in "
+              + ((endTimeNs - startTimeNs) / 1000000) + " ms");
+        }
+        break;
+      } catch (Exception ex) {
+        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
+            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+        if (sleepForRetries("ReplicationEndpoint threw exception", retryMultiplier)) {
+          retryMultiplier++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Blocks until the source manager reports that entries for the given key may be pushed,
+   * unless the key is already marked as skippable in {@code canSkipWaitingSet}.
+   * @param entry key mapped to a sequence id; a value &lt;= 0 is treated as carrying a
+   *          REGION_CLOSE marker: its absolute value is used and the key's skip mark is
+   *          dropped afterwards
+   * @throws RuntimeException wrapping the IOException if waiting fails
+   */
+  private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+    String key = entry.getKey();
+    long seq = entry.getValue();
+    boolean deleteKey = false;
+    if (seq <= 0) {
+      // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
+      deleteKey = true;
+      seq = -seq;
+    }
+
+    if (!canSkipWaitingSet.getUnchecked(key)) {
+      try {
+        source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
+      } catch (IOException e) {
+        LOG.error("waitUntilCanBePushed fail", e);
+        // keep the original failure as the cause so it reaches the uncaught-exception handler
+        throw new RuntimeException("waitUntilCanBePushed fail", e);
+      } catch (InterruptedException e) {
+        LOG.warn("waitUntilCanBePushed interrupted", e);
+        Thread.currentThread().interrupt();
+        // NOTE(review): the key is still marked as skippable below even though the wait was
+        // cut short - confirm this is intended
+      }
+      canSkipWaitingSet.put(key, true);
+    }
+    if (deleteKey) {
+      canSkipWaitingSet.invalidate(key);
+    }
+  }
+
+  /**
+   * For every bulk-load cell in the edit, asks the source manager to clean up the hfile
+   * references of each store and lowers the hfile-refs queue size metric by the number of
+   * store files.
+   * @param edit the WAL edit whose cells are inspected
+   * @throws IOException declared by the original signature; raised by calls made in the body
+   */
+  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+    String peerId = source.getPeerId();
+    if (peerId.contains("-")) {
+      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+      // A peerId will not have "-" in its name, see HBASE-11394
+      peerId = peerId.split("-")[0];
+    }
+    for (Cell cell : edit.getCells()) {
+      if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+        continue;
+      }
+      BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
+      for (StoreDescriptor store : descriptor.getStoresList()) {
+        List<String> storeFiles = store.getStoreFileList();
+        source.getSourceManager().cleanUpHFileRefs(peerId, storeFiles);
+        source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFiles.size());
+      }
+    }
+  }
+
+  /**
+   * Hands {@code lastReadPosition} for the current WAL to the source manager (both trailing
+   * flags {@code false}) and then remembers it as the last logged position.
+   * @param lastReadPosition position to record
+   */
+  protected void updateLogPosition(long lastReadPosition) {
+    final long positionToLog = lastReadPosition;
+    source.getSourceManager().logPositionAndCleanOldLogs(
+      currentPath, source.getPeerClusterZnode(), positionToLog, false, false);
+    lastLoggedPosition = positionToLog;
+  }
+
+  /**
+   * Writes the given replication positions for this source's peer through
+   * {@code MetaTableAccessor.updateReplicationPositions}.
+   * @param lastPositionsForSerialScope positions to store, as supplied by the shipped batch
+   * @throws RuntimeException wrapping the IOException if the update fails
+   */
+  private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
+    try {
+      MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
+        source.getPeerId(), lastPositionsForSerialScope);
+    } catch (IOException e) {
+      LOG.error("updateReplicationPositions fail", e);
+      // keep the original failure as the cause instead of discarding it
+      throw new RuntimeException("updateReplicationPositions fail", e);
+    }
+  }
+
+  /**
+   * Starts this shipper as a daemon thread. The thread name is derived from the name of the
+   * calling thread, the WAL group id and the peer cluster znode.
+   * @param handler handler passed on to {@code Threads.setDaemonThreadRunning}
+   */
+  public void startup(UncaughtExceptionHandler handler) {
+    final String callerName = Thread.currentThread().getName();
+    final String shipperName = callerName + ".replicationSource." + walGroupId + ","
+        + source.getPeerClusterZnode();
+    Threads.setDaemonThreadRunning(this, shipperName, handler);
+  }
+
+  /** @return the WAL path queue this shipper was constructed with */
+  public PriorityBlockingQueue<Path> getLogQueue() {
+    return queue;
+  }
+
+  /**
+   * @return the path currently reported by the WAL reader, or null when no reader has been set
+   *         yet (the reader is only assigned through setWALReader())
+   */
+  public Path getCurrentPath() {
+    // read the field once so the null check and the call use the same reader
+    ReplicationSourceWALReader reader = this.entryReader;
+    return reader == null ? null : reader.getCurrentPath();
+  }
+
+  /** @return the last logged position; -1 until updateLogPosition() has recorded one */
+  public long getCurrentPosition() {
+    return lastLoggedPosition;
+  }
+
+  /**
+   * Sets the reader that run() takes batches from.
+   * @param entryReader the WAL reader feeding this shipper
+   */
+  public void setWALReader(ReplicationSourceWALReader entryReader) {
+    final ReplicationSourceWALReader reader = entryReader;
+    this.entryReader = reader;
+  }
+
+  /** @return the position to start reading from; this implementation always returns 0 */
+  public long getStartPosition() {
+    return 0L;
+  }
+
+  /**
+   * @return true only while the source is active, this worker is in state RUNNING and the
+   *         thread has not been interrupted
+   */
+  protected boolean isActive() {
+    if (!source.isSourceActive()) {
+      return false;
+    }
+    if (state != WorkerState.RUNNING) {
+      return false;
+    }
+    return !isInterrupted();
+  }
+
+  /**
+   * Replaces the current worker state.
+   * @param state the new state
+   */
+  public void setWorkerState(WorkerState state) {
+    final WorkerState newState = state;
+    this.state = newState;
+  }
+
+  /** @return the current worker state; null before the first call to setWorkerState() */
+  public WorkerState getWorkerState() {
+    return this.state;
+  }
+
+  /** Moves the worker to state STOPPED, which makes isActive() return false. */
+  public void stopWorker() {
+    final WorkerState stopped = WorkerState.STOPPED;
+    setWorkerState(stopped);
+  }
+
+  /** @return true if the worker state is FINISHED */
+  public boolean isFinished() {
+    return WorkerState.FINISHED == state;
+  }
+
+  /**
+   * Sleeps for the configured retry interval multiplied by {@code sleepMultiplier}. If the
+   * sleep is interrupted, the interrupt flag is set again and the method returns normally.
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
+   */
+  public boolean sleepForRetries(String msg, int sleepMultiplier) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+    }
+    final long pauseMs = this.sleepForRetries * sleepMultiplier;
+    try {
+      Thread.sleep(pauseMs);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
deleted file mode 100644
index 6807da2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-/**
- * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
- * ReplicationSourceWALReaderThread
- */
-@InterfaceAudience.Private
-public class ReplicationSourceShipperThread extends Thread {
- private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
-
- // Hold the state of a replication worker thread
- public enum WorkerState {
- RUNNING,
- STOPPED,
- FINISHED, // The worker is done processing a recovered queue
- }
-
- protected final Configuration conf;
- protected final String walGroupId;
- protected final PriorityBlockingQueue<Path> queue;
- protected final ReplicationSourceInterface source;
-
- // Last position in the log that we sent to ZooKeeper
- protected long lastLoggedPosition = -1;
- // Path of the current log
- protected volatile Path currentPath;
- // Current state of the worker thread
- private WorkerState state;
- protected ReplicationSourceWALReaderThread entryReader;
-
- // How long should we sleep for each retry
- protected final long sleepForRetries;
- // Maximum number of retries before taking bold actions
- protected final int maxRetriesMultiplier;
-
- // Use guava cache to set ttl for each key
- private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
- .expireAfterAccess(1, TimeUnit.DAYS).build(
- new CacheLoader<String, Boolean>() {
- @Override
- public Boolean load(String key) throws Exception {
- return false;
- }
- }
- );
-
- public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
- this.conf = conf;
- this.walGroupId = walGroupId;
- this.queue = queue;
- this.source = source;
- this.sleepForRetries =
- this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
- this.maxRetriesMultiplier =
- this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
- }
-
- @Override
- public void run() {
- setWorkerState(WorkerState.RUNNING);
- // Loop until we close down
- while (isActive()) {
- int sleepMultiplier = 1;
- // Sleep until replication is enabled again
- if (!source.isPeerEnabled()) {
- if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
-
- while (entryReader == null) {
- if (sleepForRetries("Replication WAL entry reader thread not initialized",
- sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
-
- try {
- WALEntryBatch entryBatch = entryReader.take();
- for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
- waitingUntilCanPush(entry);
- }
- shipEdits(entryBatch);
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while waiting for next replication entry batch", e);
- Thread.currentThread().interrupt();
- }
- }
- // If the worker exits run loop without finishing its task, mark it as stopped.
- if (state != WorkerState.FINISHED) {
- setWorkerState(WorkerState.STOPPED);
- }
- }
-
- /**
- * Do the shipping logic
- */
- protected void shipEdits(WALEntryBatch entryBatch) {
- List<Entry> entries = entryBatch.getWalEntries();
- long lastReadPosition = entryBatch.getLastWalPosition();
- currentPath = entryBatch.getLastWalPath();
- int sleepMultiplier = 0;
- if (entries.isEmpty()) {
- if (lastLoggedPosition != lastReadPosition) {
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
- updateLogPosition(lastReadPosition);
- // if there was nothing to ship and it's not an error
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
- source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
- walGroupId);
- }
- return;
- }
- int currentSize = (int) entryBatch.getHeapSize();
- while (isActive()) {
- try {
- try {
- source.tryThrottle(currentSize);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping for throttling control");
- Thread.currentThread().interrupt();
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- continue;
- }
-
- // create replicateContext here, so the entries can be GC'd upon return from this call
- // stack
- ReplicationEndpoint.ReplicateContext replicateContext =
- new ReplicationEndpoint.ReplicateContext();
- replicateContext.setEntries(entries).setSize(currentSize);
- replicateContext.setWalGroupId(walGroupId);
-
- long startTimeNs = System.nanoTime();
- // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
- boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
- long endTimeNs = System.nanoTime();
-
- if (!replicated) {
- continue;
- } else {
- sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
- }
-
- if (this.lastLoggedPosition != lastReadPosition) {
- //Clean up hfile references
- int size = entries.size();
- for (int i = 0; i < size; i++) {
- cleanUpHFileRefs(entries.get(i).getEdit());
- }
-
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
- //Log and clean up WAL logs
- updateLogPosition(lastReadPosition);
- }
-
- source.postShipEdits(entries, currentSize);
- // FIXME check relationship between wal group and overall
- source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
- entryBatch.getNbHFiles());
- source.getSourceMetrics().setAgeOfLastShippedOp(
- entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
- + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
- }
- break;
- } catch (Exception ex) {
- LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
- + org.apache.hadoop.util.StringUtils.stringifyException(ex));
- if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- }
-
- private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
- String key = entry.getKey();
- long seq = entry.getValue();
- boolean deleteKey = false;
- if (seq <= 0) {
- // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
- deleteKey = true;
- seq = -seq;
- }
-
- if (!canSkipWaitingSet.getUnchecked(key)) {
- try {
- source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
- } catch (IOException e) {
- LOG.error("waitUntilCanBePushed fail", e);
- throw new RuntimeException("waitUntilCanBePushed fail");
- } catch (InterruptedException e) {
- LOG.warn("waitUntilCanBePushed interrupted", e);
- Thread.currentThread().interrupt();
- }
- canSkipWaitingSet.put(key, true);
- }
- if (deleteKey) {
- canSkipWaitingSet.invalidate(key);
- }
- }
-
- private void cleanUpHFileRefs(WALEdit edit) throws IOException {
- String peerId = source.getPeerId();
- if (peerId.contains("-")) {
- // peerClusterZnode will be in the form peerId + "-" + rsZNode.
- // A peerId will not have "-" in its name, see HBASE-11394
- peerId = peerId.split("-")[0];
- }
- List<Cell> cells = edit.getCells();
- int totalCells = cells.size();
- for (int i = 0; i < totalCells; i++) {
- Cell cell = cells.get(i);
- if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- List<StoreDescriptor> stores = bld.getStoresList();
- int totalStores = stores.size();
- for (int j = 0; j < totalStores; j++) {
- List<String> storeFileList = stores.get(j).getStoreFileList();
- source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
- source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
- }
- }
- }
- }
-
- protected void updateLogPosition(long lastReadPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
- lastReadPosition, false, false);
- lastLoggedPosition = lastReadPosition;
- }
-
- private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
- try {
- MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
- source.getPeerId(), lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- throw new RuntimeException("updateReplicationPositions fail");
- }
- }
-
- public void startup(UncaughtExceptionHandler handler) {
- String name = Thread.currentThread().getName();
- Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
- + source.getPeerClusterZnode(), handler);
- }
-
- public PriorityBlockingQueue<Path> getLogQueue() {
- return this.queue;
- }
-
- public Path getCurrentPath() {
- return this.entryReader.getCurrentPath();
- }
-
- public long getCurrentPosition() {
- return this.lastLoggedPosition;
- }
-
- public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
- this.entryReader = entryReader;
- }
-
- public long getStartPosition() {
- return 0;
- }
-
- protected boolean isActive() {
- return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
- }
-
- public void setWorkerState(WorkerState state) {
- this.state = state;
- }
-
- public WorkerState getWorkerState() {
- return state;
- }
-
- public void stopWorker() {
- setWorkerState(WorkerState.STOPPED);
- }
-
- public boolean isFinished() {
- return state == WorkerState.FINISHED;
- }
-
- /**
- * Do the sleeping logic
- * @param msg Why we sleep
- * @param sleepMultiplier by how many times the default sleeping time is augmented
- * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code>
- */
- public boolean sleepForRetries(String msg, int sleepMultiplier) {
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
- }
- Thread.sleep(this.sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping between retries");
- Thread.currentThread().interrupt();
- }
- return sleepMultiplier < maxRetriesMultiplier;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
new file mode 100644
index 0000000..04b596c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -0,0 +1,502 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
+ * onto a queue.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceWALReader extends Thread {
+ private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReader.class);
+
+ private final PriorityBlockingQueue<Path> logQueue;
+ private final FileSystem fs;
+ private final Configuration conf;
+ private final WALEntryFilter filter;
+ private final ReplicationSource source;
+
+ protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
+ // max (heap) size of each batch - multiply by number of batches in queue to get total
+ private final long replicationBatchSizeCapacity;
+ // max count of each batch - multiply by number of batches in queue to get total
+ protected final int replicationBatchCountCapacity;
+ // position in the WAL to start reading at
+ private long currentPosition;
+ private final long sleepForRetries;
+ private final int maxRetriesMultiplier;
+ private final boolean eofAutoRecovery;
+
+ // Indicates whether this particular worker is running. The flag is written through the public
+ // setReaderRunning() and polled by the loops in run(). It is volatile so that a stop request
+ // issued from a thread other than the reader thread is guaranteed to become visible here.
+ private volatile boolean isReaderRunning = true;
+
+ private AtomicLong totalBufferUsed;
+ private long totalBufferQuota;
+
+ /**
+  * Creates a reader worker for a given WAL queue. The worker reads WAL entries off the queue,
+  * groups them into batches, and puts the batches on a bounded batch queue. All tunables
+  * (batch size and count, number of queued batches, total buffer quota, retry sleep, EOF
+  * auto-recovery) are read from the configuration here.
+  * @param fs the file system to use
+  * @param conf configuration to use
+  * @param logQueue the WAL queue to read off of
+  * @param startPosition position in the first WAL to start reading from
+  * @param filter the filter to use while reading
+  * @param source replication source
+  */
+ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
+     PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+     ReplicationSource source) {
+   // Collaborators handed in by the caller.
+   this.fs = fs;
+   this.conf = conf;
+   this.logQueue = logQueue;
+   this.filter = filter;
+   this.source = source;
+   this.currentPosition = startPosition;
+   // Per-batch limits.
+   this.replicationBatchSizeCapacity =
+       this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
+   this.replicationBatchCountCapacity =
+       this.conf.getInt("replication.source.nb.capacity", 25000);
+   // memory used will be batchSizeCapacity * (nb.batches + 1)
+   // the +1 is for the current thread reading before placing onto the queue
+   final int queuedBatches = conf.getInt("replication.source.nb.batches", 1);
+   this.entryBatchQueue = new LinkedBlockingQueue<>(queuedBatches);
+   // Buffer accounting: the counter comes from the source manager, the quota from configuration.
+   this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
+   this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+       HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+   // Retry behaviour.
+   this.sleepForRetries =
+       this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
+   this.maxRetriesMultiplier =
+       this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+   this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+   LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
+       + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+       + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+       + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+       + ", replicationBatchQueueCapacity=" + queuedBatches);
+ }
+
+ /**
+ * Reader loop. Opens a WALEntryStream at <code>currentPosition</code> and keeps reading batches
+ * from it, putting each non-empty batch on <code>entryBatchQueue</code>, for as long as
+ * isReaderRunning() returns true. A stream-related exception closes the stream (via
+ * try-with-resources), backs off, and the outer loop opens a new stream at the last recorded
+ * position. An interrupt restores the interrupt flag, which makes isReaderRunning() return
+ * false and ends both loops.
+ */
+ @Override
+ public void run() {
+ int sleepMultiplier = 1;
+ while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+ try (WALEntryStream entryStream =
+ new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
+ while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ // checkQuota() has already slept when it returns false, so just re-test the loop condition
+ if (!checkQuota()) {
+ continue;
+ }
+ WALEntryBatch batch = readWALEntries(entryStream);
+ // A batch is worth shipping if it carries entries or serial-replication sequence ids
+ if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Read %s WAL entries eligible for replication",
+ batch.getNbEntries()));
+ }
+ // put() blocks while the bounded queue is full
+ entryBatchQueue.put(batch);
+ sleepMultiplier = 1;
+ } else { // got no entries and didn't advance position in WAL
+ handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
+ }
+ // Remember where we are so that a re-opened stream resumes from here
+ currentPosition = entryStream.getPosition();
+ entryStream.reset(); // reuse stream
+ }
+ } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+ // Back off a little more on each failure; once the multiplier is maxed out, log at error
+ // level and give the EOF auto-recovery a chance
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ LOG.debug("Failed to read stream of replication entries: " + e);
+ sleepMultiplier++;
+ } else {
+ LOG.error("Failed to read stream of replication entries", e);
+ handleEofException(e);
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+  * Reads entries from the stream into a single batch. Reading stops when the stream has no
+  * more entries, when a REGION_CLOSE marker of a serially replicated region is seen, or when
+  * the batch or the global buffer has reached its limit.
+  * @param entryStream stream to pull WAL entries from
+  * @return the batch that was built, or null if the stream had no entry at all
+  * @throws IOException if inspecting an entry for serial replication fails
+  */
+ private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
+   WALEntryBatch batch = null;
+   while (entryStream.hasNext()) {
+     // Create the batch lazily so that an exhausted stream yields null.
+     if (batch == null) {
+       batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+     }
+     Entry raw = entryStream.next();
+     if (updateSerialReplPos(batch, raw)) {
+       // REGION_CLOSE: record how far we got and hand the batch over as is.
+       batch.lastWalPosition = entryStream.getPosition();
+       break;
+     }
+     Entry kept = filterEntry(raw);
+     if (kept == null) {
+       continue;
+     }
+     WALEdit edit = kept.getEdit();
+     if (edit == null || edit.isEmpty()) {
+       continue;
+     }
+     long entrySize = getEntrySize(kept);
+     batch.addEntry(kept);
+     updateBatchStats(batch, kept, entryStream.getPosition(), entrySize);
+     boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+     boolean batchTooBig = batch.getHeapSize() >= replicationBatchSizeCapacity;
+     boolean batchTooLong = batch.getNbEntries() >= replicationBatchCountCapacity;
+     // Stop if too many entries or too big
+     if (totalBufferTooLarge || batchTooBig || batchTooLong) {
+       break;
+     }
+   }
+   return batch;
+ }
+
+ /**
+ * Called by run() when a read produced nothing to ship. This implementation only logs and
+ * sleeps for <code>sleepForRetries</code> milliseconds; both parameters are unused here.
+ * NOTE(review): the method is protected, presumably so subclasses can react differently; confirm.
+ * @param batch the batch that was read; may be null
+ * @param currentPath path reported by the entry stream as current
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
+ throws InterruptedException {
+ LOG.trace("Didn't read any new entries from WAL");
+ Thread.sleep(sleepForRetries);
+ }
+
+ /**
+  * Tries to get past an EOF caused by a zero-length log at the head of the queue. The log is
+  * dumped only if all of the following hold: the failure is an EOF, there are other logs in
+  * the queue (highly likely we've closed the current log), and auto-recovery is enabled. The
+  * caller invokes this only after the maximum number of retries has been reached.
+  * <p>
+  * The EOF may arrive either as the exception itself or wrapped as its cause, so both are
+  * checked.
+  * @param e the stream-related exception caught by the reader loop
+  */
+ private void handleEofException(Exception e) {
+   boolean isEof = e instanceof EOFException || e.getCause() instanceof EOFException;
+   if (!isEof || logQueue.size() <= 1 || !this.eofAutoRecovery) {
+     return;
+   }
+   try {
+     if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+       LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+       logQueue.remove();
+       // The next log has to be read from its beginning.
+       currentPosition = 0;
+     }
+   } catch (IOException ioe) {
+     // Best effort: keep the log in the queue, but record why the length could not be read.
+     LOG.warn("Couldn't get file length information about log " + logQueue.peek(), ioe);
+   }
+ }
+
+ /**
+  * Returns the WAL path this reader is currently working on. If a batch is waiting in the
+  * batch queue, that batch's WAL path is returned; otherwise the head of the log queue is
+  * returned.
+  * @return the path as described above; null if both queues are empty
+  */
+ public Path getCurrentPath() {
+   final WALEntryBatch pendingBatch = entryBatchQueue.peek();
+   // No pending batch means we must be currently reading from the head of the log queue.
+   return pendingBatch != null ? pendingBatch.lastWalPath : logQueue.peek();
+ }
+
+ /**
+  * Checks the global buffer usage against the total quota, trying not to go over it. If the
+  * quota is already exceeded, this sleeps for <code>sleepForRetries</code> before returning.
+  * @return false if we've already exceeded the global quota, true otherwise
+  */
+ private boolean checkQuota() {
+   final boolean withinQuota = totalBufferUsed.get() <= totalBufferQuota;
+   if (!withinQuota) {
+     Threads.sleep(sleepForRetries);
+   }
+   return withinQuota;
+ }
+
+ /**
+  * Runs the entry through the configured filter and bumps the "edits filtered" metric when a
+  * non-null entry is filtered out completely.
+  * @param entry the entry to filter
+  * @return whatever the filter returned; null if the entry was dropped
+  */
+ private Entry filterEntry(Entry entry) {
+   final Entry outcome = filter.filter(entry);
+   final boolean dropped = entry != null && outcome == null;
+   if (dropped) {
+     source.getSourceMetrics().incrLogEditsFiltered();
+   }
+   return outcome;
+ }
+
+ /**
+  * Records the entry's sequence id in the batch if the entry has serial replication scope, and
+  * detects REGION_CLOSE markers.
+  * <p>
+  * In serial replication, if a region is moved to another RS and then moved back, we may read
+  * logs crossing two sections. We break at REGION_CLOSE and push the first section first, in
+  * case the middle section belonging to the other RS is missing. The sequence id of a
+  * REGION_CLOSE marker is stored negated so the consumer can tell it apart.
+  * @param batch batch whose per-region sequence ids are updated
+  * @param entry entry that was just read
+  * @return true if we should stop reading because we're at REGION_CLOSE
+  * @throws IOException if the region event descriptor cannot be parsed
+  */
+ private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
+   if (!entry.hasSerialReplicationScope()) {
+     return false;
+   }
+   final String regionKey = Bytes.toString(entry.getKey().getEncodedRegionName());
+   batch.setLastPosition(regionKey, entry.getKey().getSequenceId());
+   if (entry.getEdit().getCells().isEmpty()) {
+     return false;
+   }
+   // Only the first cell is inspected for a region event.
+   final WALProtos.RegionEventDescriptor maybeEvent =
+       WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+   if (maybeEvent == null) {
+     return false;
+   }
+   if (maybeEvent.getEventType() != WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+     return false;
+   }
+   // Mark it negative: the next logs we read for this region must belong to a new section.
+   batch.setLastPosition(regionKey, -entry.getKey().getSequenceId());
+   return true;
+ }
+
+ /**
+ * Retrieves the next batch of WAL entries from the queue, blocking until a batch becomes
+ * available. There is no timeout: the call returns only when a batch is available or the
+ * calling thread is interrupted.
+ * @return A batch of entries, along with the position in the log after reading the batch
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public WALEntryBatch take() throws InterruptedException {
+ return entryBatchQueue.take();
+ }
+
+ /**
+  * Computes the size charged for an entry: the heap size of its edit plus the total size of
+  * the store files referenced by bulk load cells in that edit.
+  * @param entry entry to size; its edit must not be null
+  * @return the size charged for the entry
+  */
+ private long getEntrySize(Entry entry) {
+   final WALEdit edit = entry.getEdit();
+   final long editHeapSize = edit.heapSize();
+   final long storeFilesSize = calculateTotalSizeOfStoreFiles(edit);
+   return editHeapSize + storeFilesSize;
+ }
+
+ /**
+  * Folds one entry into the batch statistics. Heap size, row key count and HFile count are
+  * updated only for a non-empty edit; the last WAL position is always updated.
+  * @param batch batch to update
+  * @param entry entry that was added to the batch
+  * @param entryPosition WAL position after reading the entry
+  * @param entrySize size charged for the entry
+  */
+ private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+   final WALEdit edit = entry.getEdit();
+   final boolean hasCells = edit != null && !edit.isEmpty();
+   if (hasCells) {
+     batch.incrementHeapSize(entrySize);
+     final Pair<Integer, Integer> counts = countDistinctRowKeysAndHFiles(edit);
+     batch.incrementNbRowKeys(counts.getFirst());
+     batch.incrementNbHFiles(counts.getSecond());
+   }
+   batch.lastWalPosition = entryPosition;
+ }
+
+ /**
+  * Counts the number of different row keys in the given edit (an edit can span several rows
+  * because of mini-batching) and the number of HFiles referenced by its bulk load cells.
+  * Row keys are counted by comparing each cell with the one before it. We assume that there's
+  * at least one Cell in the WALEdit.
+  * <p>
+  * A bulk load cell that cannot be deserialized is logged together with its exception and
+  * skipped, so its HFiles are not included in the count.
+  * @param edit edit to count row keys from; must contain at least one cell
+  * @return number of different row keys and HFiles
+  */
+ private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
+   List<Cell> cells = edit.getCells();
+   int distinctRowKeys = 1;
+   int totalHFileEntries = 0;
+   Cell lastCell = cells.get(0);
+   int totalCells = edit.size();
+   for (int i = 0; i < totalCells; i++) {
+     Cell cell = cells.get(i);
+     // Count HFiles to be replicated
+     if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+       try {
+         BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+         for (StoreDescriptor store : bld.getStoresList()) {
+           totalHFileEntries += store.getStoreFileList().size();
+         }
+       } catch (IOException e) {
+         // Pass the exception to the logger so the failure can be diagnosed.
+         LOG.error("Failed to deserialize bulk load entry from wal edit. "
+             + "Then its hfiles count will not be added into metric.", e);
+       }
+     }
+     if (!CellUtil.matchingRows(cell, lastCell)) {
+       distinctRowKeys++;
+     }
+     lastCell = cell;
+   }
+   return new Pair<>(distinctRowKeys, totalHFileEntries);
+ }
+
+ /**
+  * Calculates the total size of all the store files referenced by bulk load cells in the edit.
+  * The sum is kept in a long: store file sizes are byte counts, and adding them into an int
+  * with <code>+=</code> silently narrows the result, which goes wrong as soon as the total
+  * exceeds <code>Integer.MAX_VALUE</code>.
+  * <p>
+  * A bulk load cell that cannot be deserialized is logged and skipped, so its size is not
+  * included in the result.
+  * @param edit edit whose bulk load cells are inspected
+  * @return the total size of the store files
+  */
+ private long calculateTotalSizeOfStoreFiles(WALEdit edit) {
+   List<Cell> cells = edit.getCells();
+   long totalStoreFilesSize = 0;
+   int totalCells = edit.size();
+   for (int i = 0; i < totalCells; i++) {
+     Cell cell = cells.get(i);
+     if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+       continue;
+     }
+     try {
+       BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+       for (StoreDescriptor store : bld.getStoresList()) {
+         totalStoreFilesSize += store.getStoreFileSizeBytes();
+       }
+     } catch (IOException e) {
+       LOG.error("Failed to deserialize bulk load entry from wal edit. "
+           + "Size of HFiles part of cell will not be considered in replication "
+           + "request size calculation.",
+         e);
+     }
+   }
+   return totalStoreFilesSize;
+ }
+
+ /**
+  * Adds the given size to the shared buffer usage counter and reports whether the total quota
+  * has been reached.
+  * @param size delta size for grown buffer
+  * @return true if we should clear buffer and push all
+  */
+ private boolean acquireBufferQuota(long size) {
+   final long usedAfterGrowth = totalBufferUsed.addAndGet(size);
+   return usedAfterGrowth >= totalBufferQuota;
+ }
+
+ /**
+  * @return whether the reader thread is running, i.e. the running flag is set and the thread
+  *         has not been interrupted
+  */
+ public boolean isReaderRunning() {
+   if (!isReaderRunning) {
+     return false;
+   }
+   return !isInterrupted();
+ }
+
+ /**
+ * Sets the running flag. Once it is false, isReaderRunning() returns false and the loops in
+ * run() end at their next condition check.
+ * @param readerRunning the readerRunning to set
+ */
+ public void setReaderRunning(boolean readerRunning) {
+ this.isReaderRunning = readerRunning;
+ }
+
+ /**
+  * Holds a batch of WAL entries to replicate, along with some statistics: the WAL path and
+  * position the batch ends at, the number of distinct row keys and HFiles, the accumulated
+  * heap size, and the last sequence id per region for serial replication.
+  */
+ static class WALEntryBatch {
+   // entries collected for this batch; the reference never changes after construction
+   private final List<Entry> walEntries;
+   // last WAL that was read
+   private final Path lastWalPath;
+   // position in WAL of last entry in this batch
+   private long lastWalPosition = 0;
+   // number of distinct row keys in this batch
+   private int nbRowKeys = 0;
+   // number of HFiles
+   private int nbHFiles = 0;
+   // heap size of data we need to replicate
+   private long heapSize = 0;
+   // save the last sequenceid for each region if the table has serial-replication scope;
+   // the reader stores the id negated when the entry is a REGION_CLOSE marker
+   private final Map<String, Long> lastSeqIds = new HashMap<>();
+
+   /**
+    * Creates an empty batch.
+    * @param maxNbEntries initial capacity of the entry list; it is a sizing hint, not a limit
+    *          enforced by this class
+    * @param lastWalPath Path of the WAL the last entry in this batch was read from
+    */
+   WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+     this.walEntries = new ArrayList<>(maxNbEntries);
+     this.lastWalPath = lastWalPath;
+   }
+
+   /**
+    * Appends an entry to this batch. Statistics are not updated here.
+    * @param entry the entry to add
+    */
+   public void addEntry(Entry entry) {
+     walEntries.add(entry);
+   }
+
+   /**
+    * @return the WAL Entries.
+    */
+   public List<Entry> getWalEntries() {
+     return walEntries;
+   }
+
+   /**
+    * @return the path of the last WAL that was read.
+    */
+   public Path getLastWalPath() {
+     return lastWalPath;
+   }
+
+   /**
+    * @return the position in the last WAL that was read.
+    */
+   public long getLastWalPosition() {
+     return lastWalPosition;
+   }
+
+   /**
+    * @return the number of entries in this batch
+    */
+   public int getNbEntries() {
+     return walEntries.size();
+   }
+
+   /**
+    * @return the number of distinct row keys in this batch
+    */
+   public int getNbRowKeys() {
+     return nbRowKeys;
+   }
+
+   /**
+    * @return the number of HFiles in this batch
+    */
+   public int getNbHFiles() {
+     return nbHFiles;
+   }
+
+   /**
+    * @return total number of operations in this batch
+    */
+   public int getNbOperations() {
+     return getNbRowKeys() + getNbHFiles();
+   }
+
+   /**
+    * @return the heap size of this batch
+    */
+   public long getHeapSize() {
+     return heapSize;
+   }
+
+   /**
+    * @return the last sequenceid for each region if the table has serial-replication scope
+    */
+   public Map<String, Long> getLastSeqIds() {
+     return lastSeqIds;
+   }
+
+   private void incrementNbRowKeys(int increment) {
+     nbRowKeys += increment;
+   }
+
+   private void incrementNbHFiles(int increment) {
+     nbHFiles += increment;
+   }
+
+   private void incrementHeapSize(long increment) {
+     heapSize += increment;
+   }
+
+   private void setLastPosition(String region, Long sequenceId) {
+     getLastSeqIds().put(region, sequenceId);
+   }
+ }
+}