You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/21 03:37:31 UTC
[24/32] hbase git commit: HBASE-19079 Support setting up two clusters
with A and S state
HBASE-19079 Support setting up two clusters with A and S state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d1088b99
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d1088b99
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d1088b99
Branch: refs/heads/HBASE-19064
Commit: d1088b9946ff1ee09456cde9095d4c67c71d7791
Parents: 8721f0e
Author: zhangduo <zh...@apache.org>
Authored: Tue Apr 10 22:35:19 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon May 21 11:36:05 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerManager.java | 5 +-
...ransitPeerSyncReplicationStateProcedure.java | 2 +-
.../hbase/regionserver/wal/DualAsyncFSWAL.java | 14 ++
.../hadoop/hbase/regionserver/wal/WALUtil.java | 25 ++-
.../hbase/replication/ChainWALEntryFilter.java | 28 +--
.../ReplaySyncReplicationWALCallable.java | 27 ++-
.../SyncReplicationPeerInfoProviderImpl.java | 6 +-
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 10 +-
.../hbase/wal/SyncReplicationWALProvider.java | 94 ++++++---
.../org/apache/hadoop/hbase/wal/WALEdit.java | 8 +-
.../org/apache/hadoop/hbase/wal/WALFactory.java | 2 +-
.../replication/TestReplicationAdmin.java | 33 +--
.../regionserver/wal/TestWALDurability.java | 2 +
.../replication/SyncReplicationTestBase.java | 185 +++++++++++++++++
.../hbase/replication/TestSyncReplication.java | 207 -------------------
.../replication/TestSyncReplicationActive.java | 64 ++++++
.../replication/TestSyncReplicationStandBy.java | 96 +++++++++
17 files changed, 521 insertions(+), 287 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 41dd6e3..229549e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -68,8 +68,9 @@ public class ReplicationPeerManager {
private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
- EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY,
- EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE,
+ EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
+ SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
+ SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index cc51890..5da2b0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -171,7 +171,7 @@ public class TransitPeerSyncReplicationStateProcedure
}
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
- // TODO: replay remote wal when transiting from S to DA.
+ addChildProcedure(new RecoverStandbyProcedure(peerId));
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
return Flow.HAS_MORE_STATE;
case REOPEN_ALL_REGIONS_IN_PEER:
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index 0495337..a98567a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -38,6 +38,8 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
private final Path remoteWalDir;
+ private volatile boolean skipRemoteWal = false;
+
public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
@@ -51,6 +53,9 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
+ if (skipRemoteWal) {
+ return localWriter;
+ }
AsyncWriter remoteWriter;
boolean succ = false;
try {
@@ -64,4 +69,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
localWriter);
}
+
+ // Allow temporarily skipping the creation of remote writer. When failing to write to the remote
+ // dfs cluster, we need to reopen the regions and switch to use the original wal writer. But we
+ // need to write a close marker when closing a region, and if it fails, the whole rs will abort.
+ // So here we need to skip the creation of remote writer and make it possible to write the region
+ // close marker.
+ public void skipRemoteWal() {
+ this.skipRemoteWal = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 1b17adc..3b18253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,11 +20,13 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.NavigableMap;
-
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -34,7 +36,9 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@@ -179,4 +183,23 @@ public class WALUtil {
return conf.getLong("hbase.regionserver.hlog.blocksize",
CommonFSUtils.getDefaultBlockSize(fs, dir) * 2);
}
+
+ public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
+ ArrayList<Cell> cells = edit.getCells();
+ int size = cells.size();
+ int newSize = 0;
+ for (int i = 0; i < size; i++) {
+ Cell cell = mapper.apply(cells.get(i));
+ if (cell != null) {
+ cells.set(newSize, cell);
+ newSize++;
+ }
+ }
+ for (int i = size - 1; i >= newSize; i--) {
+ cells.remove(i);
+ }
+ if (newSize < size / 2) {
+ cells.trimToSize();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 6f2c764..2bb9811 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* A {@link WALEntryFilter} which contains multiple filters and applies them
@@ -82,22 +82,16 @@ public class ChainWALEntryFilter implements WALEntryFilter {
if (entry == null || cellFilters.length == 0) {
return;
}
- ArrayList<Cell> cells = entry.getEdit().getCells();
- int size = cells.size();
- for (int i = size - 1; i >= 0; i--) {
- Cell cell = cells.get(i);
- for (WALCellFilter filter : cellFilters) {
- cell = filter.filterCell(entry, cell);
- if (cell != null) {
- cells.set(i, cell);
- } else {
- cells.remove(i);
- break;
- }
+ WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c));
+ }
+
+ private Cell filterCell(Entry entry, Cell cell) {
+ for (WALCellFilter filter : cellFilters) {
+ cell = filter.filterCell(entry, cell);
+ if (cell == null) {
+ break;
}
}
- if (cells.size() < size / 2) {
- cells.trimToSize();
- }
+ return cell;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index 8dfe3a2..c9c5ef6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -21,21 +21,23 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -129,20 +131,31 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
}
}
+ // return whether we should include this entry.
+ private boolean filter(Entry entry) {
+ WALEdit edit = entry.getEdit();
+ WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);
+ return !edit.isEmpty();
+ }
+
private List<Entry> readWALEntries(Reader reader) throws IOException {
List<Entry> entries = new ArrayList<>();
if (reader == null) {
return entries;
}
long size = 0;
- Entry entry = reader.next();
- while (entry != null) {
- entries.add(entry);
- size += entry.getEdit().heapSize();
- if (size > batchSize) {
+ for (;;) {
+ Entry entry = reader.next();
+ if (entry == null) {
break;
}
- entry = reader.next();
+ if (filter(entry)) {
+ entries.add(entry);
+ size += entry.getEdit().heapSize();
+ if (size > batchSize) {
+ break;
+ }
+ }
}
return entries;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index e4afc33..cb33dab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -54,8 +54,10 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
Pair<SyncReplicationState, SyncReplicationState> states =
peer.getSyncReplicationStateAndNewState();
- if (states.getFirst() == SyncReplicationState.ACTIVE &&
- states.getSecond() == SyncReplicationState.NONE) {
+ if ((states.getFirst() == SyncReplicationState.ACTIVE &&
+ states.getSecond() == SyncReplicationState.NONE) ||
+ (states.getFirst() == SyncReplicationState.DOWNGRADE_ACTIVE &&
+ states.getSecond() == SyncReplicationState.ACTIVE)) {
return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
} else {
return Optional.empty();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 3eb8f8f..5a3fba3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -136,8 +136,16 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
walCopy = wal;
if (walCopy == null) {
walCopy = createWAL();
+ boolean succ = false;
+ try {
+ walCopy.init();
+ succ = true;
+ } finally {
+ if (!succ) {
+ walCopy.close();
+ }
+ }
wal = walCopy;
- walCopy.init();
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 54287fe..9cbb095 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -69,7 +69,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final WALProvider provider;
private SyncReplicationPeerInfoProvider peerInfoProvider =
- new DefaultSyncReplicationPeerInfoProvider();
+ new DefaultSyncReplicationPeerInfoProvider();
private WALFactory factory;
@@ -83,7 +83,11 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private AtomicBoolean initialized = new AtomicBoolean(false);
- private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>();
+ // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for
+ // the peer yet. When getting WAL from this map the caller should know that it should not use
+ // DualAsyncFSWAL any more.
+ private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL =
+ new ConcurrentHashMap<>();
private final KeyLocker<String> createLock = new KeyLocker<>();
@@ -123,18 +127,27 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
- DualAsyncFSWAL wal = peerId2WAL.get(peerId);
- if (wal != null) {
- return wal;
+ Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
+ if (opt != null) {
+ return opt.orElse(null);
}
Lock lock = createLock.acquireLock(peerId);
try {
- wal = peerId2WAL.get(peerId);
- if (wal == null) {
- wal = createWAL(peerId, remoteWALDir);
- peerId2WAL.put(peerId, wal);
+ opt = peerId2WAL.get(peerId);
+ if (opt != null) {
+ return opt.orElse(null);
+ }
+ DualAsyncFSWAL wal = createWAL(peerId, remoteWALDir);
+ boolean succ = false;
+ try {
wal.init();
+ succ = true;
+ } finally {
+ if (!succ) {
+ wal.close();
+ }
}
+ peerId2WAL.put(peerId, Optional.of(wal));
return wal;
} finally {
lock.unlock();
@@ -146,18 +159,20 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
if (region == null) {
return provider.getWAL(null);
}
+ WAL wal = null;
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerInfoProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
- return getWAL(pair.getFirst(), pair.getSecond());
- } else {
- return provider.getWAL(region);
+ wal = getWAL(pair.getFirst(), pair.getSecond());
}
+ return wal != null ? wal : provider.getWAL(region);
}
private Stream<WAL> getWALStream() {
- return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream());
+ return Streams.concat(
+ peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get),
+ provider.getWALs().stream());
}
@Override
@@ -169,12 +184,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
public void shutdown() throws IOException {
// save the last exception and rethrow
IOException failure = null;
- for (DualAsyncFSWAL wal : peerId2WAL.values()) {
- try {
- wal.shutdown();
- } catch (IOException e) {
- LOG.error("Shutdown WAL failed", e);
- failure = e;
+ for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
+ if (wal.isPresent()) {
+ try {
+ wal.get().shutdown();
+ } catch (IOException e) {
+ LOG.error("Shutdown WAL failed", e);
+ failure = e;
+ }
}
}
provider.shutdown();
@@ -187,12 +204,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
public void close() throws IOException {
// save the last exception and rethrow
IOException failure = null;
- for (DualAsyncFSWAL wal : peerId2WAL.values()) {
- try {
- wal.close();
- } catch (IOException e) {
- LOG.error("Close WAL failed", e);
- failure = e;
+ for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) {
+ if (wal.isPresent()) {
+ try {
+ wal.get().close();
+ } catch (IOException e) {
+ LOG.error("Close WAL failed", e);
+ failure = e;
+ }
}
}
provider.close();
@@ -208,8 +227,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public long getLogFileSize() {
- return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() +
- provider.getLogFileSize();
+ return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get)
+ .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize();
}
private void safeClose(WAL wal) {
@@ -231,10 +250,23 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to, int stage) {
- // TODO: stage 0
- if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE &&
- stage == 1) {
- safeClose(peerId2WAL.remove(peerId));
+ if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) {
+ if (stage == 0) {
+ Lock lock = createLock.acquireLock(peerId);
+ try {
+ Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
+ if (opt != null) {
+ opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
+ } else {
+ // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
+ peerId2WAL.put(peerId, Optional.empty());
+ }
+ } finally {
+ lock.unlock();
+ }
+ } else if (stage == 1) {
+ peerId2WAL.remove(peerId).ifPresent(this::safeClose);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index 1d4dc1b..cd0e52e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.wal;
import java.io.IOException;
import java.util.ArrayList;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -33,9 +32,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC })
public class WALEdit implements HeapSize {
- private static final Logger LOG = LoggerFactory.getLogger(WALEdit.class);
// TODO: Get rid of this; see HBASE-8457
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index db0b1a2..cb848d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -133,7 +133,7 @@ public class WALFactory {
static WALProvider createProvider(Class<? extends WALProvider> clazz) throws IOException {
LOG.info("Instantiating WALProvider of type {}", clazz);
try {
- return clazz.newInstance();
+ return clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
LOG.debug("Exception details for failure to load WALProvider.", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 486ab51..ac98283 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -259,9 +260,11 @@ public class TestReplicationAdmin {
TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
- String rootDir = "hdfs://srv1:9999/hbase";
+ Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
+ TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_ONE));
builder.setClusterKey(KEY_ONE);
- builder.setRemoteWALDir(rootDir);
+ builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
+ TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
builder.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
@@ -1081,10 +1084,12 @@ public class TestReplicationAdmin {
// OK
}
- String rootDir = "hdfs://srv1:9999/hbase";
+ Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
+ TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND));
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
- builder.setRemoteWALDir(rootDir);
+ builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
+ TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
builder.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
@@ -1105,13 +1110,18 @@ public class TestReplicationAdmin {
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
- try {
- hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
- SyncReplicationState.STANDBY);
- fail("Can't transit cluster state from ACTIVE to STANDBY");
- } catch (Exception e) {
- // OK
- }
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
+ assertEquals(SyncReplicationState.STANDBY,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
+ assertEquals(SyncReplicationState.ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE);
@@ -1121,7 +1131,6 @@ public class TestReplicationAdmin {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
assertEquals(SyncReplicationState.STANDBY,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
-
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
fail("Can't transit cluster state from STANDBY to ACTIVE");
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
index 17f24e8..c446306 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java
@@ -104,6 +104,7 @@ public class TestWALDurability {
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
+ customFSLog.init();
HRegion region = initHRegion(tableName, null, null, customFSLog);
byte[] bytes = Bytes.toBytes(getName());
Put put = new Put(bytes);
@@ -118,6 +119,7 @@ public class TestWALDurability {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
fs = FileSystem.get(conf);
customFSLog = new CustomFSLog(fs, rootDir, getName(), conf);
+ customFSLog.init();
region = initHRegion(tableName, null, null, customFSLog);
customFSLog.resetSyncFlag();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
new file mode 100644
index 0000000..30dbdb5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class for testing sync replication.
+ */
+public class SyncReplicationTestBase {
+
+ protected static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
+
+ protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+ protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+ protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
+
+ protected static byte[] CF = Bytes.toBytes("cf");
+
+ protected static byte[] CQ = Bytes.toBytes("cq");
+
+ protected static String PEER_ID = "1";
+
+ private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
+ util.setZkCluster(ZK_UTIL.getZkCluster());
+ Configuration conf = util.getConfiguration();
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
+ conf.setInt("replication.source.size.capacity", 102400);
+ conf.setLong("replication.source.sleepforretries", 100);
+ conf.setInt("hbase.regionserver.maxlogs", 10);
+ conf.setLong("hbase.master.logcleaner.ttl", 10);
+ conf.setInt("zookeeper.recovery.retry", 1);
+ conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+ conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ conf.setInt("replication.stats.thread.period.seconds", 5);
+ conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+ conf.setLong("replication.sleep.before.failover", 2000);
+ conf.setInt("replication.source.maxretriesmultiplier", 10);
+ conf.setFloat("replication.source.ratio", 1.0f);
+ conf.setBoolean("replication.source.eof.autorecovery", true);
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ ZK_UTIL.startMiniZKCluster();
+ initTestingUtility(UTIL1, "/cluster1");
+ initTestingUtility(UTIL2, "/cluster2");
+ UTIL1.startMiniCluster(3);
+ UTIL2.startMiniCluster(3);
+ TableDescriptor td =
+ TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+ UTIL1.getAdmin().createTable(td);
+ UTIL2.getAdmin().createTable(td);
+ FileSystem fs1 = UTIL1.getTestFileSystem();
+ FileSystem fs2 = UTIL2.getTestFileSystem();
+ Path remoteWALDir1 =
+ new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+ "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
+ Path remoteWALDir2 =
+ new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+ "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
+ UTIL1.getAdmin().addReplicationPeer(PEER_ID,
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+ .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+ UTIL2.getAdmin().addReplicationPeer(PEER_ID,
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+ .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL1.shutdownMiniCluster();
+ UTIL2.shutdownMiniCluster();
+ ZK_UTIL.shutdownMiniZKCluster();
+ }
+
+ protected final void write(HBaseTestingUtility util, int start, int end) throws IOException {
+ try (Table table = util.getConnection().getTable(TABLE_NAME)) {
+ for (int i = start; i < end; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ }
+
+ protected final void verify(HBaseTestingUtility util, int start, int end) throws IOException {
+ try (Table table = util.getConnection().getTable(TABLE_NAME)) {
+ for (int i = start; i < end; i++) {
+ assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+ }
+ }
+ }
+
+ protected final void verifyThroughRegion(HBaseTestingUtility util, int start, int end)
+ throws IOException {
+ HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ for (int i = start; i < end; i++) {
+ assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+ }
+ }
+
+ protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtility util, int start,
+ int end) throws IOException {
+ HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ for (int i = start; i < end; i++) {
+ assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
+ }
+ }
+
+ protected final void waitUntilReplicationDone(HBaseTestingUtility util, int end)
+ throws Exception {
+ // The reject check is in RSRpcService so we can still read through HRegion
+ HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Replication has not been caught up yet";
+ }
+ });
+ }
+
+ protected final void writeAndVerifyReplication(HBaseTestingUtility util1,
+ HBaseTestingUtility util2, int start, int end) throws Exception {
+ write(util1, start, end);
+ waitUntilReplicationDone(util2, end);
+ verifyThroughRegion(util2, start, end);
+ }
+
+ protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
+ Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+ return new Path(remoteWALDir, peerId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
deleted file mode 100644
index 288dcbf..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSyncReplication {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSyncReplication.class);
-
- private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
-
- private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
-
- private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("SyncRep");
-
- private static byte[] CF = Bytes.toBytes("cf");
-
- private static byte[] CQ = Bytes.toBytes("cq");
-
- private static String PEER_ID = "1";
-
- private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
- util.setZkCluster(ZK_UTIL.getZkCluster());
- Configuration conf = util.getConfiguration();
- conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
- conf.setInt("replication.source.size.capacity", 102400);
- conf.setLong("replication.source.sleepforretries", 100);
- conf.setInt("hbase.regionserver.maxlogs", 10);
- conf.setLong("hbase.master.logcleaner.ttl", 10);
- conf.setInt("zookeeper.recovery.retry", 1);
- conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
- conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- conf.setInt("replication.stats.thread.period.seconds", 5);
- conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
- conf.setLong("replication.sleep.before.failover", 2000);
- conf.setInt("replication.source.maxretriesmultiplier", 10);
- conf.setFloat("replication.source.ratio", 1.0f);
- conf.setBoolean("replication.source.eof.autorecovery", true);
- }
-
- @BeforeClass
- public static void setUp() throws Exception {
- ZK_UTIL.startMiniZKCluster();
- initTestingUtility(UTIL1, "/cluster1");
- initTestingUtility(UTIL2, "/cluster2");
- UTIL1.startMiniCluster(3);
- UTIL2.startMiniCluster(3);
- TableDescriptor td =
- TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
- UTIL1.getAdmin().createTable(td);
- UTIL2.getAdmin().createTable(td);
- FileSystem fs1 = UTIL1.getTestFileSystem();
- FileSystem fs2 = UTIL2.getTestFileSystem();
- Path remoteWALDir1 =
- new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
- "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
- Path remoteWALDir2 =
- new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
- "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
- UTIL1.getAdmin().addReplicationPeer(PEER_ID,
- ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
- .setReplicateAllUserTables(false)
- .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
- .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
- UTIL2.getAdmin().addReplicationPeer(PEER_ID,
- ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
- .setReplicateAllUserTables(false)
- .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
- .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- UTIL1.shutdownMiniCluster();
- UTIL2.shutdownMiniCluster();
- ZK_UTIL.shutdownMiniZKCluster();
- }
-
- @FunctionalInterface
- private interface TableAction {
-
- void call(Table table) throws IOException;
- }
-
- private void assertDisallow(Table table, TableAction action) throws IOException {
- try {
- action.call(table);
- } catch (DoNotRetryIOException | RetriesExhaustedException e) {
- // expected
- assertThat(e.getMessage(), containsString("STANDBY"));
- }
- }
-
- @Test
- public void testStandby() throws Exception {
- MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
- Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
- Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID);
- assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
- UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
- SyncReplicationState.STANDBY);
- assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
- try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
- assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
- assertDisallow(table,
- t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
- assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
- assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
- assertDisallow(table,
- t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
- assertDisallow(table,
- t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
- assertDisallow(table,
- t -> t
- .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
- new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
- assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
- .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
- }
- // But we should still allow replication writes
- try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
- for (int i = 0; i < 100; i++) {
- table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
- }
- }
- // The reject check is in RSRpcService so we can still read through HRegion
- HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
- UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return !region.get(new Get(Bytes.toBytes(99))).isEmpty();
- }
-
- @Override
- public String explainFailure() throws Exception {
- return "Replication has not been catched up yet";
- }
- });
- for (int i = 0; i < 100; i++) {
- assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
new file mode 100644
index 0000000..f4fb5fe
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationActive extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+
+ @Test
+ public void testActive() throws Exception {
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+ write(UTIL1, 0, 100);
+ Thread.sleep(2000);
+ // peer is disabled so no data have been replicated
+ verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ // confirm that the data is there after we convert the peer to DA
+ verify(UTIL2, 0, 100);
+
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+
+ writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
+
+ // shutdown the cluster completely
+ UTIL1.shutdownMiniCluster();
+ // confirm that we can convert to DA even if the remote slave cluster is down
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ write(UTIL2, 200, 300);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d1088b99/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
new file mode 100644
index 0000000..ed61d2a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationStandBy.class);
+
+ @FunctionalInterface
+ private interface TableAction {
+
+ void call(Table table) throws IOException;
+ }
+
+ private void assertDisallow(Table table, TableAction action) throws IOException {
+ try {
+ action.call(table);
+ } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+ // expected
+ assertThat(e.getMessage(), containsString("STANDBY"));
+ }
+ }
+
+ @Test
+ public void testStandby() throws Exception {
+ MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+ Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+ assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+ try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+ assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
+ assertDisallow(table,
+ t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+ assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
+ assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
+ assertDisallow(table,
+ t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+ assertDisallow(table,
+ t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
+ assertDisallow(table,
+ t -> t
+ .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
+ new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
+ assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
+ .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
+ }
+ // We should still allow replication writes
+ writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+ }
+}