You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/06/03 16:49:07 UTC
[1/2] hbase git commit: HBASE-15995 Separate replication WAL reading from shipping
Repository: hbase
Updated Branches:
refs/heads/branch-1 b66a478e7 -> 3cf443326
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
new file mode 100644
index 0000000..c4d552c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
+ * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
+ * dequeues it and starts reading from the next.
+ * <p>
+ * The WAL being read is the head of the queue; it is only removed once another WAL has been
+ * enqueued behind it and no further entries can be read from it. {@link #iterator()} returns
+ * {@code this}, so the stream can be consumed directly by a for-each loop. This class performs no
+ * synchronization of its own.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+  private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
+
+  private Reader reader;
+  private Path currentPath;
+  // cache of next entry for hasNext()
+  private Entry currentEntry;
+  // reader position after the most recently read entry (including one cached by hasNext())
+  private long currentPosition = 0;
+  private final PriorityBlockingQueue<Path> logQueue;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final MetricsSource metrics;
+
+  /**
+   * Create an entry stream over the given queue
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      MetricsSource metrics)
+      throws IOException {
+    this(logQueue, fs, conf, 0, metrics);
+  }
+
+  /**
+   * Create an entry stream over the given queue at the given start position
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param startPosition the position in the first WAL to start reading at
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      long startPosition, MetricsSource metrics) throws IOException {
+    this.logQueue = logQueue;
+    this.fs = fs;
+    this.conf = conf;
+    this.currentPosition = startPosition;
+    this.metrics = metrics;
+  }
+
+  /**
+   * Reads ahead (and caches) the next entry if one is not already cached. This may advance
+   * {@link #getPosition()} and may move on to the next WAL in the queue.
+   * @return true if there is another WAL {@link Entry}
+   * @throws WALEntryStreamRuntimeException if there was an Exception while reading
+   */
+  @Override
+  public boolean hasNext() {
+    if (currentEntry == null) {
+      try {
+        tryAdvanceEntry();
+      } catch (Exception e) {
+        throw new WALEntryStreamRuntimeException(e);
+      }
+    }
+    return currentEntry != null;
+  }
+
+  /**
+   * @return the next WAL entry in this stream
+   * @throws WALEntryStreamRuntimeException if there was an IOException
+   * @throws NoSuchElementException if no more entries in the stream.
+   */
+  @Override
+  public Entry next() {
+    if (!hasNext()) throw new NoSuchElementException();
+    Entry save = currentEntry;
+    currentEntry = null; // gets reloaded by hasNext()
+    return save;
+  }
+
+  /**
+   * Not supported.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Closes the underlying WAL reader, if one is open.
+   */
+  @Override
+  public void close() throws IOException {
+    closeReader();
+  }
+
+  /**
+   * @return this stream itself; it is not a fresh iterator.
+   */
+  @Override
+  public Iterator<Entry> iterator() {
+    return this;
+  }
+
+  /**
+   * @return the reader position after the most recently read entry. Note that {@link #hasNext()}
+   *         reads ahead and caches the following entry, so after a call to hasNext() this is past
+   *         that cached entry rather than past the last entry returned by {@link #next()}.
+   */
+  public long getPosition() {
+    return currentPosition;
+  }
+
+  /**
+   * @return the {@link Path} of the current WAL
+   */
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  private String getCurrentPathStat() {
+    StringBuilder sb = new StringBuilder();
+    if (currentPath != null) {
+      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+          .append(currentPosition).append("\n");
+    } else {
+      sb.append("no replication ongoing, waiting for new log");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
+   * false). Re-opens the current reader at the current position so newly written entries become
+   * visible.
+   * @throws IOException
+   */
+  public void reset() throws IOException {
+    if (reader != null && currentPath != null) {
+      resetReader();
+    }
+  }
+
+  private void setPosition(long position) {
+    currentPosition = position;
+  }
+
+  private void setCurrentPath(Path path) {
+    this.currentPath = path;
+  }
+
+  private void tryAdvanceEntry() throws IOException {
+    if (checkReader()) {
+      readNextEntryAndSetPosition();
+      if (currentEntry == null) { // no more entries in this log file - see if log was rolled
+        if (logQueue.size() > 1) { // log was rolled
+          // Before dequeueing, we should always get one more attempt at reading.
+          // This is in case more entries came in after we opened the reader,
+          // and a new log was enqueued while we were reading. See HBASE-6758
+          resetReader();
+          readNextEntryAndSetPosition();
+          if (currentEntry == null) {
+            if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+              dequeueCurrentLog();
+              if (openNextLog()) {
+                readNextEntryAndSetPosition();
+              }
+            }
+          }
+        } // no other logs, we've simply hit the end of the current open log. Do nothing
+      }
+    }
+    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
+  }
+
+  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file.
+  // Returns false, after rewinding the reader to position 0, when a cleanly closed WAL is longer
+  // than what we parsed plus its trailer; returns true otherwise.
+  private boolean checkAllBytesParsed() throws IOException {
+    // -1 means the wal wasn't closed cleanly.
+    final long trailerSize = currentTrailerSize();
+    FileStatus stat = null;
+    try {
+      stat = fs.getFileStatus(this.currentPath);
+    } catch (IOException exception) {
+      LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
+          + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat(),
+          exception);
+      metrics.incrUnknownFileLengthForClosedWAL();
+    }
+    if (stat != null) {
+      if (trailerSize < 0) {
+        if (currentPosition < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPosition;
+          LOG.info("Reached the end of WAL file '" + currentPath
+              + "'. It was not closed cleanly, so we did not parse " + skippedBytes
+              + " bytes of data.");
+          metrics.incrUncleanlyClosedWALs();
+          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+        }
+      } else if (currentPosition + trailerSize < stat.getLen()) {
+        LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
+            + ", which is too far away from reported file length " + stat.getLen()
+            + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
+        // Capture how far we got before rewinding: these are the bytes that will be read again.
+        // (Reading currentPosition after setPosition(0) would always report 0.)
+        final long bytesToReread = currentPosition;
+        setPosition(0);
+        resetReader();
+        metrics.incrRestartedWALReading();
+        metrics.incrRepeatedFileBytes(bytesToReread);
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+          + (stat == null ? "N/A" : stat.getLen()));
+    }
+    metrics.incrCompletedWAL();
+    return true;
+  }
+
+  // Close the current reader, drop the head of the queue and start the next WAL at position 0.
+  private void dequeueCurrentLog() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reached the end of log " + currentPath);
+    }
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  // Read one entry into currentEntry (null at end of the readable data) and record the reader
+  // position after it.
+  private void readNextEntryAndSetPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPosition);
+    }
+    currentEntry = readEntry; // could be null
+    setPosition(readerPos);
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // Best-effort close used on error paths, where the failure being handled matters more than a
+  // failure to close. Always leaves reader == null.
+  private void closeReaderQuietly(Path path) {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException | RuntimeException e) {
+        LOG.warn("Failed to close WAL reader for " + path, e);
+      } finally {
+        reader = null;
+      }
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue
+  private boolean openNextLog() throws IOException {
+    Path nextPath = logQueue.peek();
+    if (nextPath != null) {
+      openReader(nextPath);
+      if (reader != null) return true;
+    }
+    return false;
+  }
+
+  // Returns the location of path under the old-WALs directory if a file exists there, otherwise
+  // returns path unchanged.
+  private Path getArchivedLog(Path path) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    } else {
+      LOG.error("Couldn't locate log: " + path);
+      return path;
+    }
+  }
+
+  private void openReader(Path path) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !getCurrentPath().equals(path)) {
+        closeReader();
+        reader = WALFactory.createReader(fs, path, conf);
+        seek();
+        setCurrentPath(path);
+      } else {
+        resetReader();
+      }
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(path);
+      if (!path.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (LeaseNotRecoveredException lnre) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      // Recover the lease on the WAL we were trying to open. currentPath is only updated after a
+      // successful open, so here it may still name the previous WAL (or be null).
+      LOG.warn("Try to recover the WAL lease " + path, lnre);
+      recoverLease(conf, path);
+      closeReaderQuietly(path);
+    } catch (NullPointerException npe) {
+      // Workaround for race condition in HDFS-4380
+      // which throws a NPE if we open a file before any data node has the most recent block
+      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+      LOG.warn("Got NPE opening reader, will retry.");
+      // Don't leak a reader that was already created when the NPE hit (e.g. thrown from seek()).
+      closeReaderQuietly(path);
+    }
+  }
+
+  // For HBASE-15019
+  private void recoverLease(final Configuration conf, final Path path) {
+    try {
+      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          LOG.debug("recover WAL lease: " + path);
+          return true;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("unable to recover lease for WAL: " + path, e);
+    }
+  }
+
+  // Reset the open reader and seek back to currentPosition; falls back to the archived copy of
+  // the WAL if the original file is gone.
+  private void resetReader() throws IOException {
+    try {
+      reader.reset();
+      seek();
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(currentPath);
+      if (!currentPath.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    }
+  }
+
+  private void seek() throws IOException {
+    if (currentPosition != 0) {
+      reader.seek(currentPosition);
+    }
+  }
+
+  // Trailer size reported by a ProtobufLogReader; -1 for any other reader type.
+  private long currentTrailerSize() {
+    long size = -1L;
+    if (reader instanceof ProtobufLogReader) {
+      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
+      size = pblr.trailerSize();
+    }
+    return size;
+  }
+
+  /**
+   * Unchecked wrapper for exceptions raised while reading inside {@link Iterator} methods.
+   */
+  @InterfaceAudience.Private
+  public static class WALEntryStreamRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = -6298201811259982568L;
+
+    public WALEntryStreamRuntimeException(Exception e) {
+      super(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
deleted file mode 100644
index 40db3eb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-/**
- * Tests ReplicationWALReaderManager against a WAL on a mini DFS cluster, once for every
- * combination of row count, KeyValues per edit and WAL compression returned by parameters().
- */
-@Category({LargeTests.class})
-@RunWith(Parameterized.class)
-public class TestReplicationWALReaderManager {
-
- private static HBaseTestingUtility TEST_UTIL;
- private static Configuration conf;
- private static FileSystem fs;
- private static MiniDFSCluster cluster;
- private static final TableName tableName = TableName.valueOf("tablename");
- private static final byte [] family = Bytes.toBytes("column");
- private static final byte [] qualifier = Bytes.toBytes("qualifier");
- private static final HRegionInfo info = new HRegionInfo(tableName,
- HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
- private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-
- private WAL log;
- private ReplicationWALReaderManager logManager;
- private PathWatcher pathWatcher;
- private int nbRows;
- private int walEditKVs;
- @Rule public TestName tn = new TestName();
- private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
- @Parameters
- public static Collection<Object[]> parameters() {
- // Try out different combinations of row count and KeyValue count
- int[] NB_ROWS = { 1500, 60000 };
- int[] NB_KVS = { 1, 100 };
- // whether compression is used
- Boolean[] BOOL_VALS = { false, true };
- List<Object[]> parameters = new ArrayList<Object[]>();
- for (int nbRows : NB_ROWS) {
- for (int walEditKVs : NB_KVS) {
- for (boolean b : BOOL_VALS) {
- Object[] arr = new Object[3];
- arr[0] = nbRows;
- arr[1] = walEditKVs;
- arr[2] = b;
- parameters.add(arr);
- }
- }
- }
- return parameters;
- }
-
- // Sets the compression flag on the shared configuration; the WAL itself is created in setUp().
- public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
- this.nbRows = nbRows;
- this.walEditKVs = walEditKVs;
- TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
- enableCompression);
- mvcc.advanceTo(1);
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL = new HBaseTestingUtility();
- conf = TEST_UTIL.getConfiguration();
- TEST_UTIL.startMiniDFSCluster(3);
-
- cluster = TEST_UTIL.getDFSCluster();
- fs = cluster.getFileSystem();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Before
- public void setUp() throws Exception {
- logManager = new ReplicationWALReaderManager(fs, conf);
- List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
- pathWatcher = new PathWatcher();
- listeners.add(pathWatcher);
- final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
- log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
- }
-
- @After
- public void tearDown() throws Exception {
- log.close();
- }
-
- // Reads single edits back across a log roll, then reads nbRows edits (walEditKVs KeyValues
- // each) from the next log.
- @Test
- public void test() throws Exception {
- // Grab the path that was generated when the log rolled as part of its creation
- Path path = pathWatcher.currentPath;
-
- assertEquals(0, logManager.getPosition());
-
- appendToLog();
-
- // There's one edit in the log, read it. Reading past it needs to return nulls
- assertNotNull(logManager.openReader(path));
- logManager.seek();
- WAL.Entry entry = logManager.readNextAndSetPosition();
- assertNotNull(entry);
- entry = logManager.readNextAndSetPosition();
- assertNull(entry);
- logManager.closeReader();
- long oldPos = logManager.getPosition();
-
- appendToLog();
-
- // Read the newly added entry, make sure we made progress
- assertNotNull(logManager.openReader(path));
- logManager.seek();
- entry = logManager.readNextAndSetPosition();
- assertNotEquals(oldPos, logManager.getPosition());
- assertNotNull(entry);
- logManager.closeReader();
- oldPos = logManager.getPosition();
-
- log.rollWriter();
-
- // We rolled but we still should see the end of the first log and not get data
- assertNotNull(logManager.openReader(path));
- logManager.seek();
- entry = logManager.readNextAndSetPosition();
- assertEquals(oldPos, logManager.getPosition());
- assertNull(entry);
- logManager.finishCurrentFile();
-
- path = pathWatcher.currentPath;
-
- for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
- log.rollWriter();
- logManager.openReader(path);
- logManager.seek();
- for (int i = 0; i < nbRows; i++) {
- WAL.Entry e = logManager.readNextAndSetPosition();
- if (e == null) {
- fail("Should have enough entries");
- }
- }
- }
-
- private void appendToLog() throws IOException {
- appendToLogPlus(1);
- }
-
- // Appends one WALEdit containing count KeyValues and syncs it.
- private void appendToLogPlus(int count) throws IOException {
- final long txid = log.append(htd, info,
- new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
- getWALEdits(count), true);
- log.sync(txid);
- }
-
- private WALEdit getWALEdits(int count) {
- WALEdit edit = new WALEdit();
- for (int i = 0; i < count; i++) {
- edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
- System.currentTimeMillis(), qualifier));
- }
- return edit;
- }
-
- // Records the path of the newest WAL on each log roll.
- class PathWatcher extends WALActionsListener.Base {
-
- Path currentPath;
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- currentPath = newPath;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
new file mode 100644
index 0000000..005e2a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestWALEntryStream {
+
+ private static HBaseTestingUtility TEST_UTIL;
+ private static Configuration conf;
+ private static FileSystem fs;
+ private static MiniDFSCluster cluster;
+ private static final TableName tableName = TableName.valueOf("tablename");
+ private static final byte[] family = Bytes.toBytes("column");
+ private static final byte[] qualifier = Bytes.toBytes("qualifier");
+ private static final HRegionInfo info =
+ new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
+ private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+ private static NavigableMap<byte[], Integer> scopes;
+
+ private WAL log;
+ PriorityBlockingQueue<Path> walQueue;
+ private PathWatcher pathWatcher;
+
+ @Rule
+ public TestName tn = new TestName();
+ private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+  /**
+   * Builds the (scope 0) replication scope map for the test table's families and starts a
+   * three-datanode mini DFS cluster shared by every test in this class.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] familyName : htd.getFamiliesKeys()) {
+      scopes.put(familyName, 0);
+    }
+
+    TEST_UTIL = new HBaseTestingUtility();
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniDFSCluster(3);
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+  }
+
+  /**
+   * Shuts down whatever mini cluster components the shared testing utility started (here, the
+   * mini DFS cluster).
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    final HBaseTestingUtility util = TEST_UTIL;
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates an empty WAL queue and a new WAL (named after the running test) with a
+   * {@code PathWatcher} registered as its only listener.
+   */
+  @Before
+  public void setUp() throws Exception {
+    walQueue = new PriorityBlockingQueue<>();
+    final List<WALActionsListener> walListeners = new ArrayList<WALActionsListener>();
+    pathWatcher = new PathWatcher();
+    walListeners.add(pathWatcher);
+    final WALFactory walFactory = new WALFactory(conf, walListeners, tn.getMethodName());
+    final byte[] encodedRegionName = info.getEncodedNameAsBytes();
+    log = walFactory.getWAL(encodedRegionName, info.getTable().getNamespace());
+  }
+
+  /**
+   * Closes the WAL created by {@link #setUp()}, if setUp() got far enough to create one.
+   */
+  @After
+  public void tearDown() throws Exception {
+    // If setUp() failed before assigning log, an NPE here would mask the original failure.
+    if (log != null) {
+      log.close();
+    }
+  }
+
+  /**
+   * Try out different combinations of row count, KeyValues per edit and WAL compression. Each
+   * combination writes to a freshly created WAL, rolls it, and checks that the stream returns
+   * exactly the number of entries written.
+   */
+  @Test
+  public void testDifferentCounts() throws Exception {
+    int[] rowCounts = { 1500, 60000 };
+    int[] kvsPerEdit = { 1, 100 };
+    // whether compression is used
+    boolean[] compressionSettings = { false, true };
+    // conf is static and shared by every test in this class, so restore the flag when done.
+    final boolean originalCompression =
+        conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    try {
+      for (int nbRows : rowCounts) {
+        for (int walEditKVs : kvsPerEdit) {
+          for (boolean isCompressionEnabled : compressionSettings) {
+            conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
+            // setUp() creates the WAL. Recreate it after changing the flag so this combination
+            // is written by a WAL created under the intended setting, not the previous one.
+            log.close();
+            setUp();
+            mvcc.advanceTo(1);
+
+            for (int i = 0; i < nbRows; i++) {
+              appendToLogPlus(walEditKVs);
+            }
+
+            log.rollWriter();
+
+            try (WALEntryStream entryStream =
+                new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+              int i = 0;
+              for (WAL.Entry e : entryStream) {
+                assertNotNull(e);
+                i++;
+              }
+              assertEquals(nbRows, i);
+
+              // should've read all entries
+              assertFalse(entryStream.hasNext());
+            }
+          }
+        }
+      }
+    } finally {
+      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, originalCompression);
+    }
+  }
+
+  /**
+   * Tests basic reading of log appends: reading one entry, resuming from a saved position, and
+   * continuing from the end of a rolled log into the new one.
+   */
+  @Test
+  public void testAppendsWithRolls() throws Exception {
+    appendToLog();
+
+    long oldPos;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      // There's one edit in the log, read it. Reading past it needs to throw exception
+      assertTrue(entryStream.hasNext());
+      WAL.Entry entry = entryStream.next();
+      assertNotNull(entry);
+      assertFalse(entryStream.hasNext());
+      try {
+        entryStream.next();
+        fail("Expected NoSuchElementException when reading past the last entry");
+      } catch (NoSuchElementException e) {
+        // expected
+      }
+      oldPos = entryStream.getPosition();
+    }
+
+    appendToLog();
+
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      // Read the newly added entry, make sure we made progress
+      WAL.Entry entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+      oldPos = entryStream.getPosition();
+    }
+
+    // We rolled but we still should see the end of the first log and get that item
+    appendToLog();
+    log.rollWriter();
+    appendToLog();
+
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      WAL.Entry entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+
+      // next item should come from the new log
+      entry = entryStream.next();
+      assertNotEquals(oldPos, entryStream.getPosition());
+      assertNotNull(entry);
+
+      // no more entries to read
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  /**
+   * Tests that if, after a stream is opened, more entries come in and then the log is rolled, the
+   * stream does not dequeue the current log before it has read everything left in it.
+   */
+  @Test
+  public void testLogrollWhileStreaming() throws Exception {
+    appendToLog("1");
+    appendToLog("2");
+    try (WALEntryStream stream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      assertEquals("1", getRow(stream.next()));
+
+      // With the reader open: "3" lands in the first log, the log rolls, and "4" lands in the
+      // rolled log.
+      appendToLog("3");
+      log.rollWriter();
+      appendToLog("4");
+
+      assertEquals("2", getRow(stream.next()));
+      // "3" is still unread in the first log, so that log must not have been dequeued yet.
+      assertEquals(2, walQueue.size());
+      // A broken implementation would skip "3" here and return "4".
+      assertEquals("3", getRow(stream.next()));
+      assertEquals("4", getRow(stream.next()));
+      // Only now has the stream dequeued the first log and moved on to the rolled one.
+      assertEquals(1, walQueue.size());
+      assertFalse(stream.hasNext());
+    }
+  }
+
+  /**
+   * Tests that entries written while a stream is open are not lost: they are not seen at first,
+   * but become readable after {@link WALEntryStream#reset()}.
+   */
+  @Test
+  public void testNewEntriesWhileStreaming() throws Exception {
+    appendToLog("1");
+    try (WALEntryStream stream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      // consume the only entry; the stream has now reached its end
+      stream.next();
+
+      // new entries arrive mid-stream...
+      appendToLog("2");
+      appendToLog("3");
+      // ...and stay invisible until the stream is reset
+      assertFalse(stream.hasNext());
+      stream.reset();
+
+      for (String expectedRow : new String[] { "2", "3" }) {
+        assertEquals(expectedRow, getRow(stream.next()));
+      }
+      assertFalse(stream.hasNext());
+    }
+  }
+
+  /**
+   * Tests that a new stream opened at the position where a previous stream stopped returns only
+   * the entries appended after that point.
+   */
+  @Test
+  public void testResumeStreamingFromPosition() throws Exception {
+    long lastPosition = 0;
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+      // appended after the read above, so this stream never returns them
+      appendToLog("2");
+      appendToLog("3");
+      // remember where this stream stopped reading
+      lastPosition = entryStream.getPosition();
+    }
+    // next stream should pick up where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext()); // done
+      // no log roll happened in this test, so only the single WAL is queued
+      assertEquals(1, walQueue.size());
+    }
+  }
+
+  /**
+   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
+   * using the last position.
+   */
+  @Test
+  public void testPosition() throws Exception {
+    long lastPosition = 0;
+    appendEntriesToLog(3);
+    // read only one element
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      entryStream.next();
+      // position reached after consuming the first of the three entries
+      lastPosition = entryStream.getPosition();
+    }
+    // there should still be two more entries from where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertNotNull(entryStream.next());
+      assertNotNull(entryStream.next());
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+
+ @Test
+ public void testEmptyStream() throws Exception {
+ try (WALEntryStream entryStream =
+ new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+ assertFalse(entryStream.hasNext());
+ }
+ }
+
+  /**
+   * Tests that ReplicationSourceWALReaderThread batches the entries already in the WAL into one
+   * batch, and delivers an entry appended afterwards in a subsequent batch.
+   */
+  @Test
+  public void testReplicationSourceWALReaderThread() throws Exception {
+    appendEntriesToLog(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a batcher
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    ReplicationSourceWALReaderThread batcher =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, 0, fs,
+            conf, getDummyFilter(), new MetricsSource("1"));
+    Path walPath = walQueue.peek();
+    batcher.start();
+    try {
+      WALEntryBatch entryBatch = batcher.take();
+
+      // should've batched up our entries
+      assertNotNull(entryBatch);
+      assertEquals(3, entryBatch.getWalEntries().size());
+      assertEquals(position, entryBatch.getLastWalPosition());
+      assertEquals(walPath, entryBatch.getLastWalPath());
+      assertEquals(3, entryBatch.getNbRowKeys());
+
+      appendToLog("foo");
+      entryBatch = batcher.take();
+      assertEquals(1, entryBatch.getNbEntries());
+      // expected value first, so a failure message reports expected/actual correctly
+      assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
+    } finally {
+      // Interrupt the reader thread started above (the same way ReplicationSource#terminate
+      // does) so it does not keep running after this test.
+      batcher.interrupt();
+    }
+  }
+
+  /** Returns the row of the first cell in the entry's edit, decoded with Bytes#toString. */
+  private String getRow(WAL.Entry entry) {
+    Cell firstCell = entry.getEdit().getCells().get(0);
+    byte[] rowArray = firstCell.getRowArray();
+    return Bytes.toString(rowArray, firstCell.getRowOffset(), firstCell.getRowLength());
+  }
+
+  /** Appends a single-cell edit whose row is {@code key} to the WAL, then syncs it. */
+  private void appendToLog(String key) throws IOException {
+    WALKey walKey =
+        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc);
+    WALEdit singleRowEdit = getWALEdit(key);
+    long txid = log.append(htd, info, walKey, singleRowEdit, true);
+    log.sync(txid);
+  }
+
+  /** Appends {@code count} separate single-cell edits to the WAL, syncing after each one. */
+  private void appendEntriesToLog(int count) throws IOException {
+    for (int remaining = count; remaining > 0; remaining--) {
+      appendToLog();
+    }
+  }
+
+ private void appendToLog() throws IOException {
+ appendToLogPlus(1);
+ }
+
+  /** Appends one edit holding {@code count} cells to the WAL, then syncs it. */
+  private void appendToLogPlus(int count) throws IOException {
+    WALKey walKey =
+        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc);
+    WALEdit multiCellEdit = getWALEdits(count);
+    long txid = log.append(htd, info, walKey, multiCellEdit, true);
+    log.sync(txid);
+  }
+
+  /**
+   * Builds an edit with {@code count} cells. Each cell's row is the current time in millis, so
+   * cells created within the same millisecond share a row.
+   */
+  private WALEdit getWALEdits(int count) {
+    WALEdit result = new WALEdit();
+    int added = 0;
+    while (added < count) {
+      byte[] row = Bytes.toBytes(System.currentTimeMillis());
+      long timestamp = System.currentTimeMillis();
+      result.add(new KeyValue(row, family, qualifier, timestamp, qualifier));
+      added++;
+    }
+    return result;
+  }
+
+  /** Builds an edit with one cell for the given row, using the qualifier as the value. */
+  private WALEdit getWALEdit(String row) {
+    byte[] rowBytes = Bytes.toBytes(row);
+    long timestamp = System.currentTimeMillis();
+    WALEdit result = new WALEdit();
+    result.add(new KeyValue(rowBytes, family, qualifier, timestamp, qualifier));
+    return result;
+  }
+
+  /** Returns a filter that passes every entry through unchanged. */
+  private WALEntryFilter getDummyFilter() {
+    WALEntryFilter passThrough = new WALEntryFilter() {
+      @Override
+      public Entry filter(Entry walEntry) {
+        return walEntry;
+      }
+    };
+    return passThrough;
+  }
+
+  /** Returns a {@link ReplicationQueueInfo} built from the name {@code "1"}. */
+  private ReplicationQueueInfo getQueueInfo() {
+    String queueName = "1";
+    return new ReplicationQueueInfo(queueName);
+  }
+
+  /**
+   * WAL listener that, from its preLogRoll callback, enqueues the new log path on
+   * {@code walQueue} and records it as the current path.
+   */
+  class PathWatcher extends WALActionsListener.Base {
+
+    Path currentPath;
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+      walQueue.add(newPath);
+      this.currentPath = newPath;
+    }
+  }
+
+}
[2/2] hbase git commit: HBASE-15995 Separate replication WAL reading
from shipping
Posted by te...@apache.org.
HBASE-15995 Separate replication WAL reading from shipping
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3cf44332
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3cf44332
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3cf44332
Branch: refs/heads/branch-1
Commit: 3cf4433260b60a0e0455090628cf60a9d5a180f3
Parents: b66a478
Author: Vincent <vi...@gmail.com>
Authored: Thu Jun 1 16:50:41 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Sat Jun 3 09:48:57 2017 -0700
----------------------------------------------------------------------
.../replication/ClusterMarkingEntryFilter.java | 70 ++
.../regionserver/ReplicationSource.java | 826 +++++--------------
.../ReplicationSourceWALReaderThread.java | 471 +++++++++++
.../ReplicationWALReaderManager.java | 155 ----
.../regionserver/WALEntryStream.java | 411 +++++++++
.../TestReplicationWALReaderManager.java | 228 -----
.../regionserver/TestWALEntryStream.java | 440 ++++++++++
7 files changed, 1610 insertions(+), 991 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
new file mode 100644
index 0000000..7cd3fed
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+
+/**
+ * Filters out entries whose key already lists the peer cluster id (i.e. already replicated
+ * there), and marks every other non-empty entry with our cluster id.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+@InterfaceStability.Evolving
+public class ClusterMarkingEntryFilter implements WALEntryFilter {
+  private final UUID clusterId;
+  private final UUID peerClusterId;
+  private final ReplicationEndpoint replicationEndpoint;
+
+  /**
+   * @param clusterId id of this cluster
+   * @param peerClusterId id of the peer cluster that entries are replicated to
+   * @param replicationEndpoint ReplicationEndpoint which will handle the actual replication
+   */
+  public ClusterMarkingEntryFilter(UUID clusterId, UUID peerClusterId,
+      ReplicationEndpoint replicationEndpoint) {
+    this.clusterId = clusterId;
+    this.peerClusterId = peerClusterId;
+    this.replicationEndpoint = replicationEndpoint;
+  }
+
+  /**
+   * @param entry the WAL entry to inspect
+   * @return the same entry, tagged with this cluster's id and with its compression context
+   *         cleared; or {@code null} if the entry's key already lists the peer cluster id (and
+   *         the endpoint cannot replicate to the same cluster), or if its edit is null or empty
+   */
+  @Override
+  public Entry filter(Entry entry) {
+    // don't replicate if the log entries have already been consumed by the cluster
+    if (!replicationEndpoint.canReplicateToSameCluster()
+        && entry.getKey().getClusterIds().contains(peerClusterId)) {
+      return null;
+    }
+    WALEdit edit = entry.getEdit();
+    if (edit == null || edit.isEmpty()) {
+      return null;
+    }
+    WALKey logKey = entry.getKey();
+    // Mark that the current cluster has the change
+    logKey.addClusterId(clusterId);
+    // We need to set the CC to null else it will be compressed when sent to the sink
+    entry.setCompressionContext(null);
+    return entry;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ce070d0..0d52bbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -25,8 +25,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -55,12 +53,12 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -69,16 +67,14 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Class that handles the source of a replication stream.
@@ -93,8 +89,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
*
*/
@InterfaceAudience.Private
-public class ReplicationSource extends Thread
- implements ReplicationSourceInterface {
+public class ReplicationSource extends Thread implements ReplicationSourceInterface {
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
@@ -118,10 +113,6 @@ public class ReplicationSource extends Thread
private Stoppable stopper;
// How long should we sleep for each retry
private long sleepForRetries;
- // Max size in bytes of entriesArray
- private long replicationQueueSizeCapacity;
- // Max number of entries in entriesArray
- private int replicationQueueNbCapacity;
private FileSystem fs;
// id of this cluster
private UUID clusterId;
@@ -149,11 +140,10 @@ public class ReplicationSource extends Thread
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
- private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
- new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
+ private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
+ new ConcurrentHashMap<String, ReplicationSourceShipperThread>();
private AtomicLong totalBufferUsed;
- private long totalBufferQuota;
/**
* Instantiation method used by region servers
@@ -178,10 +168,6 @@ public class ReplicationSource extends Thread
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
decorateConf();
- this.replicationQueueSizeCapacity =
- this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
- this.replicationQueueNbCapacity =
- this.conf.getInt("replication.source.nb.capacity", 25000);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -207,12 +193,8 @@ public class ReplicationSource extends Thread
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
- this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
- + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
- + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
- + this.currentBandwidth);
+ + ", currentBandwidth=" + this.currentBandwidth);
}
private void decorateConf() {
@@ -233,9 +215,9 @@ public class ReplicationSource extends Thread
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
- final ReplicationSourceWorkerThread worker =
- new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
- ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
+ final ReplicationSourceShipperThread worker =
+ new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
+ ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
} else {
@@ -342,9 +324,9 @@ public class ReplicationSource extends Thread
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
- final ReplicationSourceWorkerThread worker =
- new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
- ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+ final ReplicationSourceShipperThread worker =
+ new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
+ ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
@@ -415,9 +397,10 @@ public class ReplicationSource extends Thread
+ " because an error occurred: " + reason, cause);
}
this.sourceRunning = false;
- Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
- for (ReplicationSourceWorkerThread worker : workers) {
+ Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
+ for (ReplicationSourceShipperThread worker : workers) {
worker.setWorkerRunning(false);
+ worker.entryReader.interrupt();
worker.interrupt();
}
ListenableFuture<Service.State> future = null;
@@ -425,7 +408,7 @@ public class ReplicationSource extends Thread
future = this.replicationEndpoint.stop();
}
if (join) {
- for (ReplicationSourceWorkerThread worker : workers) {
+ for (ReplicationSourceShipperThread worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
}
@@ -454,7 +437,7 @@ public class ReplicationSource extends Thread
@Override
public Path getCurrentPath() {
// only for testing
- for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+ for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
@@ -491,9 +474,9 @@ public class ReplicationSource extends Thread
StringBuilder sb = new StringBuilder();
sb.append("Total replicated edits: ").append(totalReplicatedEdits)
.append(", current progress: \n");
- for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
+ for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
- ReplicationSourceWorkerThread worker = entry.getValue();
+ ReplicationSourceShipperThread worker = entry.getValue();
long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
@@ -522,28 +505,20 @@ public class ReplicationSource extends Thread
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
- public class ReplicationSourceWorkerThread extends Thread {
- private ReplicationSource source;
- private String walGroupId;
- private PriorityBlockingQueue<Path> queue;
- private ReplicationQueueInfo replicationQueueInfo;
- // Our reader for the current log. open/close handled by repLogReader
- private WAL.Reader reader;
+ // This thread reads entries from a queue and ships them.
+ // Entries are placed onto the queue by ReplicationSourceWALReaderThread
+ public class ReplicationSourceShipperThread extends Thread {
+ ReplicationSourceInterface source;
+ String walGroupId;
+ PriorityBlockingQueue<Path> queue;
+ ReplicationQueueInfo replicationQueueInfo;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
- // Handle on the log reader helper
- private ReplicationWALReaderManager repLogReader;
- // Current number of operations (Put/Delete) that we need to replicate
- private int currentNbOperations = 0;
- // Current size of data we need to replicate
- private int currentSize = 0;
// Indicates whether this particular worker is running
private boolean workerRunning = true;
- // Current number of hfiles that we need to replicate
- private long currentNbHFiles = 0;
- List<WAL.Entry> entries;
+ ReplicationSourceWALReaderThread entryReader;
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
@@ -555,32 +530,17 @@ public class ReplicationSource extends Thread
}
);
- public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
- ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
+ public ReplicationSourceShipperThread(String walGroupId,
+ PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
+ ReplicationSourceInterface source) {
this.walGroupId = walGroupId;
this.queue = queue;
this.replicationQueueInfo = replicationQueueInfo;
- this.repLogReader = new ReplicationWALReaderManager(fs, conf);
this.source = source;
- this.entries = new ArrayList<>();
}
@Override
public void run() {
- // If this is recovered, the queue is already full and the first log
- // normally has a position (unless the RS failed between 2 logs)
- if (this.replicationQueueInfo.isQueueRecovered()) {
- try {
- this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
- this.queue.peek().getName()));
- if (LOG.isTraceEnabled()) {
- LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
- + this.repLogReader.getPosition());
- }
- } catch (ReplicationException e) {
- terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
- }
- }
// Loop until we close down
while (isWorkerActive()) {
int sleepMultiplier = 1;
@@ -591,150 +551,43 @@ public class ReplicationSource extends Thread
}
continue;
}
- Path oldPath = getCurrentPath(); //note that in the current scenario,
- //oldPath will be null when a log roll
- //happens.
- // Get a new path
- boolean hasCurrentPath = getNextPath();
- if (getCurrentPath() != null && oldPath == null) {
- sleepMultiplier = 1; //reset the sleepMultiplier on a path change
- }
- if (!hasCurrentPath) {
- if (sleepForRetries("No log to process", sleepMultiplier)) {
+ while (entryReader == null) {
+ if (sleepForRetries("Replication WAL entry reader thread not initialized",
+ sleepMultiplier)) {
sleepMultiplier++;
}
- continue;
- }
- boolean currentWALisBeingWrittenTo = false;
- //For WAL files we own (rather than recovered), take a snapshot of whether the
- //current WAL file (this.currentPath) is in use (for writing) NOW!
- //Since the new WAL paths are enqueued only after the prev WAL file
- //is 'closed', presence of an element in the queue means that
- //the previous WAL file was closed, else the file is in use (currentPath)
- //We take the snapshot now so that we are protected against races
- //where a new file gets enqueued while the current file is being processed
- //(and where we just finished reading the current file).
- if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
- currentWALisBeingWrittenTo = true;
- }
- // Open a reader on it
- if (!openReader(sleepMultiplier)) {
- // Reset the sleep multiplier, else it'd be reused for the next file
- sleepMultiplier = 1;
- continue;
- }
-
- // If we got a null reader but didn't continue, then sleep and continue
- if (this.reader == null) {
- if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
- sleepMultiplier++;
+ if (sleepMultiplier == maxRetriesMultiplier) {
+ LOG.warn("Replication WAL entry reader thread not initialized");
}
- continue;
}
- boolean gotIOE = false;
- currentNbOperations = 0;
- currentNbHFiles = 0;
- entries.clear();
- Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
- currentSize = 0;
try {
- if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
- lastPositionsForSerialScope)) {
- for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
- waitingUntilCanPush(entry);
- }
- try {
- MetaTableAccessor
- .updateReplicationPositions(manager.getConnection(), actualPeerId,
- lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- stopper.stop("updateReplicationPositions fail");
- }
-
- continue;
- }
- } catch (IOException ioe) {
- LOG.warn(peerClusterZnode + " Got: ", ioe);
- gotIOE = true;
- if (ioe.getCause() instanceof EOFException) {
-
- boolean considerDumping = false;
- if (this.replicationQueueInfo.isQueueRecovered()) {
- try {
- FileStatus stat = fs.getFileStatus(this.currentPath);
- if (stat.getLen() == 0) {
- LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
- }
- considerDumping = true;
- } catch (IOException e) {
- LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
- }
- }
-
- if (considerDumping &&
- sleepMultiplier == maxRetriesMultiplier &&
- processEndOfFile()) {
- continue;
- }
- }
- } finally {
- try {
- this.reader = null;
- this.repLogReader.closeReader();
- } catch (IOException e) {
- gotIOE = true;
- LOG.warn("Unable to finalize the tailing of a file", e);
- }
- }
- for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
- waitingUntilCanPush(entry);
- }
- // If we didn't get anything to replicate, or if we hit a IOE,
- // wait a bit and retry.
- // But if we need to stop, don't bother sleeping
- if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
- if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-
- // Save positions to meta table before zk.
- if (!gotIOE) {
- try {
- MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
- lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- stopper.stop("updateReplicationPositions fail");
- }
- }
-
- manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
- this.repLogReader.getPosition(),
- this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
-
- this.lastLoggedPosition = this.repLogReader.getPosition();
- }
- // Reset the sleep multiplier if nothing has actually gone wrong
- if (!gotIOE) {
- sleepMultiplier = 1;
- // if there was nothing to ship and it's not an error
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
- metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
+ WALEntryBatch entryBatch = entryReader.take();
+ for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
+ waitingUntilCanPush(entry);
}
- if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
- sleepMultiplier++;
+ shipEdits(entryBatch);
+ releaseBufferQuota((int) entryBatch.getHeapSize());
+ if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
+ && entryBatch.getLastSeqIds().isEmpty()) {
+ LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ + peerClusterZnode);
+ metrics.incrCompletedRecoveryQueue();
+ setWorkerRunning(false);
+ continue;
}
- continue;
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while waiting for next replication entry batch", e);
+ Thread.currentThread().interrupt();
}
- shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
- releaseBufferQuota();
}
+
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allOtherTaskDone = true;
- for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+ for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (!worker.equals(this) && worker.isAlive()) {
allOtherTaskDone = false;
break;
@@ -776,127 +629,6 @@ public class ReplicationSource extends Thread
}
}
- /**
- * Read all the entries from the current log files and retain those that need to be replicated.
- * Else, process the end of the current file.
- * @param currentWALisBeingWrittenTo is the current WAL being written to
- * @param entries resulting entries to be replicated
- * @param lastPosition save the last sequenceid for each region if the table has
- * serial-replication scope
- * @return true if we got nothing and went to the next file, false if we got entries
- * @throws IOException
- */
- protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
- List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
- long seenEntries = 0;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Seeking in " + this.currentPath + " at position "
- + this.repLogReader.getPosition());
- }
- this.repLogReader.seek();
- long positionBeforeRead = this.repLogReader.getPosition();
- WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
- while (entry != null) {
- metrics.incrLogEditsRead();
- seenEntries++;
-
- if (entry.hasSerialReplicationScope()) {
- String key = Bytes.toString(entry.getKey().getEncodedRegionName());
- lastPosition.put(key, entry.getKey().getLogSeqNum());
- if (entry.getEdit().getCells().size() > 0) {
- WALProtos.RegionEventDescriptor maybeEvent =
- WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
- if (maybeEvent != null && maybeEvent.getEventType()
- == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
- // In serially replication, if we move a region to another RS and move it back, we may
- // read logs crossing two sections. We should break at REGION_CLOSE and push the first
- // section first in case of missing the middle section belonging to the other RS.
- // In a worker thread, if we can push the first log of a region, we can push all logs
- // in the same region without waiting until we read a close marker because next time
- // we read logs in this region, it must be a new section and not adjacent with this
- // region. Mark it negative.
- lastPosition.put(key, -entry.getKey().getLogSeqNum());
- break;
- }
- }
- }
- boolean totalBufferTooLarge = false;
- // don't replicate if the log entries have already been consumed by the cluster
- if (replicationEndpoint.canReplicateToSameCluster()
- || !entry.getKey().getClusterIds().contains(peerClusterId)) {
- // Remove all KVs that should not be replicated
- entry = walEntryFilter.filter(entry);
- WALEdit edit = null;
- WALKey logKey = null;
- if (entry != null) {
- edit = entry.getEdit();
- logKey = entry.getKey();
- }
-
- if (edit != null && edit.size() != 0) {
- // Mark that the current cluster has the change
- logKey.addClusterId(clusterId);
- currentNbOperations += countDistinctRowKeys(edit);
- entries.add(entry);
- int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit);
- currentSize += delta;
- totalBufferTooLarge = acquireBufferQuota(delta);
- } else {
- metrics.incrLogEditsFiltered();
- }
- }
- // Stop if too many entries or too big
- // FIXME check the relationship between single wal group and overall
- if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity
- || entries.size() >= replicationQueueNbCapacity) {
- break;
- }
-
- try {
- entry = this.repLogReader.readNextAndSetPosition();
- } catch (IOException ie) {
- LOG.debug("Break on IOE: " + ie.getMessage());
- break;
- }
- }
- metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
- if (currentWALisBeingWrittenTo) {
- return false;
- }
- // If we didn't get anything and the queue has an object, it means we
- // hit the end of the file for sure
- return seenEntries == 0 && processEndOfFile();
- }
-
- /**
- * Calculate the total size of all the store files
- * @param edit edit to count row keys from
- * @return the total size of the store files
- */
- private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
- List<Cell> cells = edit.getCells();
- int totalStoreFilesSize = 0;
-
- int totalCells = edit.size();
- for (int i = 0; i < totalCells; i++) {
- if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
- try {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
- List<StoreDescriptor> stores = bld.getStoresList();
- int totalStores = stores.size();
- for (int j = 0; j < totalStores; j++) {
- totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
- }
- } catch (IOException e) {
- LOG.error("Failed to deserialize bulk load entry from wal edit. "
- + "Size of HFiles part of cell will not be considered in replication "
- + "request size calculation.", e);
- }
- }
- }
- return totalStoreFilesSize;
- }
-
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
@@ -921,206 +653,6 @@ public class ReplicationSource extends Thread
}
}
- /**
- * Poll for the next path
- * @return true if a path was obtained, false if not
- */
- protected boolean getNextPath() {
- try {
- if (this.currentPath == null) {
- this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
- metrics.decrSizeOfLogQueue();
- if (this.currentPath != null) {
- // For recovered queue: must use peerClusterZnode since peerId is a parsed value
- manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
- this.replicationQueueInfo.isQueueRecovered());
- if (LOG.isTraceEnabled()) {
- LOG.trace("New log: " + this.currentPath);
- }
- }
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while reading edits", e);
- }
- return this.currentPath != null;
- }
-
- /**
- * Open a reader on the current path
- *
- * @param sleepMultiplier by how many times the default sleeping time is augmented
- * @return true if we should continue with that file, false if we are over with it
- */
- protected boolean openReader(int sleepMultiplier) {
- try {
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Opening log " + this.currentPath);
- }
- this.reader = repLogReader.openReader(this.currentPath);
- } catch (FileNotFoundException fnfe) {
- if (this.replicationQueueInfo.isQueueRecovered()) {
- // We didn't find the log in the archive directory, look if it still
- // exists in the dead RS folder (there could be a chain of failures
- // to look at)
- List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
- LOG.info("NB dead servers : " + deadRegionServers.size());
- final Path walDir = FSUtils.getWALRootDir(conf);
- for (String curDeadServerName : deadRegionServers) {
- final Path deadRsDirectory = new Path(walDir,
- DefaultWALProvider.getWALDirectoryName(curDeadServerName));
- Path[] locs = new Path[] {
- new Path(deadRsDirectory, currentPath.getName()),
- new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
- currentPath.getName()),
- };
- for (Path possibleLogLocation : locs) {
- LOG.info("Possible location " + possibleLogLocation.toUri().toString());
- if (manager.getFs().exists(possibleLogLocation)) {
- // We found the right new location
- LOG.info("Log " + this.currentPath + " still exists at " +
- possibleLogLocation);
- // Breaking here will make us sleep since reader is null
- // TODO why don't we need to set currentPath and call openReader here?
- return true;
- }
- }
- }
- // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
- // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
- if (stopper instanceof ReplicationSyncUp.DummyServer) {
- // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
- // area rather than to the wal area for a particular region server.
- FileStatus[] rss = fs.listStatus(manager.getLogDir());
- for (FileStatus rs : rss) {
- Path p = rs.getPath();
- FileStatus[] logs = fs.listStatus(p);
- for (FileStatus log : logs) {
- p = new Path(p, log.getPath().getName());
- if (p.getName().equals(currentPath.getName())) {
- currentPath = p;
- LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
- // Open the log at the new location
- this.openReader(sleepMultiplier);
- return true;
- }
- }
- }
- }
-
- // TODO What happens if the log was missing from every single location?
- // Although we need to check a couple of times as the log could have
- // been moved by the master between the checks
- // It can also happen if a recovered queue wasn't properly cleaned,
- // such that the znode pointing to a log exists but the log was
- // deleted a long time ago.
- // For the moment, we'll throw the IO and processEndOfFile
- throw new IOException("File from recovered queue is " +
- "nowhere to be found", fnfe);
- } else {
- // If the log was archived, continue reading from there
- Path archivedLogLocation =
- new Path(manager.getOldLogDir(), currentPath.getName());
- if (manager.getFs().exists(archivedLogLocation)) {
- currentPath = archivedLogLocation;
- LOG.info("Log " + this.currentPath + " was moved to " +
- archivedLogLocation);
- // Open the log at the new location
- this.openReader(sleepMultiplier);
-
- }
- // TODO What happens the log is missing in both places?
- }
- }
- } catch (LeaseNotRecoveredException lnre) {
- // HBASE-15019 the WAL was not closed due to some hiccup.
- LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
- recoverLease(conf, currentPath);
- this.reader = null;
- } catch (IOException ioe) {
- if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
- LOG.warn(peerClusterZnode + " Got: ", ioe);
- this.reader = null;
- if (ioe.getCause() instanceof NullPointerException) {
- // Workaround for race condition in HDFS-4380
- // which throws a NPE if we open a file before any data node has the most recent block
- // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
- LOG.warn("Got NPE opening reader, will retry.");
- } else if (sleepMultiplier >= maxRetriesMultiplier) {
- // TODO Need a better way to determine if a file is really gone but
- // TODO without scanning all logs dir
- LOG.warn("Waited too long for this file, considering dumping");
- return !processEndOfFile();
- }
- }
- return true;
- }
-
- private void recoverLease(final Configuration conf, final Path path) {
- try {
- final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
- FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
- fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
- @Override
- public boolean progress() {
- LOG.debug("recover WAL lease: " + path);
- return isWorkerActive();
- }
- });
- } catch (IOException e) {
- LOG.warn("unable to recover lease for WAL: " + path, e);
- }
- }
-
- /*
- * Checks whether the current log file is empty, and it is not a recovered queue. This is to
- * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
- * trying to read the log file and get EOFException. In case of a recovered queue the last log
- * file may be empty, and we don't want to retry that.
- */
- private boolean isCurrentLogEmpty() {
- return (this.repLogReader.getPosition() == 0 &&
- !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
- }
-
- /**
- * Count the number of different row keys in the given edit because of mini-batching. We assume
- * that there's at least one Cell in the WALEdit.
- * @param edit edit to count row keys from
- * @return number of different row keys
- */
- private int countDistinctRowKeys(WALEdit edit) {
- List<Cell> cells = edit.getCells();
- int distinctRowKeys = 1;
- int totalHFileEntries = 0;
- Cell lastCell = cells.get(0);
-
- int totalCells = edit.size();
- for (int i = 0; i < totalCells; i++) {
- // Count HFiles to be replicated
- if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
- try {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
- List<StoreDescriptor> stores = bld.getStoresList();
- int totalStores = stores.size();
- for (int j = 0; j < totalStores; j++) {
- totalHFileEntries += stores.get(j).getStoreFileList().size();
- }
- } catch (IOException e) {
- LOG.error("Failed to deserialize bulk load entry from wal edit. "
- + "Then its hfiles count will not be added into metric.");
- }
- }
-
- if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
- distinctRowKeys++;
- }
- lastCell = cells.get(i);
- }
- currentNbHFiles += totalHFileEntries;
- return distinctRowKeys + totalHFileEntries;
- }
-
private void checkBandwidthChangeAndResetThrottler() {
long peerBandwidth = getCurrentBandwidth();
if (peerBandwidth != currentBandwidth) {
@@ -1133,16 +665,24 @@ public class ReplicationSource extends Thread
/**
* Do the shipping logic
- * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
- * written to when this method was called
*/
- protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
- Map<String, Long> lastPositionsForSerialScope) {
+ protected void shipEdits(WALEntryBatch entryBatch) {
+ List<Entry> entries = entryBatch.getWalEntries();
+ long lastReadPosition = entryBatch.getLastWalPosition();
+ currentPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- LOG.warn("Was given 0 edits to ship");
+ if (lastLoggedPosition != lastReadPosition) {
+ // Save positions to meta table before zk.
+ updateSerialRepPositions(entryBatch.getLastSeqIds());
+ updateLogPosition(lastReadPosition);
+ // if there was nothing to ship and it's not an error
+ // set "ageOfLastShippedOp" to <now> to indicate that we're current
+ metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
+ }
return;
}
+ int currentSize = (int) entryBatch.getHeapSize();
while (isWorkerActive()) {
try {
checkBandwidthChangeAndResetThrottler();
@@ -1183,7 +723,7 @@ public class ReplicationSource extends Thread
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
- if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+ if (this.lastLoggedPosition != lastReadPosition) {
//Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
@@ -1191,27 +731,18 @@ public class ReplicationSource extends Thread
}
// Save positions to meta table before zk.
- try {
- MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
- lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- stopper.stop("updateReplicationPositions fail");
- }
+ updateSerialRepPositions(entryBatch.getLastSeqIds());
//Log and clean up WAL logs
- manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
- this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
- currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.repLogReader.getPosition();
+ updateLogPosition(lastReadPosition);
}
if (throttler.isEnabled()) {
throttler.addPushSize(currentSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- totalReplicatedOperations.addAndGet(currentNbOperations);
+ totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
// FIXME check relationship between wal group and overall
- metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
+ metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
if (LOG.isTraceEnabled()) {
@@ -1230,62 +761,20 @@ public class ReplicationSource extends Thread
}
}
- /**
- * If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means
- * we're done! Else we'll just continue to try reading the log file
- * @return true if we're done with the current file, false if we should continue trying to read
- * from it
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
- justification = "Yeah, this is how it works")
- protected boolean processEndOfFile() {
- // We presume this means the file we're reading is closed.
- if (this.queue.size() != 0) {
- // -1 means the wal wasn't closed cleanly.
- final long trailerSize = this.repLogReader.currentTrailerSize();
- final long currentPosition = this.repLogReader.getPosition();
- FileStatus stat = null;
- try {
- stat = fs.getFileStatus(this.currentPath);
- } catch (IOException exception) {
- LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
- + ", stats: " + getStats());
- metrics.incrUnknownFileLengthForClosedWAL();
- }
- if (stat != null) {
- if (trailerSize < 0) {
- if (currentPosition < stat.getLen()) {
- final long skippedBytes = stat.getLen() - currentPosition;
- LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
- metrics.incrUncleanlyClosedWALs();
- metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
- }
- } else if (currentPosition + trailerSize < stat.getLen()){
- LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
- ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
- repLogReader.setPosition(0);
- metrics.incrRestartedWALReading();
- metrics.incrRepeatedFileBytes(currentPosition);
- return false;
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
- + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
- }
- this.currentPath = null;
- this.repLogReader.finishCurrentFile();
- this.reader = null;
- metrics.incrCompletedWAL();
- return true;
- } else if (this.replicationQueueInfo.isQueueRecovered()) {
- LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + peerClusterZnode);
- metrics.incrCompletedRecoveryQueue();
- workerRunning = false;
- return true;
+    /**
+     * Writes the serial-replication positions collected for a batch to the meta table. A failure
+     * is treated as fatal: it is logged and {@code stopper} is asked to stop.
+     * @param lastPositionsForSerialScope positions filled in by the WAL reader for the batch
+     *          (presumably encoded region name to last sequence id — confirm in WALEntryBatch)
+     */
+    private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
+      try {
+        MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+          lastPositionsForSerialScope);
+      } catch (IOException e) {
+        LOG.error("updateReplicationPositions fail", e);
+        stopper.stop("updateReplicationPositions fail");
}
- return false;
+    }
+
+    /**
+     * Hands {@code lastReadPosition} for {@code currentPath} to
+     * {@code manager.logPositionAndCleanOldLogs} and remembers it as {@code lastLoggedPosition}.
+     * NOTE(review): the last argument is always {@code false} here, whereas the pre-patch code
+     * passed whether the current WAL was still being written to — confirm this is intended.
+     * @param lastReadPosition position in {@code currentPath} up to which entries are done
+     */
+    private void updateLogPosition(long lastReadPosition) {
+      boolean queueRecovered = this.replicationQueueInfo.isQueueRecovered();
+      manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
+        queueRecovered, false);
+      lastLoggedPosition = lastReadPosition;
}
public void startup() {
@@ -1302,6 +791,134 @@ public class ReplicationSource extends Thread
Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+ peerClusterZnode, handler);
workerThreads.put(walGroupId, this);
+
+ long startPosition = 0;
+
+ if (this.replicationQueueInfo.isQueueRecovered()) {
+ startPosition = getRecoveredQueueStartPos(startPosition);
+ int numRetries = 0;
+ while (numRetries <= maxRetriesMultiplier) {
+ try {
+ locateRecoveredPaths();
+ break;
+ } catch (IOException e) {
+ LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
+ numRetries++;
+ }
+ }
+ }
+
+ startWALReaderThread(n, handler, startPosition);
+ }
+
+    /**
+     * Looks up the persisted read position of the first WAL of this recovered queue. A recovered
+     * queue is already fully populated and its first log normally has a position (unless the RS
+     * failed between two logs).
+     * @param startPosition value to return if the position cannot be read
+     * @return the stored position for the head of {@code queue}, or {@code startPosition} if the
+     *         lookup failed (in which case {@code terminate(...)} has been called)
+     */
+    private long getRecoveredQueueStartPos(long startPosition) {
+      long position = startPosition;
+      try {
+        String firstLogName = this.queue.peek().getName();
+        position = replicationQueues.getLogPosition(peerClusterZnode, firstLogName);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+              + position);
+        }
+      } catch (ReplicationException e) {
+        terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+      }
+      return position;
+    }
+
+    /**
+     * Creates the background thread that reads, filters and batches WAL entries from
+     * {@code queue} for this worker, stores it in {@code entryReader}, and starts it as a daemon.
+     * @param threadName prefix for the reader thread's name
+     * @param handler uncaught exception handler for the reader thread
+     * @param startPosition position in the first WAL to start reading at
+     */
+    private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler,
+        long startPosition) {
+      // user-configured filters first, then mark entries with this cluster's id
+      ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
+          new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
+      entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
+          startPosition, fs, conf, new ChainWALEntryFilter(filters), metrics);
+      String readerThreadName = threadName + ".replicationSource.replicationWALReaderThread."
+          + walGroupId + "," + peerClusterZnode;
+      Threads.setDaemonThreadRunning(entryReader, readerThreadName, handler);
+    }
+
+    /**
+     * Re-resolves the location of every WAL in this recovered queue, since logs may have moved
+     * before recovery was initiated. Paths that still exist are kept. Otherwise the
+     * ReplicationSyncUp tool's WAL root, or the dead region servers' WAL/splitting directories, are
+     * searched. If any path was missing, the queue's contents are replaced by the resolved paths.
+     * @throws IOException on filesystem errors, or if the resolved set has an unexpected size
+     */
+    private void locateRecoveredPaths() throws IOException {
+      PriorityBlockingQueue<Path> resolved =
+          new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+      boolean anyMoved = false;
+      for (Path path : queue) {
+        if (fs.exists(path)) {
+          // still in same location, don't need to do anything
+          resolved.add(path);
+          continue;
+        }
+        anyMoved = true;
+        Path newLocation;
+        if (stopper instanceof ReplicationSyncUp.DummyServer) {
+          // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
+          // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
+          newLocation = getReplSyncUpPath(path);
+        } else {
+          newLocation = findInDeadServerDirs(path);
+          if (newLocation == null) {
+            LOG.error(
+              String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
+            newLocation = path;
+          }
+        }
+        resolved.add(newLocation);
+      }
+
+      if (!anyMoved) {
+        return;
+      }
+      if (resolved.size() != queue.size()) { // this shouldn't happen
+        LOG.error("Recovery queue size is incorrect");
+        throw new IOException("Recovery queue size error");
+      }
+      // Put the correct locations in the queue. This is a recovered queue with no new incoming
+      // logs, so there shouldn't be any concurrency issues.
+      queue.clear();
+      for (Path path : resolved) {
+        queue.add(path);
+      }
+    }
+
+    /**
+     * Searches each dead region server's WAL directory, and its splitting variant, for a WAL with
+     * the same name as {@code path} (there could be a chain of failures to look at).
+     * @return the first existing location found, or {@code null} if there is none
+     */
+    private Path findInDeadServerDirs(Path path) throws IOException {
+      List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+      LOG.info("NB dead servers : " + deadRegionServers.size());
+      final Path walDir = FSUtils.getWALRootDir(conf);
+      for (String curDeadServerName : deadRegionServers) {
+        final Path deadRsDirectory =
+            new Path(walDir, DefaultWALProvider.getWALDirectoryName(curDeadServerName));
+        Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()),
+            new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), path.getName()) };
+        for (Path possibleLogLocation : locs) {
+          LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+          if (manager.getFs().exists(possibleLogLocation)) {
+            // We found the right new location
+            LOG.info("Log " + path + " still exists at " + possibleLogLocation);
+            return possibleLogLocation;
+          }
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Looks for a WAL with the same name as {@code path} under every region server directory of
+     * {@code manager.getLogDir()}. N.B. the ReplicationSyncUp tool sets the manager's WAL dir to
+     * the root of the WAL area rather than to the WAL area for a particular region server.
+     * @param path the WAL path recorded in the recovered queue
+     * @return the first matching location, or {@code path} unchanged if no match is found
+     * @throws IOException if listing a directory fails
+     */
+    private Path getReplSyncUpPath(Path path) throws IOException {
+      FileStatus[] rss = fs.listStatus(manager.getLogDir());
+      for (FileStatus rs : rss) {
+        Path rsDir = rs.getPath();
+        FileStatus[] logs = fs.listStatus(rsDir);
+        for (FileStatus log : logs) {
+          // Build each candidate from the RS dir, never from the previous candidate; otherwise
+          // every later log would be resolved as <rsDir>/<log1>/<log2>/... and be wrong.
+          Path p = new Path(rsDir, log.getPath().getName());
+          if (p.getName().equals(path.getName())) {
+            LOG.info("Log " + p.getName() + " found at " + p);
+            return p;
+          }
+        }
+      }
+      LOG.error("Didn't find path for: " + path.getName());
+      return path;
}
public Path getCurrentPath() {
@@ -1309,7 +926,7 @@ public class ReplicationSource extends Thread
}
+    /**
+     * @return the WAL position last recorded in {@code lastLoggedPosition} (set by
+     *         {@code updateLogPosition} as batches are shipped); it can lag behind how far the
+     *         background WAL reader thread has already read
+     */
public long getCurrentPosition() {
- return this.repLogReader.getPosition();
+      return lastLoggedPosition;
}
private boolean isWorkerActive() {
@@ -1324,27 +941,20 @@ public class ReplicationSource extends Thread
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
}
+ entryReader.interrupt();
+ Threads.shutdown(entryReader, sleepForRetries);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
+    /**
+     * Sets whether this worker, and its WAL reader thread if it has been started, should keep
+     * running.
+     * @param workerRunning {@code false} to ask the worker (and reader) to stop
+     */
public void setWorkerRunning(boolean workerRunning) {
+      // entryReader is only assigned in startWALReaderThread(), which startup() calls after the
+      // worker is registered in workerThreads (and, for recovered queues, after locating paths
+      // with retries). Guard against callers stopping the worker before then to avoid an NPE.
+      if (entryReader != null) {
+        entryReader.setReaderRunning(workerRunning);
+      }
this.workerRunning = workerRunning;
}
- /**
- * @param size delta size for grown buffer
- * @return true if we should clear buffer and push all
- */
- private boolean acquireBufferQuota(int size) {
- return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
- }
-
- private void releaseBufferQuota() {
- totalBufferUsed.addAndGet(-currentSize);
- currentSize = 0;
- entries.clear();
+    /**
+     * Gives {@code size} bytes back to the shared {@code totalBufferUsed} counter, typically the
+     * heap size of a batch that has just been shipped. Takes a {@code long}, matching the reader
+     * thread's long-valued quota accounting, so callers need not narrow the size; existing
+     * {@code int} callers still compile via widening.
+     * @param size number of bytes to release
+     */
+    private void releaseBufferQuota(long size) {
+      totalBufferUsed.addAndGet(-size);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
new file mode 100644
index 0000000..6f1c641
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -0,0 +1,471 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
+ * onto a queue.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceWALReaderThread extends Thread {
+  private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
+
+  private PriorityBlockingQueue<Path> logQueue;
+  private FileSystem fs;
+  private Configuration conf;
+  private BlockingQueue<WALEntryBatch> entryBatchQueue;
+  // max (heap) size of each batch - multiply by number of batches in queue to get total
+  private long replicationBatchSizeCapacity;
+  // max count of each batch - multiply by number of batches in queue to get total
+  private int replicationBatchCountCapacity;
+  // position in the WAL to start reading at
+  private long currentPosition;
+  private WALEntryFilter filter;
+  private long sleepForRetries;
+  // Indicates whether this particular worker is running. It is changed through setReaderRunning
+  // from outside this thread (ReplicationSource's setWorkerRunning calls it) and polled by run(),
+  // so it must be volatile for the reader loop to reliably observe the change.
+  private volatile boolean isReaderRunning = true;
+  private ReplicationQueueInfo replicationQueueInfo;
+  private int maxRetriesMultiplier;
+  private MetricsSource metrics;
+
+  // usage counter shared through the ReplicationSourceManager (manager.getTotalBufferUsed())
+  private AtomicLong totalBufferUsed;
+  // limit for totalBufferUsed (HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY)
+  private long totalBufferQuota;
+
+  /**
+   * Creates a reader worker for a given WAL queue. The worker reads WAL entries off
+   * {@code logQueue}, filters and batches them, and puts the batches on a bounded internal queue
+   * that is drained through {@link #take()}.
+   * @param manager replication manager; supplies the shared buffer-usage counter
+   * @param replicationQueueInfo the replication queue being read (peer, whether it is recovered)
+   * @param logQueue The WAL queue to read off of
+   * @param startPosition position in the first WAL to start reading from
+   * @param fs the files system to use
+   * @param conf configuration to use
+   * @param filter The filter to use while reading
+   * @param metrics replication metrics
+   */
+  public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
+      ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
+      long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
+      MetricsSource metrics) {
+    this.replicationQueueInfo = replicationQueueInfo;
+    this.logQueue = logQueue;
+    this.currentPosition = startPosition;
+    this.fs = fs;
+    this.conf = conf;
+    this.filter = filter;
+    this.metrics = metrics;
+
+    this.replicationBatchSizeCapacity =
+        conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
+    this.replicationBatchCountCapacity = conf.getInt("replication.source.nb.capacity", 25000);
+    this.sleepForRetries = conf.getLong("replication.source.sleepforretries", 1000); // 1 second
+    // 5 minutes @ 1 sec per
+    this.maxRetriesMultiplier = conf.getInt("replication.source.maxretriesmultiplier", 300);
+
+    this.totalBufferUsed = manager.getTotalBufferUsed();
+    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+
+    // Capacity of the batch queue. Memory used will be batchSizeCapacity * (nb.batches + 1);
+    // the +1 is for the batch this thread is building before placing it onto the queue.
+    int batchQueueCapacity = conf.getInt("replication.source.nb.batches", 1);
+    this.entryBatchQueue = new LinkedBlockingQueue<>(batchQueueCapacity);
+
+    LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+        + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
+        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+        + ", replicationBatchQueueCapacity=" + batchQueueCapacity);
+  }
+
+ /**
+ * Reader loop. Opens a WALEntryStream over logQueue at currentPosition and repeatedly reads,
+ * filters and batches entries, putting each batch that has something to ship on entryBatchQueue
+ * (drained via take()). If the stream fails it is closed and re-created, resuming at
+ * currentPosition, after sleeping sleepForRetries * sleepMultiplier ms (sleepMultiplier grows
+ * up to maxRetriesMultiplier). For a recovered queue the thread stops itself once a pass yields
+ * nothing to ship.
+ */
+ @Override
+ public void run() {
+ int sleepMultiplier = 1;
+ while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+ try (WALEntryStream entryStream =
+ new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
+ while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ if (!checkQuota()) {
+ // global buffer quota exceeded; checkQuota() has already slept, so just re-check
+ continue;
+ }
+ WALEntryBatch batch = null;
+ while (entryStream.hasNext()) {
+ if (batch == null) {
+ batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+ }
+ Entry entry = entryStream.next();
+ // For serially replicated regions, end the batch at a REGION_CLOSE marker. Only the
+ // marker's (negated) sequence id is recorded; its edit is not added to the batch.
+ if (updateSerialReplPos(batch, entry)) {
+ batch.lastWalPosition = entryStream.getPosition();
+ break;
+ }
+ entry = filterEntry(entry);
+ if (entry != null) {
+ WALEdit edit = entry.getEdit();
+ if (edit != null && !edit.isEmpty()) {
+ long entrySize = getEntrySize(entry);
+ batch.addEntry(entry);
+ // only kept entries advance batch.lastWalPosition (inside updateBatchStats)
+ updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+ // charge the shared buffer quota for this entry
+ boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+ // Stop if too many entries or too big
+ if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+ || batch.getNbEntries() >= replicationBatchCountCapacity) {
+ break;
+ }
+ }
+ }
+ }
+ // ship if there are entries or serial-replication positions to record
+ if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(String.format("Read %s WAL entries eligible for replication",
+ batch.getNbEntries()));
+ }
+ // blocks while the queue is full (capacity: replication.source.nb.batches)
+ entryBatchQueue.put(batch);
+ sleepMultiplier = 1;
+ } else { // nothing to ship (the stream position may still have advanced past filtered entries)
+ // NOTE(review): progress over filtered-out entries is not handed to the shipper here,
+ // since batch.lastWalPosition only moves for kept entries — confirm this is intended.
+ LOG.trace("Didn't read any new entries from WAL");
+ if (replicationQueueInfo.isQueueRecovered()) {
+ // we're done with queue recovery, shut ourself down
+ setReaderRunning(false);
+ // shuts down shipper thread immediately
+ entryBatchQueue.put(batch != null ? batch
+ : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
+ } else {
+ Thread.sleep(sleepForRetries);
+ }
+ }
+ // remember where we are so a re-created stream (after an error) resumes here
+ currentPosition = entryStream.getPosition();
+ entryStream.reset(); // reuse stream
+ }
+ } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ LOG.debug("Failed to read stream of replication entries: " + e);
+ sleepMultiplier++;
+ } else {
+ LOG.error("Failed to read stream of replication entries", e);
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ // restore the flag; NOTE(review): the loops only exit if isReaderRunning() observes the
+ // interrupt (or the running flag was cleared) — confirm in isReaderRunning()
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+  /**
+   * Checks the shared replication buffer before more entries are read.
+   * @return {@code true} if reading may proceed; {@code false} if the global quota is already
+   *         exceeded, in which case this method has first slept for {@code sleepForRetries} ms
+   */
+  private boolean checkQuota() {
+    // try not to go over total quota
+    boolean underQuota = totalBufferUsed.get() <= totalBufferQuota;
+    if (!underQuota) {
+      // back off so the caller does not spin while shipped batches release the buffer
+      Threads.sleep(sleepForRetries);
+    }
+    return underQuota;
+  }
+
+  /**
+   * Runs {@code entry} through the configured filter chain and counts it in the
+   * filtered-edits metric when the filter drops it.
+   * @return the entry returned by the filter, or {@code null} if it was filtered out
+   */
+  private Entry filterEntry(Entry entry) {
+    Entry result = filter.filter(entry);
+    boolean dropped = entry != null && result == null;
+    if (dropped) {
+      metrics.incrLogEditsFiltered();
+    }
+    return result;
+  }
+
+  /**
+   * Records the sequence id of {@code entry} in {@code batch} when its region has serial
+   * replication scope.
+   * @return true if we should stop reading because we're at REGION_CLOSE
+   * @throws IOException propagated from {@code WALEdit.getRegionEventDescriptor}
+   */
+  private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
+    if (!entry.hasSerialReplicationScope()) {
+      return false;
+    }
+    String key = Bytes.toString(entry.getKey().getEncodedRegionName());
+    long seqId = entry.getKey().getSequenceId();
+    batch.setLastPosition(key, seqId);
+
+    List<Cell> cells = entry.getEdit().getCells();
+    if (cells.isEmpty()) {
+      return false;
+    }
+    WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cells.get(0));
+    boolean isRegionClose = maybeEvent != null
+        && maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE;
+    if (!isRegionClose) {
+      return false;
+    }
+    // In serially replication, if we move a region to another RS and move it back, we may
+    // read logs crossing two sections. We should break at REGION_CLOSE and push the first
+    // section first in case of missing the middle section belonging to the other RS.
+    // In a worker thread, if we can push the first log of a region, we can push all logs
+    // in the same region without waiting until we read a close marker because next time
+    // we read logs in this region, it must be a new section and not adjacent with this
+    // region. Mark it negative.
+    batch.setLastPosition(key, -seqId);
+    return true;
+  }
+
+  /**
+   * Retrieves the next batch of WAL entries from the queue, blocking (with no timeout) until one
+   * is available. For a recovered queue, a batch with no entries is also put on the queue to
+   * signal that the queue has been fully read.
+   * @return A batch of entries, along with the position in the log after reading the batch
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntryBatch take() throws InterruptedException {
+    return entryBatchQueue.take();
+  }
+
+  /**
+   * @return the size charged to the batch and the buffer quota for {@code entry}: the heap size of
+   *         its edit plus the total size of any bulk-loaded store files the edit references
+   */
+  private long getEntrySize(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    long heapSize = edit.heapSize();
+    return heapSize + calculateTotalSizeOfStoreFiles(edit);
+  }
+
+  /**
+   * Adds {@code entry}'s size, distinct row count and HFile count to {@code batch}'s totals and
+   * records {@code entryPosition} as the batch's last WAL position. An entry whose edit is null or
+   * empty only updates the position.
+   */
+  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition,
+      long entrySize) {
+    WALEdit edit = entry.getEdit();
+    boolean hasCells = edit != null && !edit.isEmpty();
+    if (hasCells) {
+      batch.incrementHeapSize(entrySize);
+      Pair<Integer, Integer> rowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
+      batch.incrementNbRowKeys(rowsAndHFiles.getFirst());
+      batch.incrementNbHFiles(rowsAndHFiles.getSecond());
+    }
+    batch.lastWalPosition = entryPosition;
+  }
+
+  /**
+   * Counts the distinct row keys and the bulk-loaded HFiles in the given edit.
+   * <p>
+   * Because of mini-batching, cells of the same row are assumed to be adjacent, so a new row is
+   * counted each time the row changes between consecutive cells. HFiles are counted from the
+   * store file lists of every BULK_LOAD cell; a descriptor that cannot be deserialized is logged
+   * (with its cause) and skipped.
+   * @param edit edit to count row keys and HFiles from
+   * @return pair of (number of distinct row keys, number of HFiles); (0, 0) for an edit with no
+   *         cells
+   */
+  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
+    List<Cell> cells = edit.getCells();
+    if (cells.isEmpty()) {
+      // Guard so that cells.get(0) below cannot throw IndexOutOfBoundsException.
+      return new Pair<>(0, 0);
+    }
+    int distinctRowKeys = 1;
+    int totalHFileEntries = 0;
+    Cell lastCell = cells.get(0);
+
+    for (Cell cell : cells) {
+      // Count HFiles to be replicated
+      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+        try {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+          for (StoreDescriptor store : bld.getStoresList()) {
+            totalHFileEntries += store.getStoreFileList().size();
+          }
+        } catch (IOException e) {
+          // Keep the cause so the deserialization failure can actually be diagnosed.
+          LOG.error("Failed to deserialize bulk load entry from wal edit. "
+              + "Then its hfiles count will not be added into metric.", e);
+        }
+      }
+
+      if (!CellUtil.matchingRow(cell, lastCell)) {
+        distinctRowKeys++;
+      }
+      lastCell = cell;
+    }
+
+    return new Pair<>(distinctRowKeys, totalHFileEntries);
+  }
+
+  /**
+   * Calculates the total size of the store files referenced by the BULK_LOAD cells of an edit.
+   * <p>
+   * The sum is accumulated in a {@code long} and saturated at {@link Integer#MAX_VALUE} when
+   * converted to the {@code int} return type. Accumulating directly into an {@code int} with
+   * {@code +=} silently narrows, so a multi-GB bulk load would wrap to a bogus or negative size.
+   * A descriptor that cannot be deserialized is logged and contributes nothing.
+   * @param edit edit whose bulk load descriptors are inspected
+   * @return the total size of the store files, capped at {@link Integer#MAX_VALUE}
+   */
+  private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
+    long totalStoreFilesSize = 0;
+
+    for (Cell cell : edit.getCells()) {
+      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+        try {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+          for (StoreDescriptor store : bld.getStoresList()) {
+            totalStoreFilesSize += store.getStoreFileSizeBytes();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to deserialize bulk load entry from wal edit. "
+              + "Size of HFiles part of cell will not be considered in replication "
+              + "request size calculation.",
+            e);
+        }
+      }
+    }
+    // Saturate rather than wrap: an oversized bulk load should look "very large", never negative.
+    return (int) Math.min(totalStoreFilesSize, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Adds {@code size} to the running buffer-usage counter and checks it against the quota.
+   * @param size delta size for grown buffer
+   * @return true if, after adding {@code size}, usage has reached or exceeded the quota, meaning
+   *         we should clear buffer and push all
+   */
+  private boolean acquireBufferQuota(long size) {
+    long usedAfterAcquire = totalBufferUsed.addAndGet(size);
+    return usedAfterAcquire >= totalBufferQuota;
+  }
+
+  /**
+   * Reports whether the reader thread should still be considered running.
+   * @return true only while the running flag is set and {@code isInterrupted()} reports false;
+   *         the interrupt status is not consulted once the flag is cleared
+   */
+  public boolean isReaderRunning() {
+    if (!isReaderRunning) {
+      return false;
+    }
+    return !isInterrupted();
+  }
+
+  /**
+   * Sets the running flag consulted by {@link #isReaderRunning()}.
+   * @param readerRunning new flag value; {@code false} makes {@link #isReaderRunning()} return
+   *          false
+   */
+  public void setReaderRunning(boolean readerRunning) {
+    isReaderRunning = readerRunning;
+  }
+
+  /**
+   * Holds a batch of WAL entries to replicate, along with some statistics: the last WAL path and
+   * position read, the number of distinct row keys and HFiles, the accumulated heap size, and the
+   * last sequence id per region for tables with serial-replication scope.
+   */
+  static class WALEntryBatch {
+    private List<Entry> walEntries;
+    // last WAL that was read
+    private Path lastWalPath;
+    // position in WAL of last entry in this batch; assigned directly by the enclosing reader
+    private long lastWalPosition = 0;
+    // number of distinct row keys in this batch
+    private int nbRowKeys = 0;
+    // number of HFiles
+    private int nbHFiles = 0;
+    // heap size of data we need to replicate
+    private long heapSize = 0;
+    // save the last sequenceid for each region if the table has serial-replication scope
+    private Map<String, Long> lastSeqIds = new HashMap<>();
+
+    /**
+     * Creates an empty batch.
+     * @param maxNbEntries initial capacity of the entry list, i.e. the expected maximum number of
+     *          entries in this batch
+     * @param lastWalPath Path of the WAL the last entry in this batch was read from
+     */
+    private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+      this.walEntries = new ArrayList<>(maxNbEntries);
+      this.lastWalPath = lastWalPath;
+    }
+
+    /**
+     * Appends an entry to this batch. Statistics are not updated here.
+     * @param entry the WAL entry to add
+     */
+    public void addEntry(Entry entry) {
+      walEntries.add(entry);
+    }
+
+    /**
+     * @return the WAL Entries; this is the batch's own list, not a copy
+     */
+    public List<Entry> getWalEntries() {
+      return walEntries;
+    }
+
+    /**
+     * @return the path of the last WAL that was read.
+     */
+    public Path getLastWalPath() {
+      return lastWalPath;
+    }
+
+    /**
+     * @return the position in the last WAL that was read.
+     */
+    public long getLastWalPosition() {
+      return lastWalPosition;
+    }
+
+    /**
+     * @return the number of entries currently in this batch
+     */
+    public int getNbEntries() {
+      return walEntries.size();
+    }
+
+    /**
+     * @return the number of distinct row keys in this batch
+     */
+    public int getNbRowKeys() {
+      return nbRowKeys;
+    }
+
+    /**
+     * @return the number of HFiles in this batch
+     */
+    public int getNbHFiles() {
+      return nbHFiles;
+    }
+
+    /**
+     * @return total number of operations in this batch (distinct row keys plus HFiles)
+     */
+    public int getNbOperations() {
+      return getNbRowKeys() + getNbHFiles();
+    }
+
+    /**
+     * @return the heap size of this batch
+     */
+    public long getHeapSize() {
+      return heapSize;
+    }
+
+    /**
+     * @return the last sequenceid for each region if the table has serial-replication scope; this
+     *         is the batch's own map, not a copy
+     */
+    public Map<String, Long> getLastSeqIds() {
+      return lastSeqIds;
+    }
+
+    private void incrementNbRowKeys(int increment) {
+      nbRowKeys += increment;
+    }
+
+    private void incrementNbHFiles(int increment) {
+      nbHFiles += increment;
+    }
+
+    private void incrementHeapSize(long increment) {
+      heapSize += increment;
+    }
+
+    /**
+     * Records {@code sequenceId} as the last sequence id for {@code region}, replacing any previous
+     * value. Despite the method name, the stored value is a sequence id, not a WAL position; a
+     * negative value marks a region whose REGION_CLOSE marker ended the batch.
+     * @param region encoded region name
+     * @param sequenceId sequence id to record (possibly negated)
+     */
+    private void setLastPosition(String region, Long sequenceId) {
+      getLastSeqIds().put(region, sequenceId);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
deleted file mode 100644
index 3558d08..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
-
-import java.io.IOException;
-
-/**
- * Wrapper class around WAL to help manage the implementation details
- * such as compression.
- * <p>
- * Keeps at most one open {@link Reader}, the path it was opened on, and the last position
- * reached, so a caller can reopen or reset the same file and seek back to where it stopped.
- * No method is synchronized; presumably each instance is used by a single thread (TODO confirm).
- */
-@InterfaceAudience.Private
-public class ReplicationWALReaderManager {
-
- private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
- private final FileSystem fs;
- private final Configuration conf;
- // Position after the last successful readNextAndSetPosition(), or the value given to setPosition().
- private long position = 0;
- // Currently open reader, or null when no file is open.
- private Reader reader;
- // Path the current reader was opened on; compared in openReader() to detect a new file.
- private Path lastPath;
-
- /**
- * Creates the helper but doesn't open any file
- * Use setPosition after using the constructor if some content needs to be skipped
- * @param fs file system the WAL files are read from (passed to WALFactory.createReader)
- * @param conf configuration passed to WALFactory.createReader
- */
- public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
- this.fs = fs;
- this.conf = conf;
- }
-
- /**
- * Opens a reader on the given file. If no reader is open, or {@code path} differs from the
- * path of the current reader, the current reader is closed and a new one is created; otherwise
- * the existing reader is reset. This method does not seek; call seek() afterwards to move to
- * the stored position.
- * @param path WAL file to read
- * @return a WAL reader, also retained as the current reader.
- * @throws IOException if creating or resetting the reader fails; an NPE thrown by reset() is
- * rethrown wrapped in an IOException
- */
- public Reader openReader(Path path) throws IOException {
- // Detect if this is a new file, if so get a new reader else
- // reset the current reader so that we see the new data
- if (this.reader == null || !this.lastPath.equals(path)) {
- this.closeReader();
- this.reader = WALFactory.createReader(this.fs, path, this.conf);
- this.lastPath = path;
- } else {
- try {
- this.reader.reset();
- } catch (NullPointerException npe) {
- // Workaround: reset() may throw NPE (likely HDFS-4380); surface it as an IOException.
- throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
- }
- }
- return this.reader;
- }
-
- /**
- * Reads the next entry from the current reader and stores the reader's position after it.
- * Requires a reader opened by openReader(); the current reader is dereferenced directly.
- * @return the next entry with its compression context cleared, or null if the reader returned
- * no entry
- * @throws IOException if reading fails; the stored position is then left unchanged
- */
- public Entry readNextAndSetPosition() throws IOException {
- Entry entry = this.reader.next();
- // Store the position so that in the future the reader can start
- // reading from here. If the above call to next() throws an
- // exception, the position won't be changed and retry will happen
- // from the last known good position
- this.position = this.reader.getPosition();
- // We need to set the CC to null else it will be compressed when sent to the sink
- if (entry != null) {
- entry.setCompressionContext(null);
- }
- return entry;
- }
-
- /**
- * Seeks the current reader to the stored position. A stored position of 0 causes no seek.
- * @throws IOException if the seek fails
- */
- public void seek() throws IOException {
- if (this.position != 0) {
- this.reader.seek(this.position);
- }
- }
-
- /**
- * Get the position that we stopped reading at
- * @return current stored position; expected to be non-negative, although setPosition() does
- * not validate its argument
- */
- public long getPosition() {
- return this.position;
- }
-
- /**
- * Overrides the stored position used by the next seek().
- * @param pos new position; not validated
- */
- public void setPosition(long pos) {
- this.position = pos;
- }
-
- /**
- * Returns the trailer size reported by the current reader when it is a ProtobufLogReader.
- * @return the trailer size, or -1 if the current reader is not a ProtobufLogReader (including
- * when no reader is open)
- */
- public long currentTrailerSize() {
- long size = -1L;
- if (reader instanceof ProtobufLogReader) {
- final ProtobufLogReader pblr = (ProtobufLogReader)reader;
- size = pblr.trailerSize();
- }
- return size;
- }
-
- /**
- * Close the current reader, if any, and clear it. The stored position and last path are kept.
- * @throws IOException if closing the reader fails; the reader reference is then not cleared
- */
- public void closeReader() throws IOException {
- if (this.reader != null) {
- this.reader.close();
- this.reader = null;
- }
- }
-
- /**
- * Tell the helper to reset internal state: the stored position goes back to 0 and the current
- * reader is closed. A failure while closing is logged as a warning and otherwise ignored.
- */
- void finishCurrentFile() {
- this.position = 0;
- try {
- this.closeReader();
- } catch (IOException e) {
- LOG.warn("Unable to close reader", e);
- }
- }
-
-}