Posted to commits@hbase.apache.org by bu...@apache.org on 2017/09/24 00:34:16 UTC
[16/47] hbase git commit: HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin
HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4341c3f5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4341c3f5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4341c3f5
Branch: refs/heads/HBASE-18467
Commit: 4341c3f554cf85e73d3bb536bdda33a83f463f16
Parents: f7a986c
Author: zhangduo <zh...@apache.org>
Authored: Thu Sep 14 17:26:36 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Sep 15 19:22:00 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/Threads.java | 12 +-
.../hbase/regionserver/HRegionServer.java | 71 +++++-----
.../hbase/regionserver/ReplicationService.java | 17 ++-
.../hbase/regionserver/wal/AbstractFSWAL.java | 35 ++++-
.../hbase/regionserver/wal/AsyncFSWAL.java | 6 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 12 +-
.../RecoveredReplicationSource.java | 11 +-
.../replication/regionserver/Replication.java | 15 ++-
.../regionserver/ReplicationSource.java | 22 ++--
.../ReplicationSourceInterface.java | 14 +-
.../regionserver/ReplicationSourceManager.java | 71 +++++-----
.../ReplicationSourceWALReader.java | 25 ++--
.../regionserver/WALEntryStream.java | 130 ++++++++-----------
.../regionserver/WALFileLengthProvider.java | 34 +++++
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 2 +-
.../hadoop/hbase/wal/DisabledWALProvider.java | 8 +-
.../hbase/wal/RegionGroupingProvider.java | 2 +-
.../java/org/apache/hadoop/hbase/wal/WAL.java | 6 +-
.../org/apache/hadoop/hbase/wal/WALFactory.java | 11 +-
.../apache/hadoop/hbase/wal/WALProvider.java | 12 +-
.../replication/ReplicationSourceDummy.java | 13 +-
.../replication/TestReplicationSource.java | 5 +-
.../TestReplicationSourceManager.java | 10 +-
.../regionserver/TestWALEntryStream.java | 87 ++++++++-----
.../apache/hadoop/hbase/wal/IOTestProvider.java | 2 +-
25 files changed, 365 insertions(+), 268 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 35bf2b7..b39a5e8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -59,7 +59,7 @@ public class Threads {
* @param t thread to run
* @return Returns the passed Thread <code>t</code>.
*/
- public static Thread setDaemonThreadRunning(final Thread t) {
+ public static <T extends Thread> T setDaemonThreadRunning(T t) {
return setDaemonThreadRunning(t, t.getName());
}
@@ -69,8 +69,7 @@ public class Threads {
* @param name new name
* @return Returns the passed Thread <code>t</code>.
*/
- public static Thread setDaemonThreadRunning(final Thread t,
- final String name) {
+ public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
return setDaemonThreadRunning(t, name, null);
}
@@ -78,12 +77,11 @@ public class Threads {
* Utility method that sets name, daemon status and starts passed thread.
* @param t thread to frob
* @param name new name
- * @param handler A handler to set on the thread. Pass null if want to
- * use default handler.
+ * @param handler A handler to set on the thread. Pass null if want to use default handler.
* @return Returns the passed Thread <code>t</code>.
*/
- public static Thread setDaemonThreadRunning(final Thread t,
- final String name, final UncaughtExceptionHandler handler) {
+ public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
+ UncaughtExceptionHandler handler) {
t.setName(name);
if (handler != null) {
t.setUncaughtExceptionHandler(handler);
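Why the generified signature matters to callers: the concrete Thread subtype now survives the call, so no cast is needed. A minimal sketch, assuming a hypothetical WorkerThread subclass that is not part of this patch:

import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.hbase.util.Threads;

public class ThreadsGenericsSketch {

  // Hypothetical Thread subclass, used only to show the generic return type.
  static class WorkerThread extends Thread {
    final CountDownLatch started = new CountDownLatch(1);

    @Override
    public void run() {
      started.countDown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Before this patch the method returned plain Thread, so reaching
    // WorkerThread members required a cast; now the subtype is preserved.
    WorkerThread w = Threads.setDaemonThreadRunning(new WorkerThread(), "worker");
    w.started.await();
  }
}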
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 62987c0..f648c2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1586,7 +1586,7 @@ public class HRegionServer extends HasThread implements
// Save it in a file, this will allow to see if we crash
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
- this.walFactory = setupWALAndReplication();
+ setupWALAndReplication();
// Init in here rather than in constructor after thread name has been set
this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
@@ -1855,13 +1855,12 @@ public class HRegionServer extends HasThread implements
/**
* Setup WAL log and replication if enabled.
* Replication setup is done in here because it wants to be hooked up to WAL.
- * @return A WAL instance.
* @throws IOException
*/
- private WALFactory setupWALAndReplication() throws IOException {
+ private void setupWALAndReplication() throws IOException {
// TODO Replication make assumptions here based on the default filesystem impl
- final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
+ Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Path logDir = new Path(walRootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
@@ -1875,7 +1874,7 @@ public class HRegionServer extends HasThread implements
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
// listeners the wal factory will add to wals it creates.
- final List<WALActionsListener> listeners = new ArrayList<>();
+ List<WALActionsListener> listeners = new ArrayList<>();
listeners.add(new MetricsWAL());
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler.getWALActionsListener() != null) {
@@ -1883,7 +1882,21 @@ public class HRegionServer extends HasThread implements
listeners.add(this.replicationSourceHandler.getWALActionsListener());
}
- return new WALFactory(conf, listeners, serverName.toString());
+ // There is a cyclic dependency between ReplicationSourceHandler and WALFactory.
+ // We use WALActionsListener to get the newly rolled WALs, so we need to get the
+ // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then
+ // ReplicationSourceHandler needs to use WALFactory to get the length of the wal file being
+ // written. So here we construct WALFactory first, and then pass it to the initialize method
+ // of ReplicationSourceHandler.
+ WALFactory factory = new WALFactory(conf, listeners, serverName.toString());
+ this.walFactory = factory;
+ if (this.replicationSourceHandler != null) {
+ this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory);
+ }
+ if (this.replicationSinkHandler != null &&
+ this.replicationSinkHandler != this.replicationSourceHandler) {
+ this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory);
+ }
}
public MetricsRegionServer getRegionServerMetrics() {
@@ -2898,7 +2911,7 @@ public class HRegionServer extends HasThread implements
/**
* Load the replication service objects, if any
*/
- static private void createNewReplicationInstance(Configuration conf,
+ private static void createNewReplicationInstance(Configuration conf,
HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
@@ -2908,47 +2921,41 @@ public class HRegionServer extends HasThread implements
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
- HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+ HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
- HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+ HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// If both the sink and the source class names are the same, then instantiate
// only one object.
if (sourceClassname.equals(sinkClassname)) {
- server.replicationSourceHandler = (ReplicationSourceService)
- newReplicationInstance(sourceClassname,
- conf, server, walFs, walDir, oldWALDir);
- server.replicationSinkHandler = (ReplicationSinkService)
- server.replicationSourceHandler;
+ server.replicationSourceHandler =
+ (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
+ walDir, oldWALDir);
+ server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
} else {
- server.replicationSourceHandler = (ReplicationSourceService)
- newReplicationInstance(sourceClassname,
- conf, server, walFs, walDir, oldWALDir);
- server.replicationSinkHandler = (ReplicationSinkService)
- newReplicationInstance(sinkClassname,
- conf, server, walFs, walDir, oldWALDir);
+ server.replicationSourceHandler =
+ (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
+ walDir, oldWALDir);
+ server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname,
+ conf, server, walFs, walDir, oldWALDir);
}
}
- static private ReplicationService newReplicationInstance(String classname,
- Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
- Path oldLogDir) throws IOException{
-
- Class<?> clazz = null;
+ private static ReplicationService newReplicationInstance(String classname, Configuration conf,
+ HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException {
+ Class<? extends ReplicationService> clazz = null;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- clazz = Class.forName(classname, true, classLoader);
+ clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class);
} catch (java.lang.ClassNotFoundException nfe) {
throw new IOException("Could not find class for " + classname);
}
- // create an instance of the replication object.
- ReplicationService service = (ReplicationService)
- ReflectionUtils.newInstance(clazz, conf);
- service.initialize(server, walFs, logDir, oldLogDir);
- return service;
+ // create an instance of the replication object, but do not initialize it here as we need to use
+ // WALFactory when initializing.
+ return ReflectionUtils.newInstance(clazz, conf);
}
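The asSubclass call above is the interesting bit: it validates the configured class against ReplicationService when the class is loaded, instead of relying on an unchecked cast at instantiation. A generic sketch of the same idiom; PluginLoaderSketch and load are illustrative names, not part of this patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class PluginLoaderSketch {

  // Hypothetical generic helper mirroring newReplicationInstance. asSubclass
  // throws ClassCastException at load time if the named class does not
  // implement the expected interface, so no unchecked cast is needed later.
  static <T> T load(String classname, Class<T> type, Configuration conf) throws IOException {
    try {
      ClassLoader cl = Thread.currentThread().getContextClassLoader();
      Class<? extends T> clazz = Class.forName(classname, true, cl).asSubclass(type);
      return ReflectionUtils.newInstance(clazz, conf);
    } catch (ClassNotFoundException | ClassCastException e) {
      throw new IOException("Could not load class for " + classname, e);
    }
  }
}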
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index d88450a..f3bc188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -20,17 +20,17 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
/**
- * Gateway to Cluster Replication.
- * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
- * One such application is a cross-datacenter
- * replication service that can keep two hbase clusters in sync.
+ * Gateway to Cluster Replication. Used by
+ * {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. One such application is a
+ * cross-datacenter replication service that can keep two hbase clusters in sync.
*/
@InterfaceAudience.Private
public interface ReplicationService {
@@ -39,9 +39,8 @@ public interface ReplicationService {
* Initializes the replication service object.
* @throws IOException
*/
- void initialize(
- Server rs, FileSystem fs, Path logdir, Path oldLogDir
- ) throws IOException;
+ void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir,
+ WALFileLengthProvider walFileLengthProvider) throws IOException;
/**
* Start replication services.
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 8b99676..8157108 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
+import com.lmax.disruptor.RingBuffer;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
@@ -29,6 +32,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -58,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -68,6 +73,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.NullScope;
@@ -75,9 +81,6 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.RingBuffer;
-
/**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
* WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
@@ -105,7 +108,7 @@ import com.lmax.disruptor.RingBuffer;
* (Need to keep our own file lengths, not rely on HDFS).
*/
@InterfaceAudience.Private
-public abstract class AbstractFSWAL<W> implements WAL {
+public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
private static final Log LOG = LogFactory.getLog(AbstractFSWAL.class);
@@ -984,6 +987,28 @@ public abstract class AbstractFSWAL<W> implements WAL {
}
/**
+ * If the given {@code path} is currently being written, then return its length.
+ * <p>
+ * This is used by replication to prevent replicating unacked log entries. See
+ * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ */
+ @Override
+ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+ rollWriterLock.lock();
+ try {
+ Path currentPath = getOldPath();
+ if (path.equals(currentPath)) {
+ W writer = this.writer;
+ return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
+ } else {
+ return OptionalLong.empty();
+ }
+ } finally {
+ rollWriterLock.unlock();
+ }
+ }
+
+ /**
* NOTE: This append, at a time that is usually after this call returns, starts an mvcc
* transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
* time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
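Stepping back from the diff for a moment: the contract the new getLogFileSizeIfBeingWritten establishes is that a present value is the committed length of the live WAL file, and bytes past it must not be trusted. A hedged consumer-side sketch, assuming only the WALFileLengthProvider interface added later in this patch:

import java.util.OptionalLong;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;

public class CommittedLengthGuard {

  // Hypothetical helper, not part of this patch: true when a reader has advanced
  // past the committed length of a WAL file that is still being written.
  static boolean readPastCommitted(WALFileLengthProvider wal, Path walPath, long readerPos) {
    OptionalLong committed = wal.getLogFileSizeIfBeingWritten(walPath);
    // An empty OptionalLong means walPath is not the file currently being
    // written, so its on-disk length is final and every byte is acked.
    return committed.isPresent() && readerPos > committed.getAsLong();
  }
}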
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 7e91f8c..42183ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -707,8 +707,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
- this.writer.close();
- this.writer = null;
+ if (this.writer != null) {
+ this.writer.close();
+ this.writer = null;
+ }
closeExecutor.shutdown();
IOException error = new IOException("WAL has been closed");
syncFutures.forEach(f -> f.done(f.getTxid(), error));
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 7298137..7e0fc37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
@@ -46,8 +45,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -62,6 +65,9 @@ import org.apache.htrace.NullScope;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* The default implementation of FSWAL.
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 3594868..248a52a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -50,13 +50,12 @@ public class RecoveredReplicationSource extends ReplicationSource {
private String actualPeerId;
@Override
- public void init(final Configuration conf, final FileSystem fs,
- final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
- final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics) throws IOException {
+ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+ String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
- clusterId, replicationEndpoint, metrics);
+ clusterId, replicationEndpoint, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 9fd1a87..d26f253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -85,6 +86,7 @@ public class Replication extends WALActionsListener.Base implements
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
+
/**
* Instantiate the replication management (if rep is enabled).
* @param server Hosting server
@@ -93,9 +95,8 @@ public class Replication extends WALActionsListener.Base implements
* @param oldLogDir directory where logs are archived
* @throws IOException
*/
- public Replication(final Server server, final FileSystem fs,
- final Path logDir, final Path oldLogDir) throws IOException{
- initialize(server, fs, logDir, oldLogDir);
+ public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {
+ initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty());
}
/**
@@ -104,8 +105,8 @@ public class Replication extends WALActionsListener.Base implements
public Replication() {
}
- public void initialize(final Server server, final FileSystem fs,
- final Path logDir, final Path oldLogDir) throws IOException {
+ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
+ WALFileLengthProvider walFileLengthProvider) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
@@ -144,8 +145,8 @@ public class Replication extends WALActionsListener.Base implements
throw new IOException("Could not read cluster id", ke);
}
this.replicationManager =
- new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
- conf, this.server, fs, logDir, oldLogDir, clusterId);
+ new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf,
+ this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6c96852..d16a68f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,6 +60,8 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
/**
* Class that handles the source of a replication stream.
@@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
* A stream is considered down when we cannot contact a region server on the
* peer cluster for more than 55 seconds by default.
* </p>
- *
*/
@InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface {
@@ -123,6 +122,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
+ private WALFileLengthProvider walFileLengthProvider;
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
@@ -147,12 +147,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @throws IOException
*/
@Override
- public void init(final Configuration conf, final FileSystem fs,
- final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
- final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics)
- throws IOException {
+ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+ String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
@@ -181,6 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
+ this.walFileLengthProvider = walFileLengthProvider;
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
@@ -560,4 +559,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
totalReplicatedEdits.addAndGet(entries.size());
totalBufferUsed.addAndGet(-batchSize);
}
+
+ @Override
+ public WALFileLengthProvider getWALFileLengthProvider() {
+ return walFileLengthProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index da89aba..066f799 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -53,11 +53,10 @@ public interface ReplicationSourceInterface {
* @param clusterId
* @throws IOException
*/
- public void init(final Configuration conf, final FileSystem fs,
- final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
- final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics) throws IOException;
+ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+ String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
@@ -147,6 +146,11 @@ public interface ReplicationSourceInterface {
ReplicationSourceManager getSourceManager();
/**
+ * @return the wal file length provider
+ */
+ WALFileLengthProvider getWALFileLengthProvider();
+
+ /**
* Try to throttle when the peer config with a bandwidth
* @param batchSize entries size will be pushed
* @throws InterruptedException
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5b54ce0..609274f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,7 +28,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -40,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,10 +65,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
@@ -116,12 +116,12 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Path logDir;
// Path to the wal archive
private final Path oldLogDir;
+ private final WALFileLengthProvider walFileLengthProvider;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
// Homemade executer service for replication
private final ThreadPoolExecutor executor;
- private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
private Connection connection;
@@ -141,10 +141,10 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
- public ReplicationSourceManager(final ReplicationQueues replicationQueues,
- final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
- final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
- final Path oldLogDir, final UUID clusterId) throws IOException {
+ public ReplicationSourceManager(ReplicationQueues replicationQueues,
+ ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
+ Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
+ WALFileLengthProvider walFileLengthProvider) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<>();
@@ -162,6 +162,7 @@ public class ReplicationSourceManager implements ReplicationListener {
this.sleepBeforeFailover =
conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
this.clusterId = clusterId;
+ this.walFileLengthProvider = walFileLengthProvider;
this.replicationTracker.registerListener(this);
this.replicationPeers.getAllPeerIds();
// It's preferable to failover 1 RS at a time, but with good zk servers
@@ -175,8 +176,7 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
- this.rand = new Random();
- this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
+ this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
@@ -243,7 +243,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Adds a normal source per registered peer cluster and tries to process all
* old region server wal queues
*/
- protected void init() throws IOException, ReplicationException {
+ void init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@@ -267,13 +267,13 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return the source that was created
* @throws IOException
*/
- protected ReplicationSourceInterface addSource(String id) throws IOException,
- ReplicationException {
+ @VisibleForTesting
+ ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
- ReplicationSourceInterface src =
- getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
- this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
+ ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
+ this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
+ walFileLengthProvider);
synchronized (this.walsById) {
this.sources.add(src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -330,7 +330,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the first source on this rs
* @return a sorted set of wal names
*/
- protected Map<String, Map<String, SortedSet<String>>> getWALs() {
+ @VisibleForTesting
+ Map<String, Map<String, SortedSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
@@ -338,7 +339,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the recovered sources on this rs
* @return a sorted set of wal names
*/
- protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+ @VisibleForTesting
+ Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}
@@ -364,12 +366,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return the normal source for the give peer if it exists, otherwise null.
*/
public ReplicationSourceInterface getSource(String peerId) {
- for (ReplicationSourceInterface source: getSources()) {
- if (source.getPeerId().equals(peerId)) {
- return source;
- }
- }
- return null;
+ return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
}
@VisibleForTesting
@@ -466,12 +463,11 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return the created source
* @throws IOException
*/
- protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
- final FileSystem fs, final ReplicationSourceManager manager,
- final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
- final Server server, final String peerId, final UUID clusterId,
- final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
- throws IOException {
+ private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
+ ReplicationSourceManager manager, ReplicationQueues replicationQueues,
+ ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
+ ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
+ WALFileLengthProvider walFileLengthProvider) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@@ -507,8 +503,8 @@ public class ReplicationSourceManager implements ReplicationListener {
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
- src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
- clusterId, replicationEndpoint, metrics);
+ src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
+ replicationEndpoint, walFileLengthProvider, metrics);
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
@@ -674,7 +670,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
- Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
+ Thread.sleep(sleepBeforeFailover +
+ (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
@@ -688,7 +685,7 @@ public class ReplicationSourceManager implements ReplicationListener {
List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
while (peers != null && !peers.isEmpty()) {
Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
- peers.get(rand.nextInt(peers.size())));
+ peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
long sleep = sleepBeforeFailover/2;
if (peer != null) {
newQueues.put(peer.getFirst(), peer.getSecond());
@@ -748,7 +745,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// enqueue sources
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
- server, peerId, this.clusterId, peerConfig, peer);
+ server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bb5abe9..bb993c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -37,18 +37,18 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
@@ -127,8 +127,8 @@ public class ReplicationSourceWALReader extends Thread {
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
- try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
+ source.getWALFileLengthProvider(), source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!checkQuota()) {
continue;
@@ -147,7 +147,7 @@ public class ReplicationSourceWALReader extends Thread {
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
}
- } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+ } catch (IOException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
@@ -202,8 +202,9 @@ public class ReplicationSourceWALReader extends Thread {
// if we get an EOF due to a zero-length log, and there are other logs in queue
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
- private void handleEofException(Exception e) {
- if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
+ private void handleEofException(IOException e) {
+ if (e instanceof EOFException ||
+ e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 54511ae..3be4ca4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
@@ -50,7 +50,7 @@ import org.apache.hadoop.ipc.RemoteException;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+class WALEntryStream implements Closeable {
private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
private Reader reader;
@@ -59,24 +59,11 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
private Entry currentEntry;
// position after reading current entry
private long currentPosition = 0;
- private PriorityBlockingQueue<Path> logQueue;
- private FileSystem fs;
- private Configuration conf;
- private MetricsSource metrics;
-
- /**
- * Create an entry stream over the given queue
- * @param logQueue the queue of WAL paths
- * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
- * @param conf {@link Configuration} to use to create {@link Reader} for this stream
- * @param metrics replication metrics
- * @throws IOException
- */
- public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
- MetricsSource metrics)
- throws IOException {
- this(logQueue, fs, conf, 0, metrics);
- }
+ private final PriorityBlockingQueue<Path> logQueue;
+ private final FileSystem fs;
+ private final Configuration conf;
+ private final WALFileLengthProvider walFileLengthProvider;
+ private final MetricsSource metrics;
/**
* Create an entry stream over the given queue at the given start position
@@ -88,52 +75,41 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
- long startPosition, MetricsSource metrics) throws IOException {
+ long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+ throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
this.currentPosition = startPosition;
+ this.walFileLengthProvider = walFileLengthProvider;
this.metrics = metrics;
}
/**
* @return true if there is another WAL {@link Entry}
- * @throws WALEntryStreamRuntimeException if there was an Exception while reading
*/
- @Override
- public boolean hasNext() {
+ public boolean hasNext() throws IOException {
if (currentEntry == null) {
- try {
- tryAdvanceEntry();
- } catch (Exception e) {
- throw new WALEntryStreamRuntimeException(e);
- }
+ tryAdvanceEntry();
}
return currentEntry != null;
}
/**
* @return the next WAL entry in this stream
- * @throws WALEntryStreamRuntimeException if there was an IOException
+ * @throws IOException
* @throws NoSuchElementException if no more entries in the stream.
*/
- @Override
- public Entry next() {
- if (!hasNext()) throw new NoSuchElementException();
+ public Entry next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
return save;
}
/**
- * Not supported.
- */
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -142,14 +118,6 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
}
/**
- * @return the iterator over WAL entries in the queue.
- */
- @Override
- public Iterator<Entry> iterator() {
- return this;
- }
-
- /**
* @return the position of the last Entry returned by next()
*/
public long getPosition() {
@@ -195,24 +163,27 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
- readNextEntryAndSetPosition();
- if (currentEntry == null) { // no more entries in this log file - see if log was rolled
- if (logQueue.size() > 1) { // log was rolled
- // Before dequeueing, we should always get one more attempt at reading.
- // This is in case more entries came in after we opened the reader,
- // and a new log was enqueued while we were reading. See HBASE-6758
- resetReader();
- readNextEntryAndSetPosition();
- if (currentEntry == null) {
- if (checkAllBytesParsed()) { // now we're certain we're done with this log file
- dequeueCurrentLog();
- if (openNextLog()) {
- readNextEntryAndSetPosition();
- }
+ boolean beingWritten = readNextEntryAndSetPosition();
+ if (currentEntry == null && !beingWritten) {
+ // no more entries in this log file, and the file is already closed, i.e., rolled
+ // Before dequeueing, we should always get one more attempt at reading.
+ // This is in case more entries came in after we opened the reader, and the log is rolled
+ // while we were reading. See HBASE-6758
+ resetReader();
+ readNextEntryAndSetPosition();
+ if (currentEntry == null) {
+ if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+ dequeueCurrentLog();
+ if (openNextLog()) {
+ readNextEntryAndSetPosition();
}
}
- } // no other logs, we've simply hit the end of the current open log. Do nothing
+ }
}
+ // if currentEntry != null then just return
+ // if currentEntry == null but the file is still being written, then we should not switch to
+ // the next log either, just return here and try next time to see if there are more entries in
+ // the current file
}
// do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
}
@@ -270,15 +241,30 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
metrics.decrSizeOfLogQueue();
}
- private void readNextEntryAndSetPosition() throws IOException {
+ /**
+ * Returns whether the file is opened for writing.
+ */
+ private boolean readNextEntryAndSetPosition() throws IOException {
Entry readEntry = reader.next();
long readerPos = reader.getPosition();
+ OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+ if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+ // see HBASE-14004: for AsyncFSWAL, which uses fan-out, it is possible that we read uncommitted
+ // data, so we need to make sure that we do not read beyond the committed file length.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+ fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+ }
+ resetReader();
+ return true;
+ }
if (readEntry != null) {
metrics.incrLogEditsRead();
metrics.incrLogReadInBytes(readerPos - currentPosition);
}
currentEntry = readEntry; // could be null
setPosition(readerPos);
+ return fileLength.isPresent();
}
private void closeReader() throws IOException {
@@ -301,7 +287,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
Path nextPath = logQueue.peek();
if (nextPath != null) {
openReader(nextPath);
- if (reader != null) return true;
+ if (reader != null) {
+ return true;
+ }
}
return false;
}
@@ -408,14 +396,4 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
}
return size;
}
-
- @InterfaceAudience.Private
- public static class WALEntryStreamRuntimeException extends RuntimeException {
- private static final long serialVersionUID = -6298201811259982568L;
-
- public WALEntryStreamRuntimeException(Exception e) {
- super(e);
- }
- }
-
}
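With the Iterator/Iterable contract dropped, WALEntryStream callers now see checked IOExceptions directly instead of WALEntryStreamRuntimeException, as the ReplicationSourceWALReader change above shows. A minimal sketch of the resulting read loop, with all inputs assumed for illustration:

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Hedged sketch of the post-change read loop; it lives in the same package
// because WALEntryStream is now package-private.
class ReadLoopSketch {

  static long drain(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
      long startPosition, WALFileLengthProvider lengthProvider, MetricsSource metrics)
      throws IOException {
    try (WALEntryStream stream =
        new WALEntryStream(logQueue, fs, conf, startPosition, lengthProvider, metrics)) {
      while (stream.hasNext()) { // IOException now surfaces directly, no runtime wrapper
        Entry entry = stream.next();
        // filter and ship the entry here
      }
      return stream.getPosition(); // position of the last entry returned by next()
    }
  }
}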
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
new file mode 100644
index 0000000..010fa69
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.OptionalLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used by replication to prevent replicating unacked log entries. See
+ * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ */
+@InterfaceAudience.Private
+@FunctionalInterface
+public interface WALFileLengthProvider {
+
+ OptionalLong getLogFileSizeIfBeingWritten(Path path);
+}
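Because the new interface is a @FunctionalInterface with a single method, a lambda suffices wherever a provider is needed; the no-arg Replication constructor above already passes p -> OptionalLong.empty(). Two illustrative stubs (both hypothetical):

import java.util.OptionalLong;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;

public class ProviderLambdaSketch {

  public static void main(String[] args) {
    // Stub used when no WAL is being tailed: every path reports "not being written".
    WALFileLengthProvider none = path -> OptionalLong.empty();
    // Test stub pinning the committed length of every file at 128 bytes.
    WALFileLengthProvider fixed = path -> OptionalLong.of(128L);

    System.out.println(none.getLogFileSizeIfBeingWritten(new Path("/wal/x")));  // OptionalLong.empty
    System.out.println(fixed.getLogFileSizeIfBeingWritten(new Path("/wal/x"))); // OptionalLong[128]
  }
}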
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 9c80fe6..1a81b17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -115,7 +115,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
@Override
- public List<WAL> getWALs() throws IOException {
+ public List<WAL> getWALs() {
if (wal == null) {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index c805ff3..a6d43d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +67,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public List<WAL> getWALs() throws IOException {
+ public List<WAL> getWALs() {
List<WAL> wals = new ArrayList<>(1);
wals.add(disabled);
return wals;
@@ -232,6 +233,11 @@ class DisabledWALProvider implements WALProvider {
public String toString() {
return "WAL disabled.";
}
+
+ @Override
+ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+ return OptionalLong.empty();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 95b7dae..ab3a7d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -169,7 +169,7 @@ public class RegionGroupingProvider implements WALProvider {
}
@Override
- public List<WAL> getWALs() throws IOException {
+ public List<WAL> getWALs() {
List<WAL> wals = new ArrayList<>();
for (WALProvider provider : cached.values()) {
wals.addAll(provider.getWALs());
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index eede937..9ec58ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.wal;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
@@ -35,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface WAL extends Closeable {
+public interface WAL extends Closeable, WALFileLengthProvider {
/**
* Registers WALActionsListener
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index aaa828f..efb8e2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
+import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@@ -38,6 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -63,7 +65,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
* Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
*/
@InterfaceAudience.Private
-public class WALFactory {
+public class WALFactory implements WALFileLengthProvider {
private static final Log LOG = LogFactory.getLog(WALFactory.class);
@@ -230,7 +232,7 @@ public class WALFactory {
}
}
- public List<WAL> getWALs() throws IOException {
+ public List<WAL> getWALs() {
return provider.getWALs();
}
@@ -450,4 +452,9 @@ public class WALFactory {
public final WALProvider getMetaWALProvider() {
return this.metaProvider.get();
}
+
+ @Override
+ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+ return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
+ .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
+ }
}
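
The factory implementation above simply asks every WAL returned by getWALs() and keeps the first non-empty answer, so at most one WAL should claim a given path. A hedged usage sketch (walFactory and walPath are assumed to exist in the caller's scope):

    OptionalLong len = walFactory.getLogFileSizeIfBeingWritten(walPath);
    if (len.isPresent()) {
      // File is still being written; len.getAsLong() is the highest offset
      // known to be acked and therefore safe to replicate.
    } else {
      // No WAL claims the file: it has been closed or rolled, so the
      // on-disk length is final.
    }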
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index ffcfcd4..c38f419 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -60,7 +60,7 @@ public interface WALProvider {
/** @return the List of WALs that are used by this server
*/
- List<WAL> getWALs() throws IOException;
+ List<WAL> getWALs();
/**
* persist outstanding WALs to storage and stop accepting new appends.
@@ -76,18 +76,20 @@ public interface WALProvider {
*/
void close() throws IOException;
+ interface WriterBase extends Closeable {
+ long getLength();
+ }
+
// Writers are used internally. Users outside of the WAL should be relying on the
// interface provided by WAL.
- interface Writer extends Closeable {
+ interface Writer extends WriterBase {
void sync() throws IOException;
void append(WAL.Entry entry) throws IOException;
- long getLength();
}
- interface AsyncWriter extends Closeable {
+ interface AsyncWriter extends WriterBase {
CompletableFuture<Long> sync();
void append(WAL.Entry entry);
- long getLength();
}
/**
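
Hoisting getLength() into WriterBase lets the sync semantics differ (blocking void for Writer, CompletableFuture<Long> for AsyncWriter) while length reporting stays uniform, which is what the length-provider plumbing needs. An illustrative in-memory Writer against the refactored interface; the class is hypothetical and not part of this commit:

    class CountingWriter implements WALProvider.Writer {
      private long length; // bytes accounted for so far

      @Override
      public void append(WAL.Entry entry) {
        length += entry.getEdit().heapSize(); // stand-in for a serialized size
      }

      @Override
      public void sync() {
        // nothing buffered in this sketch
      }

      @Override
      public long getLength() { // declared on WriterBase
        return length;
      }

      @Override
      public void close() {
      }
    }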
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index e23e15b..bfe17b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -42,16 +43,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
String peerClusterId;
Path currentPath;
MetricsSource metrics;
+ WALFileLengthProvider walFileLengthProvider;
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
- UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
- throws IOException {
-
+ UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
+ this.walFileLengthProvider = walFileLengthProvider;
}
@Override
@@ -135,4 +137,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void postShipEdits(List<Entry> entries, int batchSize) {
}
+
+ @Override
+ public WALFileLengthProvider getWALFileLengthProvider() {
+ return walFileLengthProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 56a5bdc..ebb1bf8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
+import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -166,8 +167,8 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
- null, replicationEndpoint, null);
+ source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
+ replicationEndpoint, p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9804df4..3934e05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -61,9 +61,6 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -97,6 +94,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
/**
* An abstract class that tests ReplicationSourceManager. Classes that extend this class should
@@ -646,8 +646,8 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
- UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
- throws IOException {
+ UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
throw new IOException("Failing deliberately");
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 5f3452a..d65054c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
+import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,13 +43,10 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -67,11 +66,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@@ -84,8 +80,13 @@ public class TestWALEntryStream {
private static final byte[] qualifier = Bytes.toBytes("qualifier");
private static final HRegionInfo info =
new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
- private static final HTableDescriptor htd = new HTableDescriptor(tableName);
- private static NavigableMap<byte[], Integer> scopes;
+ private static final NavigableMap<byte[], Integer> scopes = getScopes();
+
+ private static NavigableMap<byte[], Integer> getScopes() {
+ NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ scopes.put(family, 1);
+ return scopes;
+ }
private WAL log;
PriorityBlockingQueue<Path> walQueue;
@@ -103,10 +104,6 @@ public class TestWALEntryStream {
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
- scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- for (byte[] fam : htd.getFamiliesKeys()) {
- scopes.put(fam, 0);
- }
}
@AfterClass
@@ -151,10 +148,10 @@ public class TestWALEntryStream {
log.rollWriter();
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
int i = 0;
- for (WAL.Entry e : entryStream) {
- assertNotNull(e);
+ while (entryStream.hasNext()) {
+ assertNotNull(entryStream.next());
i++;
}
assertEquals(nbRows, i);
@@ -176,10 +173,9 @@ public class TestWALEntryStream {
@Test
public void testAppendsWithRolls() throws Exception {
appendToLog();
-
long oldPos;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it should throw an exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next();
@@ -196,8 +192,8 @@ public class TestWALEntryStream {
appendToLog();
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+ log, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@@ -210,8 +206,8 @@ public class TestWALEntryStream {
log.rollWriter();
appendToLog();
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+ log, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
@@ -236,7 +232,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@@ -261,7 +257,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@@ -284,7 +280,7 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@@ -292,7 +288,7 @@ public class TestWALEntryStream {
}
// next stream should pick up where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@@ -309,14 +305,14 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendEntriesToLog(3);
// read only one element
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
+ log, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@@ -327,7 +323,7 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
@@ -338,7 +334,7 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -351,6 +347,7 @@ public class TestWALEntryStream {
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+ when(source.getWALFileLengthProvider()).thenReturn(log);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();
@@ -425,10 +422,6 @@ public class TestWALEntryStream {
};
}
- private ReplicationQueueInfo getQueueInfo() {
- return new ReplicationQueueInfo("1");
- }
-
class PathWatcher extends WALActionsListener.Base {
Path currentPath;
@@ -440,4 +433,30 @@ public class TestWALEntryStream {
}
}
+ @Test
+ public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
+ appendToLog("1");
+ appendToLog("2");
+ long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+ AtomicLong fileLength = new AtomicLong(size - 1);
+ try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
+ p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
+ assertTrue(entryStream.hasNext());
+ assertNotNull(entryStream.next());
+ // cannot read entry 2 yet
+ assertFalse(entryStream.hasNext());
+ Thread.sleep(1000);
+ entryStream.reset();
+ // still cannot read entry 2
+ assertFalse(entryStream.hasNext());
+
+ // can read entry 2 now
+ fileLength.set(size);
+ entryStream.reset();
+ assertTrue(entryStream.hasNext());
+ assertNotNull(entryStream.next());
+
+ assertFalse(entryStream.hasNext());
+ }
+ }
}
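
testReadBeyondCommittedLength above pins down the contract at the heart of HBASE-14004: even when more bytes are visible on disk, the stream must not surface entries past the length the provider reports, and reset() must re-check rather than cache the old answer. A hedged sketch of the kind of guard this implies inside the stream (names are illustrative, not the actual WALEntryStream internals):

    private boolean reachedReportedEnd(Path currentPath, long position,
        WALFileLengthProvider provider) {
      OptionalLong committed = provider.getLogFileSizeIfBeingWritten(currentPath);
      // If the file is being written, stop at the committed length: bytes
      // beyond it may be in the file but not yet acked to the memstore.
      return committed.isPresent() && position >= committed.getAsLong();
    }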
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 041d8ae..944a4f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -107,7 +107,7 @@ public class IOTestProvider implements WALProvider {
}
@Override
- public List<WAL> getWALs() throws IOException {
+ public List<WAL> getWALs() {
List<WAL> wals = new ArrayList<>(1);
wals.add(log);
return wals;