You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/17 09:34:57 UTC
[17/36] hbase git commit: HBASE-19857 Complete the procedure for
adding a sync replication peer
HBASE-19857 Complete the procedure for adding a sync replication peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f453292c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f453292c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f453292c
Branch: refs/heads/HBASE-19064
Commit: f453292c21f6551f3eec1196c295c5bf78dd6c65
Parents: 6063da7
Author: zhangduo <zh...@apache.org>
Authored: Thu Jan 25 20:09:00 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 17 17:09:36 2018 +0800
----------------------------------------------------------------------
.../hbase/replication/ReplicationPeer.java | 9 +
.../hbase/replication/ReplicationPeerImpl.java | 28 +--
.../hbase/replication/ReplicationPeers.java | 3 +-
.../regionserver/PeerActionListener.java | 10 +-
.../SyncReplicationPeerProvider.java | 35 +++
.../SynchronousReplicationPeerProvider.java | 35 ---
.../hbase/wal/SyncReplicationWALProvider.java | 234 +++++++++++++++++++
.../wal/SynchronousReplicationWALProvider.java | 225 ------------------
.../org/apache/hadoop/hbase/wal/WALFactory.java | 8 +-
.../TestReplicationSourceManager.java | 3 +
.../wal/TestSyncReplicationWALProvider.java | 153 ++++++++++++
.../TestSynchronousReplicationWALProvider.java | 153 ------------
12 files changed, 456 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 2da3cce..0196a9a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -54,6 +54,15 @@ public interface ReplicationPeer {
PeerState getPeerState();
/**
+ * Returns the sync replication state of the peer by reading the local cache.
+ * <p>
+ * If the peer is not a synchronous replication peer, a {@link SyncReplicationState#NONE} will be
+ * returned.
+ * @return the sync replication state
+ */
+ SyncReplicationState getSyncReplicationState();
+
+ /**
* Test whether the peer is enabled.
* @return {@code true} if enabled, otherwise {@code false}.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index d656466..ff3f662 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -36,6 +36,8 @@ public class ReplicationPeerImpl implements ReplicationPeer {
private volatile PeerState peerState;
+ private volatile SyncReplicationState syncReplicationState;
+
private final List<ReplicationPeerConfigListener> peerConfigListeners;
/**
@@ -45,12 +47,13 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
- public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
- ReplicationPeerConfig peerConfig) {
+ public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
+ boolean peerState, SyncReplicationState syncReplicationState) {
this.conf = conf;
this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
this.peerConfig = peerConfig;
+ this.syncReplicationState = syncReplicationState;
this.peerConfigListeners = new ArrayList<>();
}
@@ -77,37 +80,26 @@ public class ReplicationPeerImpl implements ReplicationPeer {
return peerState;
}
- /**
- * Get the peer config object
- * @return the ReplicationPeerConfig for this peer
- */
+ @Override
+ public SyncReplicationState getSyncReplicationState() {
+ return syncReplicationState;
+ }
+
@Override
public ReplicationPeerConfig getPeerConfig() {
return peerConfig;
}
- /**
- * Get the configuration object required to communicate with this peer
- * @return configuration object
- */
@Override
public Configuration getConfiguration() {
return conf;
}
- /**
- * Get replicable (table, cf-list) map of this peer
- * @return the replicable (table, cf-list) map
- */
@Override
public Map<TableName, List<String>> getTableCFs() {
return this.peerConfig.getTableCFsMap();
}
- /**
- * Get replicable namespace set of this peer
- * @return the replicable namespaces set
- */
@Override
public Set<String> getNamespaces() {
return this.peerConfig.getNamespaces();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 4d602ca..a54f339 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -134,7 +134,8 @@ public class ReplicationPeers {
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
+ SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
- peerId, enabled, peerConfig);
+ peerId, peerConfig, enabled, syncReplicationState);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
index 74ad626..6df2af9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
@@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Get notification for replication peer events. Mainly used for telling the
- * {@link org.apache.hadoop.hbase.wal.SynchronousReplicationWALProvider} to close some WAL if not
- * used any more.
- * <p>
- * TODO: Also need a synchronous peer state change notification.
+ * {@link org.apache.hadoop.hbase.wal.SyncReplicationWALProvider} to close WALs that are no
+ * longer used.
*/
@InterfaceAudience.Private
public interface PeerActionListener {
default void peerRemoved(String peerId) {}
+
+ default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
+ SyncReplicationState to) {}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java
new file mode 100644
index 0000000..b97bf7e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Get the peer id and remote WAL directory if the region is synchronously replicated.
+ */
+@InterfaceAudience.Private
+public interface SyncReplicationPeerProvider {
+
+ /**
+ * Return the peer id and remote WAL directory if the region is synchronously replicated.
+ */
+ Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
deleted file mode 100644
index b4e04fb..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.Optional;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Get the peer id and remote root dir if the region is synchronously replicated.
- */
-@InterfaceAudience.Private
-public interface SynchronousReplicationPeerProvider {
-
- /**
- * Return the peer id and remote WAL directory if the region is synchronously replicated.
- */
- Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
new file mode 100644
index 0000000..bccc842
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
+import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.KeyLocker;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * The special {@link WALProvider} for synchronous replication.
+ * <p>
+ * It works like an interceptor, when getting WAL, first it will check if the given region should be
+ * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate
+ * the request to the normal {@link WALProvider}.
+ */
+@InterfaceAudience.Private
+public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
+
+ private static final String LOG_SUFFIX = ".syncrep";
+
+ private final WALProvider provider;
+
+ private final SyncReplicationPeerProvider peerProvider;
+
+ private WALFactory factory;
+
+ private Configuration conf;
+
+ private List<WALActionsListener> listeners = new ArrayList<>();
+
+ private EventLoopGroup eventLoopGroup;
+
+ private Class<? extends Channel> channelClass;
+
+ private AtomicBoolean initialized = new AtomicBoolean(false);
+
+ private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>();
+
+ private final KeyLocker<String> createLock = new KeyLocker<>();
+
+ SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) {
+ this.provider = provider;
+ this.peerProvider = peerProvider;
+ }
+
+ @Override
+ public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+ if (!initialized.compareAndSet(false, true)) {
+ throw new IllegalStateException("WALProvider.init should only be called once.");
+ }
+ provider.init(factory, conf, providerId);
+ this.conf = conf;
+ this.factory = factory;
+ Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+ NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+ eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+ channelClass = eventLoopGroupAndChannelClass.getSecond();
+ }
+
+ private String getLogPrefix(String peerId) {
+ return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
+ }
+
+ private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
+ Path remoteWALDirPath = new Path(remoteWALDir);
+ FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
+ return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
+ CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
+ getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+ conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
+ }
+
+ private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
+ DualAsyncFSWAL wal = peerId2WAL.get(peerId);
+ if (wal != null) {
+ return wal;
+ }
+ Lock lock = createLock.acquireLock(peerId);
+ try {
+ wal = peerId2WAL.get(peerId);
+ if (wal == null) {
+ wal = createWAL(peerId, remoteWALDir);
+ peerId2WAL.put(peerId, wal);
+ wal.init();
+ }
+ return wal;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public WAL getWAL(RegionInfo region) throws IOException {
+ Optional<Pair<String, String>> peerIdAndRemoteWALDir =
+ peerProvider.getPeerIdAndRemoteWALDir(region);
+ if (peerIdAndRemoteWALDir.isPresent()) {
+ Pair<String, String> pair = peerIdAndRemoteWALDir.get();
+ return getWAL(pair.getFirst(), pair.getSecond());
+ } else {
+ return provider.getWAL(region);
+ }
+ }
+
+ private Stream<WAL> getWALStream() {
+ return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream());
+ }
+
+ @Override
+ public List<WAL> getWALs() {
+ return getWALStream().collect(Collectors.toList());
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ // save the last exception and rethrow
+ IOException failure = null;
+ for (DualAsyncFSWAL wal : peerId2WAL.values()) {
+ try {
+ wal.shutdown();
+ } catch (IOException e) {
+ LOG.error("Shutdown WAL failed", e);
+ failure = e;
+ }
+ }
+ provider.shutdown();
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // save the last exception and rethrow
+ IOException failure = null;
+ for (DualAsyncFSWAL wal : peerId2WAL.values()) {
+ try {
+ wal.close();
+ } catch (IOException e) {
+ LOG.error("Close WAL failed", e);
+ failure = e;
+ }
+ }
+ provider.close();
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ @Override
+ public long getNumLogFiles() {
+ return peerId2WAL.size() + provider.getNumLogFiles();
+ }
+
+ @Override
+ public long getLogFileSize() {
+ return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() +
+ provider.getLogFileSize();
+ }
+
+ private void safeClose(WAL wal) {
+ if (wal != null) {
+ try {
+ wal.close();
+ } catch (IOException e) {
+ LOG.error("Close WAL failed", e);
+ }
+ }
+ }
+
+ @Override
+ public void addWALActionsListener(WALActionsListener listener) {
+ listeners.add(listener);
+ provider.addWALActionsListener(listener);
+ }
+
+ @Override
+ public void peerRemoved(String peerId) {
+ safeClose(peerId2WAL.remove(peerId));
+ }
+
+ @Override
+ public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
+ SyncReplicationState to) {
+ assert to == SyncReplicationState.DOWNGRADE_ACTIVE;
+ safeClose(peerId2WAL.remove(peerId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
deleted file mode 100644
index f60599f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
-import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.KeyLocker;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
-
-/**
- * The special {@link WALProvider} for synchronous replication.
- * <p>
- * It works like an interceptor, when getting WAL, first it will check if the given region should be
- * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate
- * the request to the normal {@link WALProvider}.
- */
-@InterfaceAudience.Private
-public class SynchronousReplicationWALProvider implements WALProvider, PeerActionListener {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(SynchronousReplicationWALProvider.class);
-
- private static final String LOG_SUFFIX = ".syncrep";
-
- private final WALProvider provider;
-
- private final SynchronousReplicationPeerProvider peerProvider;
-
- private WALFactory factory;
-
- private Configuration conf;
-
- private List<WALActionsListener> listeners = new ArrayList<>();
-
- private EventLoopGroup eventLoopGroup;
-
- private Class<? extends Channel> channelClass;
-
- private AtomicBoolean initialized = new AtomicBoolean(false);
-
- private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>();
-
- private final KeyLocker<String> createLock = new KeyLocker<>();
-
- SynchronousReplicationWALProvider(WALProvider provider,
- SynchronousReplicationPeerProvider peerProvider) {
- this.provider = provider;
- this.peerProvider = peerProvider;
- }
-
- @Override
- public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
- if (!initialized.compareAndSet(false, true)) {
- throw new IllegalStateException("WALProvider.init should only be called once.");
- }
- provider.init(factory, conf, providerId);
- this.conf = conf;
- this.factory = factory;
- Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
- NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
- eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
- channelClass = eventLoopGroupAndChannelClass.getSecond();
- }
-
- private String getLogPrefix(String peerId) {
- return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
- }
-
- private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
- Path remoteWALDirPath = new Path(remoteWALDir);
- FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
- return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
- CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
- getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
- conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
- }
-
- private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
- DualAsyncFSWAL wal = peerId2WAL.get(peerId);
- if (wal != null) {
- return wal;
- }
- Lock lock = createLock.acquireLock(peerId);
- try {
- wal = peerId2WAL.get(peerId);
- if (wal == null) {
- wal = createWAL(peerId, remoteWALDir);
- peerId2WAL.put(peerId, wal);
- wal.init();
- }
- return wal;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public WAL getWAL(RegionInfo region) throws IOException {
- Optional<Pair<String, String>> peerIdAndRemoteWALDir =
- peerProvider.getPeerIdAndRemoteWALDir(region);
- if (peerIdAndRemoteWALDir.isPresent()) {
- Pair<String, String> pair = peerIdAndRemoteWALDir.get();
- return getWAL(pair.getFirst(), pair.getSecond());
- } else {
- return provider.getWAL(region);
- }
- }
-
- private Stream<WAL> getWALStream() {
- return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream());
- }
-
- @Override
- public List<WAL> getWALs() {
- return getWALStream().collect(Collectors.toList());
- }
-
- @Override
- public void shutdown() throws IOException {
- // save the last exception and rethrow
- IOException failure = null;
- for (DualAsyncFSWAL wal : peerId2WAL.values()) {
- try {
- wal.shutdown();
- } catch (IOException e) {
- LOG.error("Shutdown WAL failed", e);
- failure = e;
- }
- }
- provider.shutdown();
- if (failure != null) {
- throw failure;
- }
- }
-
- @Override
- public void close() throws IOException {
- // save the last exception and rethrow
- IOException failure = null;
- for (DualAsyncFSWAL wal : peerId2WAL.values()) {
- try {
- wal.close();
- } catch (IOException e) {
- LOG.error("Close WAL failed", e);
- failure = e;
- }
- }
- provider.close();
- if (failure != null) {
- throw failure;
- }
- }
-
- @Override
- public long getNumLogFiles() {
- return peerId2WAL.size() + provider.getNumLogFiles();
- }
-
- @Override
- public long getLogFileSize() {
- return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() +
- provider.getLogFileSize();
- }
-
- @Override
- public void peerRemoved(String peerId) {
- WAL wal = peerId2WAL.remove(peerId);
- if (wal != null) {
- try {
- wal.close();
- } catch (IOException e) {
- LOG.error("Close WAL failed", e);
- }
- }
- }
-
- @Override
- public void addWALActionsListener(WALActionsListener listener) {
- listeners.add(listener);
- provider.addWALActionsListener(listener);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 0677cdc..70d7d3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
+import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -186,7 +186,7 @@ public class WALFactory {
* Remove it once we can integrate the synchronous replication logic in RS.
*/
@VisibleForTesting
- WALFactory(Configuration conf, String factoryId, SynchronousReplicationPeerProvider peerProvider)
+ WALFactory(Configuration conf, String factoryId, SyncReplicationPeerProvider peerProvider)
throws IOException {
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
@@ -195,9 +195,9 @@ public class WALFactory {
this.conf = conf;
this.factoryId = factoryId;
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
- this.provider = new SynchronousReplicationWALProvider(provider, peerProvider);
- this.provider.addWALActionsListener(new MetricsWAL());
+ this.provider = new SyncReplicationWALProvider(provider, peerProvider);
this.provider.init(this, conf, null);
+ this.provider.addWALActionsListener(new MetricsWAL());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 8170893..04c7aad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -173,6 +173,9 @@ public abstract class TestReplicationSourceManager {
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
+ ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
+ ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
+ Bytes.toBytes(SyncReplicationState.NONE.ordinal()));
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
new file mode 100644
index 0000000..60a9e13
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestSyncReplicationWALProvider {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static String PEER_ID = "1";
+
+ private static String REMOTE_WAL_DIR = "/RemoteWAL";
+
+ private static TableName TABLE = TableName.valueOf("table");
+
+ private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
+
+ private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build();
+
+ private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
+
+ private static WALFactory FACTORY;
+
+ private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
+ if (info.getTable().equals(TABLE)) {
+ return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniDFSCluster(3);
+ FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
+ TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir);
+ UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws IOException {
+ FACTORY.close();
+ UTIL.shutdownMiniDFSCluster();
+ }
+
+ private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
+ int recordCount = 100;
+ int columnCount = 10;
+ byte[] row = Bytes.toBytes("testRow");
+ long timestamp = System.currentTimeMillis();
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp,
+ mvcc);
+ Path localFile = wal.getCurrentFileName();
+ Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
+ try (ProtobufLogReader reader =
+ (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
+ ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
+ timestamp);
+ }
+ try (ProtobufLogReader reader =
+ (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
+ ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
+ timestamp);
+ }
+ wal.rollWriter();
+ DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
+ UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ StringBuilder sb = new StringBuilder();
+ if (!dfs.isFileClosed(localFile)) {
+ sb.append(localFile + " has not been closed yet.");
+ }
+ if (!dfs.isFileClosed(remoteFile)) {
+ sb.append(remoteFile + " has not been closed yet.");
+ }
+ return sb.toString();
+ }
+ });
+ try (ProtobufLogReader reader =
+ (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
+ ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
+ timestamp);
+ }
+ try (ProtobufLogReader reader =
+ (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
+ ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
+ timestamp);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
+ assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
+ DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
+ assertEquals(2, FACTORY.getWALs().size());
+ testReadWrite(wal);
+ SyncReplicationWALProvider walProvider =
+ (SyncReplicationWALProvider) FACTORY.getWALProvider();
+ walProvider.peerRemoved(PEER_ID);
+ assertEquals(1, FACTORY.getWALs().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f453292c/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
deleted file mode 100644
index e6031c6..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-import java.util.Optional;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RegionServerTests.class, MediumTests.class })
-public class TestSynchronousReplicationWALProvider {
-
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static String PEER_ID = "1";
-
- private static String REMOTE_WAL_DIR = "/RemoteWAL";
-
- private static TableName TABLE = TableName.valueOf("table");
-
- private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
-
- private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build();
-
- private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
-
- private static WALFactory FACTORY;
-
- private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
- if (info.getTable().equals(TABLE)) {
- return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
- } else {
- return Optional.empty();
- }
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- UTIL.startMiniDFSCluster(3);
- FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
- TestSynchronousReplicationWALProvider::getPeerIdAndRemoteWALDir);
- UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws IOException {
- FACTORY.close();
- UTIL.shutdownMiniDFSCluster();
- }
-
- private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
- int recordCount = 100;
- int columnCount = 10;
- byte[] row = Bytes.toBytes("testRow");
- long timestamp = System.currentTimeMillis();
- MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
- ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp,
- mvcc);
- Path localFile = wal.getCurrentFileName();
- Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
- try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
- ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
- timestamp);
- }
- try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
- ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
- timestamp);
- }
- wal.rollWriter();
- DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
- UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
- }
-
- @Override
- public String explainFailure() throws Exception {
- StringBuilder sb = new StringBuilder();
- if (!dfs.isFileClosed(localFile)) {
- sb.append(localFile + " has not been closed yet.");
- }
- if (!dfs.isFileClosed(remoteFile)) {
- sb.append(remoteFile + " has not been closed yet.");
- }
- return sb.toString();
- }
- });
- try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
- ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
- timestamp);
- }
- try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
- ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
- timestamp);
- }
- }
-
- @Test
- public void test() throws Exception {
- WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
- assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
- DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
- assertEquals(2, FACTORY.getWALs().size());
- testReadWrite(wal);
- SynchronousReplicationWALProvider walProvider =
- (SynchronousReplicationWALProvider) FACTORY.getWALProvider();
- walProvider.peerRemoved(PEER_ID);
- assertEquals(1, FACTORY.getWALs().size());
- }
-}