You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/13 22:14:31 UTC
[1/2] hbase git commit: HBASE-21246 Introduce WALIdentity to identify WALs instead of a Path
Repository: hbase
Updated Branches:
refs/heads/HBASE-20952 ebfc04d85 -> c738e1575
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
index 1da31da..e82ccc2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
@@ -24,13 +24,13 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,13 +148,13 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce
@Override
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- Path oldPath, Path newPath) throws IOException {
+ WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
preWALRollCalled = true;
}
@Override
public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- Path oldPath, Path newPath) throws IOException {
+ WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
postWALRollCalled = true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
index ad2b2d4..cd96586 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -155,7 +156,8 @@ public class TestBlockReorderMultiBlocks {
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener() {
@Override
- public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ public void postLogRoll(final WALIdentity oldWalId, final WALIdentity newWalId)
+ throws IOException {
latch.countDown();
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
index 9322c5e..8dcca93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
@@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -130,12 +130,12 @@ public abstract class AbstractTestLogRollPeriod {
private void checkMinLogRolls(final WAL log, final int minRolls)
throws Exception {
- final List<Path> paths = new ArrayList<>();
+ final List<WALIdentity> walIds = new ArrayList<WALIdentity>();
log.registerWALActionsListener(new WALActionsListener() {
@Override
- public void postLogRoll(Path oldFile, Path newFile) {
- LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
- paths.add(newFile);
+ public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) {
+ LOG.debug("postLogRoll: oldWalId="+oldWalId+" newWalId="+newWalId);
+ walIds.add(newWalId);
}
});
@@ -144,13 +144,13 @@ public abstract class AbstractTestLogRollPeriod {
Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
// Do some extra sleep in case the machine is slow,
// and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
- final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
- for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
+ final int NUM_RETRIES = 1 + 8 * (minRolls - walIds.size());
+ for (int retry = 0; walIds.size() < minRolls && retry < NUM_RETRIES; ++retry) {
Thread.sleep(LOG_ROLL_PERIOD / 4);
}
wtime = System.currentTimeMillis() - wtime;
LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
- paths.size(), wtime, wtime / paths.size(), minRolls));
- assertFalse(paths.size() < minRolls);
+ walIds.size(), wtime, wtime / walIds.size(), minRolls));
+ assertFalse(walIds.size() < minRolls);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index e19361e..8e7cacf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -52,8 +52,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
@@ -251,21 +253,21 @@ public class TestLogRolling extends AbstractTestLogRolling {
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
final WAL log = server.getWAL(region);
- final List<Path> paths = new ArrayList<>(1);
+ final List<WALIdentity> walIds = new ArrayList<>(1);
final List<Integer> preLogRolledCalled = new ArrayList<>();
- paths.add(AbstractFSWALProvider.getCurrentFileName(log));
+ walIds.add(new FSWALIdentity(AbstractFSWALProvider.getCurrentFileName(log)));
log.registerWALActionsListener(new WALActionsListener() {
@Override
- public void preLogRoll(Path oldFile, Path newFile) {
+ public void preLogRoll(WALIdentity oldFile, WALIdentity newFile) {
LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
preLogRolledCalled.add(new Integer(1));
}
@Override
- public void postLogRoll(Path oldFile, Path newFile) {
- paths.add(newFile);
+ public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
+ walIds.add(newFile);
}
});
@@ -315,7 +317,8 @@ public class TestLogRolling extends AbstractTestLogRolling {
// read back the data written
Set<String> loggedRows = new HashSet<>();
FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
- for (Path p : paths) {
+ for (WALIdentity walId : walIds) {
+ Path p = ((FSWALIdentity) walId).getPath();
LOG.debug("recovering lease for " + p);
fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index 0967a75..880dea7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.Before;
@@ -142,12 +143,12 @@ public class TestWALActionsListener {
public int closedCount = 0;
@Override
- public void preLogRoll(Path oldFile, Path newFile) {
+ public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) {
preLogRollCounter++;
}
@Override
- public void postLogRoll(Path oldFile, Path newFile) {
+ public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) {
postLogRollCounter++;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 67f793d..ed71e6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -42,7 +44,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
private ReplicationSourceManager manager;
private ReplicationPeer replicationPeer;
private String peerClusterId;
- private Path currentPath;
+ private WALIdentity currentWalId;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
private AtomicBoolean startup = new AtomicBoolean(false);
@@ -60,14 +62,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
- public void enqueueLog(Path log) {
- this.currentPath = log;
+ public void enqueueLog(WALIdentity log) {
+ this.currentWalId = log;
metrics.incrSizeOfLogQueue();
}
@Override
- public Path getCurrentPath() {
- return this.currentPath;
+ public WALIdentity getCurrentWALIdentity() {
+ return this.currentWalId;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 37ca7dc..c04fe0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -695,7 +696,8 @@ public class TestMasterReplication {
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener() {
@Override
- public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ public void postLogRoll(final WALIdentity oldWalId, final WALIdentity newWalId)
+ throws IOException {
latch.countDown();
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 225ca7f..6247714 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Admin;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.BeforeClass;
@@ -219,7 +219,8 @@ public class TestMultiSlaveReplication {
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener() {
@Override
- public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ public void postLogRoll(final WALIdentity oldWalId, final WALIdentity newWalId)
+ throws IOException {
latch.countDown();
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 4effe41..0cfd833 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterfa
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -64,12 +67,13 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
+ WALIdentity walId = new FSWALIdentity(currentFile);
Replication replicationService = (Replication) utility1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
- if (!currentFile.equals(source.getCurrentPath())) {
+ if (!walId.equals(source.getCurrentWALIdentity())) {
return false;
}
}
@@ -97,6 +101,7 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
emptyWalPaths.add(emptyWalPath);
}
+ WALFactory factory = new WALFactory(conf1, "empty-wal-recovery");
// inject our empty wal into the replication queue, and then roll the original wal, which
// enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
// determine if the file being replicated currently is still opened for write, so just inject a
@@ -104,8 +109,10 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
- replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
- replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
+ WALIdentity id = new FSWALIdentity(
+ emptyWalPaths.get(i));
+ replicationService.getReplicationManager().preLogRoll(id);
+ replicationService.getReplicationManager().postLogRoll(id);
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
index 8ff4d84..fae1f61 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
@@ -28,6 +27,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -74,9 +74,9 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
}
rs = utility1.getRSForFirstRegionInTable(tableName);
metrics = rs.getWalGroupsReplicationStatus();
- Path lastPath = null;
+ WALIdentity lastPath = null;
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
- lastPath = metric.getValue().getCurrentPath();
+ lastPath = metric.getValue().getCurrentWalId();
Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
Assert.assertTrue("age of Last Shipped Op should be > 0 ",
metric.getValue().getAgeOfLastShippedOp() > 0);
@@ -100,7 +100,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
Assert.assertTrue("current position should < last position",
metric.getValue().getCurrentPosition() < lastPosition);
- Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+ Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentWalId());
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index d01a0ac..b49646d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccab..2320205 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -57,9 +57,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
@@ -301,8 +303,8 @@ public class TestReplicationSource {
String walGroupId = "fake-wal-group-id";
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
- PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
- queue.put(new Path("/www/html/test"));
+ PriorityBlockingQueue<WALIdentity> queue = new PriorityBlockingQueue<>();
+ queue.put(new FSWALIdentity(new Path("/www/html/test")));
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
Server server = Mockito.mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 86bbb09..def737e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,9 +84,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -336,7 +338,7 @@ public abstract class TestReplicationSourceManager {
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(false);
manager.logPositionAndCleanOldLogs(source,
- new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
+ new WALEntryBatch(0, manager.getSources().get(0).getCurrentWALIdentity()));
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
@@ -551,7 +553,7 @@ public abstract class TestReplicationSourceManager {
.map(HRegionServer::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
- .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
+ .mapToLong(ReplicationSourceManager::getSizeOfLatestWalId)
.sum();
}
@@ -571,7 +573,7 @@ public abstract class TestReplicationSourceManager {
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
- source.enqueueLog(new Path("abc"));
+ source.enqueueLog(new FSWALIdentity((new Path("abc")).toString()));
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
@@ -618,8 +620,9 @@ public abstract class TestReplicationSourceManager {
String walNameNotExists =
"remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
Path wal = new Path(logDir, walNameNotExists);
- manager.preLogRoll(wal);
- manager.postLogRoll(wal);
+ WALIdentity walId = new FSWALIdentity(wal);
+ manager.preLogRoll(walId);
+ manager.postLogRoll(walId);
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer);
@@ -629,8 +632,9 @@ public abstract class TestReplicationSourceManager {
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close();
wal = new Path(logDir, walName);
- manager.preLogRoll(wal);
- manager.postLogRoll(wal);
+ walId = new FSWALIdentity(wal);
+ manager.preLogRoll(walId);
+ manager.postLogRoll(walId);
ReplicationSourceInterface source = mockReplicationSource(peerId2);
manager.cleanOldLogs(walName, true, source);
@@ -648,13 +652,13 @@ public abstract class TestReplicationSourceManager {
@Test
public void testSameWALPrefix() throws IOException {
Set<String> latestWalsBefore =
- manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
+ manager.getLastestWalIds().stream().map(WALIdentity::getName).collect(Collectors.toSet());
String walName1 = "localhost,8080,12345-45678-Peer.34567";
String walName2 = "localhost,8080,12345.56789";
- manager.preLogRoll(new Path(walName1));
- manager.preLogRoll(new Path(walName2));
+ manager.preLogRoll(new FSWALIdentity((new Path(walName1)).toString()));
+ manager.preLogRoll(new FSWALIdentity((new Path(walName2)).toString()));
- Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
+ Set<String> latestWals = manager.getLastestWalIds().stream().map(WALIdentity::getName)
.filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
assertEquals(2, latestWals.size());
assertTrue(latestWals.contains(walName1));
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index fac6f74..d22f96a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -56,10 +55,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -98,7 +99,7 @@ public class TestWALEntryStream {
}
private WAL log;
- PriorityBlockingQueue<Path> walQueue;
+ PriorityBlockingQueue<WALIdentity> walQueue;
private PathWatcher pathWatcher;
@Rule
@@ -369,7 +370,7 @@ public class TestWALEntryStream {
}
// start up a reader
- Path walPath = walQueue.peek();
+ WALIdentity walId = walQueue.peek();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();
@@ -377,7 +378,7 @@ public class TestWALEntryStream {
assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
assertEquals(position, entryBatch.getLastWalPosition());
- assertEquals(walPath, entryBatch.getLastWalPath());
+ assertEquals(walId, entryBatch.getLastWalId());
assertEquals(3, entryBatch.getNbRowKeys());
appendToLog("foo");
@@ -389,7 +390,7 @@ public class TestWALEntryStream {
@Test
public void testReplicationSourceWALReaderRecovered() throws Exception {
appendEntriesToLogAndSync(10);
- Path walPath = walQueue.peek();
+ WALIdentity walId = walQueue.peek();
log.rollWriter();
appendEntriesToLogAndSync(5);
log.shutdown();
@@ -400,18 +401,18 @@ public class TestWALEntryStream {
ReplicationSourceWALReader reader = createReader(true, conf);
WALEntryBatch batch = reader.take();
- assertEquals(walPath, batch.getLastWalPath());
+ assertEquals(walId, batch.getLastWalId());
assertEquals(10, batch.getNbEntries());
assertFalse(batch.isEndOfFile());
batch = reader.take();
- assertEquals(walPath, batch.getLastWalPath());
+ assertEquals(walId, batch.getLastWalId());
assertEquals(0, batch.getNbEntries());
assertTrue(batch.isEndOfFile());
- walPath = walQueue.peek();
+ walId = walQueue.peek();
batch = reader.take();
- assertEquals(walPath, batch.getLastWalPath());
+ assertEquals(walId, batch.getLastWalId());
assertEquals(5, batch.getNbEntries());
assertTrue(batch.isEndOfFile());
@@ -422,49 +423,49 @@ public class TestWALEntryStream {
@Test
public void testReplicationSourceWALReaderWrongPosition() throws Exception {
appendEntriesToLogAndSync(1);
- Path walPath = walQueue.peek();
+ FSWALIdentity walId = (FSWALIdentity)walQueue.peek();
log.rollWriter();
appendEntriesToLogAndSync(20);
TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
- return fs.getFileStatus(walPath).getLen() > 0;
+ return fs.getFileStatus(walId.getPath()).getLen() > 0;
}
@Override
public String explainFailure() throws Exception {
- return walPath + " has not been closed yet";
+ return walId + " has not been closed yet";
}
});
- long walLength = fs.getFileStatus(walPath).getLen();
+ long walLength = fs.getFileStatus(walId.getPath()).getLen();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();
- assertEquals(walPath, entryBatch.getLastWalPath());
+ assertEquals(walId, entryBatch.getLastWalId());
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
walLength, entryBatch.getLastWalPosition() <= walLength);
assertEquals(1, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
- Path walPath2 = walQueue.peek();
+ WALIdentity walId2 = walQueue.peek();
entryBatch = reader.take();
- assertEquals(walPath2, entryBatch.getLastWalPath());
+ assertEquals(walId2, entryBatch.getLastWalId());
assertEquals(20, entryBatch.getNbEntries());
assertFalse(entryBatch.isEndOfFile());
log.rollWriter();
appendEntriesToLogAndSync(10);
entryBatch = reader.take();
- assertEquals(walPath2, entryBatch.getLastWalPath());
+ assertEquals(walId2, entryBatch.getLastWalId());
assertEquals(0, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
- Path walPath3 = walQueue.peek();
+ WALIdentity walId3 = walQueue.peek();
entryBatch = reader.take();
- assertEquals(walPath3, entryBatch.getLastWalPath());
+ assertEquals(walId3, entryBatch.getLastWalId());
assertEquals(10, entryBatch.getNbEntries());
assertFalse(entryBatch.isEndOfFile());
}
@@ -484,7 +485,7 @@ public class TestWALEntryStream {
}
// start up a reader
- Path walPath = walQueue.peek();
+ WALIdentity walId = walQueue.peek();
ReplicationSource source = mockReplicationSource(false, CONF);
AtomicInteger invokeCount = new AtomicInteger(0);
AtomicBoolean enabled = new AtomicBoolean(false);
@@ -511,7 +512,7 @@ public class TestWALEntryStream {
assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
assertEquals(position, entryBatch.getLastWalPosition());
- assertEquals(walPath, entryBatch.getLastWalPath());
+ assertEquals(walId, entryBatch.getLastWalId());
assertEquals(3, entryBatch.getNbRowKeys());
}
@@ -577,12 +578,12 @@ public class TestWALEntryStream {
class PathWatcher implements WALActionsListener {
- Path currentPath;
+ WALIdentity currentWalId;
@Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- walQueue.add(newPath);
- currentPath = newPath;
+ public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
+ walQueue.add(newWalId);
+ currentWalId = newWalId;
}
}
[2/2] hbase git commit: HBASE-21246 Introduce WALIdentity to identify
WALs instead of a Path
Posted by el...@apache.org.
HBASE-21246 Introduce WALIdentity to identify WALs instead of a Path
Builds on top of tyu's original idea.
Signed-off-by: Josh Elser <el...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c738e157
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c738e157
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c738e157
Branch: refs/heads/HBASE-20952
Commit: c738e1575f37000c6feff00362399312740b3c74
Parents: ebfc04d
Author: Ankit Singhal <an...@gmail.com>
Authored: Thu Dec 13 16:59:23 2018 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 13 17:10:28 2018 -0500
----------------------------------------------------------------------
.../apache/hadoop/hbase/wal/FSWALIdentity.java | 78 ++++++++++++++++
.../apache/hadoop/hbase/wal/WALIdentity.java | 39 ++++++++
.../regionserver/ReplicationStatusTmpl.jamon | 4 +-
.../hadoop/hbase/coprocessor/WALObserver.java | 14 +--
.../hbase/regionserver/wal/AbstractFSWAL.java | 24 ++---
.../regionserver/wal/WALActionsListener.java | 26 +++---
.../regionserver/wal/WALCoprocessorHost.java | 18 ++--
.../RecoveredReplicationSource.java | 40 +++++----
.../RecoveredReplicationSourceShipper.java | 7 +-
.../regionserver/ReplicationSource.java | 44 ++++-----
.../ReplicationSourceInterface.java | 5 +-
.../regionserver/ReplicationSourceManager.java | 64 +++++++-------
.../regionserver/ReplicationSourceShipper.java | 18 ++--
.../ReplicationSourceWALActionListener.java | 10 +--
.../ReplicationSourceWALReader.java | 39 ++++----
.../regionserver/ReplicationStatus.java | 16 ++--
.../SerialReplicationSourceWALReader.java | 20 ++---
.../replication/regionserver/WALEntryBatch.java | 22 ++---
.../regionserver/WALEntryStream.java | 93 +++++++++++---------
.../regionserver/WALFileLengthProvider.java | 4 +-
.../hadoop/hbase/wal/DisabledWALProvider.java | 36 ++++++--
.../apache/hadoop/hbase/wal/FSHLogProvider.java | 2 +-
.../coprocessor/SampleRegionWALCoprocessor.java | 6 +-
.../hbase/fs/TestBlockReorderMultiBlocks.java | 4 +-
.../wal/AbstractTestLogRollPeriod.java | 18 ++--
.../hbase/regionserver/wal/TestLogRolling.java | 15 ++--
.../wal/TestWALActionsListener.java | 5 +-
.../replication/ReplicationSourceDummy.java | 12 +--
.../replication/TestMasterReplication.java | 4 +-
.../replication/TestMultiSlaveReplication.java | 5 +-
.../TestReplicationEmptyWALRecovery.java | 13 ++-
.../TestReplicationMetricsforUI.java | 8 +-
.../master/TestRecoverStandbyProcedure.java | 1 +
.../regionserver/TestReplicationSource.java | 6 +-
.../TestReplicationSourceManager.java | 26 +++---
.../regionserver/TestWALEntryStream.java | 51 +++++------
36 files changed, 490 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
new file mode 100644
index 0000000..d12a1cf
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/*
+ * This is the distributed-filesystem-oriented implementation of WALIdentity.
+ */
+@InterfaceAudience.Private
+public class FSWALIdentity implements WALIdentity{
+ private String name;
+ private Path path;
+
+ public FSWALIdentity(String name) {
+ this.path = new Path(name);
+ this.name = path.getName();
+ }
+
+ public FSWALIdentity(Path path) {
+ this.path = path;
+ if (path !=null) {
+ this.name = path.getName();
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return the {@link Path} of the WAL that this WALIdentity encapsulates
+ */
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public int compareTo(WALIdentity o) {
+ FSWALIdentity that = (FSWALIdentity)o;
+ return this.path.compareTo(that.getPath());
+ }
+
+ @Override
+ public String toString() {
+ return this.path.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof FSWALIdentity)) {
+ return false;
+ }
+ FSWALIdentity that = (FSWALIdentity) obj;
+ return this.path.equals(that.getPath());
+ }
+ @Override
+ public int hashCode() {
+ return this.path.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
new file mode 100644
index 0000000..fa7d2fa
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This interface defines how a WAL is identified in both stream-based and distributed
+ * FileSystem-based environments.
+ * See {@link #getName()} method.
+ */
+@InterfaceAudience.Private
+public interface WALIdentity extends Comparable<WALIdentity> {
+
+ /**
+ * A WALIdentity uniquely identifies a WAL stored in this WALProvider.
+ * This name can be thought of as a human-readable, serialized form of the WALIdentity.
+ *
+ * The same value should be returned across calls to this method.
+ *
+ * @return name of the wal
+ */
+ String getName();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
index 7dc1c7f..e1aceea 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -71,7 +71,7 @@
<tr>
<td><% entry.getValue().getPeerId() %></td>
<td><% entry.getValue().getWalGroup() %></td>
- <td><% entry.getValue().getCurrentPath() %> </td>
+ <td><% entry.getValue().getCurrentWalId() %> </td>
<td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
<td><% entry.getValue().getQueueSize() %></td>
<td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
@@ -96,7 +96,7 @@
<tr>
<td><% entry.getValue().getPeerId() %></td>
<td><% entry.getValue().getWalGroup() %></td>
- <td><% entry.getValue().getCurrentPath() %> </td>
+ <td><% entry.getValue().getCurrentWalId() %> </td>
<td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
<td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
</tr>
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
index b2fa7ca..436f3c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
@@ -21,10 +21,10 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -92,18 +92,18 @@ public interface WALObserver {
/**
* Called before rolling the current WAL
- * @param oldPath the path of the current wal that we are replacing
- * @param newPath the path of the wal we are going to create
+ * @param oldPath the identity of the current wal that we are replacing
+ * @param newPath the identity of the wal we are going to create
*/
default void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- Path oldPath, Path newPath) throws IOException {}
+ WALIdentity oldPath, WALIdentity newPath) throws IOException {}
/**
* Called after rolling the current WAL
- * @param oldPath the path of the wal that we replaced
- * @param newPath the path of the wal we have created and now is the current
+ * @param oldPath the identity of the wal that we replaced
+ * @param newPath the identity of the wal we have created and now is the current
*/
default void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- Path oldPath, Path newPath) throws IOException {}
+ WALIdentity oldPath, WALIdentity newPath) throws IOException {}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 7915ac3..ab58b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -65,9 +65,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
@@ -544,11 +546,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
throws IOException {
- coprocessorHost.preWALRoll(oldPath, newPath);
+ coprocessorHost.preWALRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
- i.preLogRoll(oldPath, newPath);
+ i.preLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
}
}
}
@@ -560,11 +562,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
throws IOException {
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
- i.postLogRoll(oldPath, newPath);
+ i.postLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
}
}
- coprocessorHost.postWALRoll(oldPath, newPath);
+ coprocessorHost.postWALRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
}
// public only until class moves to o.a.h.h.wal
@@ -650,7 +652,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
- i.preLogArchive(p, newPath);
+ i.preLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath));
}
}
LOG.info("Archiving " + p + " to " + newPath);
@@ -660,7 +662,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Tell our listeners that a log has been archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
- i.postLogArchive(p, newPath);
+ i.postLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath));
}
}
}
@@ -836,7 +838,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
- i.preLogArchive(file.getPath(), p);
+ i.preLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p));
}
}
@@ -846,7 +848,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// Tell our listeners that a log was archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
- i.postLogArchive(file.getPath(), p);
+ i.postLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p));
}
}
}
@@ -994,11 +996,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* https://issues.apache.org/jira/browse/HBASE-14004 for more details.
*/
@Override
- public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+ public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) {
rollWriterLock.lock();
try {
- Path currentPath = getOldPath();
- if (path.equals(currentPath)) {
+ FSWALIdentity currentPath = new FSWALIdentity(getOldPath());
+ if (walId.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 13ffac7..f1c293b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -19,9 +19,9 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
@@ -35,32 +35,32 @@ public interface WALActionsListener {
/**
* The WAL is going to be rolled. The oldPath can be null if this is
* the first log file from the regionserver.
- * @param oldPath the path to the old wal
- * @param newPath the path to the new wal
+ * @param oldWalId the identity of the old wal
+ * @param newWalId the identity of the new wal
*/
- default void preLogRoll(Path oldPath, Path newPath) throws IOException {}
+ default void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
/**
* The WAL has been rolled. The oldPath can be null if this is
* the first log file from the regionserver.
- * @param oldPath the path to the old wal
- * @param newPath the path to the new wal
+ * @param oldWalId the identity of the old wal
+ * @param newWalId the identity of the new wal
*/
- default void postLogRoll(Path oldPath, Path newPath) throws IOException {}
+ default void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
/**
* The WAL is going to be archived.
- * @param oldPath the path to the old wal
- * @param newPath the path to the new wal
+ * @param oldWalId the identity of the old wal
+ * @param newWalId the identity of the new wal
*/
- default void preLogArchive(Path oldPath, Path newPath) throws IOException {}
+ default void preLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
/**
* The WAL has been archived.
- * @param oldPath the path to the old wal
- * @param newPath the path to the new wal
+ * @param oldWalId the identity of the old wal
+ * @param newWalId the identity of the new wal
*/
- default void postLogArchive(Path oldPath, Path newPath) throws IOException {}
+ default void postLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
/**
* A request was made that the WAL be rolled.
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
index 40d6d0f..de8a1bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -170,28 +170,28 @@ public class WALCoprocessorHost
/**
* Called before rolling the current WAL
- * @param oldPath the path of the current wal that we are replacing
- * @param newPath the path of the wal we are going to create
+ * @param oldWalId the identity of the current wal that we are replacing
+ * @param newWalId the identity of the wal we are going to create
*/
- public void preWALRoll(Path oldPath, Path newPath) throws IOException {
+ public void preWALRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
@Override
protected void call(WALObserver observer) throws IOException {
- observer.preWALRoll(this, oldPath, newPath);
+ observer.preWALRoll(this, oldWalId, newWalId);
}
});
}
/**
* Called after rolling the current WAL
- * @param oldPath the path of the wal that we replaced
- * @param newPath the path of the wal we have created and now is the current
+ * @param oldWalId the identity of the wal that we replaced
+ * @param newWalId the identity of the wal we have created and now is the current
*/
- public void postWALRoll(Path oldPath, Path newPath) throws IOException {
+ public void postWALRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
@Override
protected void call(WALObserver observer) throws IOException {
- observer.postWALRoll(this, oldPath, newPath);
+ observer.postWALRoll(this, oldWalId, newWalId);
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index f1bb538..4bb1fe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,17 +60,19 @@ public class RecoveredReplicationSource extends ReplicationSource {
@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
- PriorityBlockingQueue<Path> queue) {
+ PriorityBlockingQueue<WALIdentity> queue) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
}
- public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
+ public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) throws IOException {
boolean hasPathChanged = false;
- PriorityBlockingQueue<Path> newPaths =
- new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
- pathsLoop: for (Path path : queue) {
- if (fs.exists(path)) { // still in same location, don't need to do anything
- newPaths.add(path);
+ PriorityBlockingQueue<WALIdentity> newWalIds =
+ new PriorityBlockingQueue<WALIdentity>(queueSizePerGroup, new LogsComparator());
+ pathsLoop: for (WALIdentity walId : queue) {
+ if (fs.exists(((FSWALIdentity) walId).getPath())) {
+ // still in same location, don't need to
+ // do anything
+ newWalIds.add(walId);
continue;
}
// Path changed - try to find the right path.
@@ -76,8 +80,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
if (server instanceof ReplicationSyncUp.DummyServer) {
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
- Path newPath = getReplSyncUpPath(path);
- newPaths.add(newPath);
+ Path newPath = getReplSyncUpPath(((FSWALIdentity)walId).getPath());
+ newWalIds.add(new FSWALIdentity(newPath));
continue;
} else {
// See if Path exists in the dead RS folder (there could be a chain of failures
@@ -89,27 +93,27 @@ public class RecoveredReplicationSource extends ReplicationSource {
final Path deadRsDirectory =
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
.getServerName()));
- Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
- deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
+ Path[] locs = new Path[] { new Path(deadRsDirectory, walId.getName()), new Path(
+ deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walId.getName()) };
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
if (manager.getFs().exists(possibleLogLocation)) {
// We found the right new location
- LOG.info("Log " + path + " still exists at " + possibleLogLocation);
- newPaths.add(possibleLogLocation);
+ LOG.info("Log " + walId + " still exists at " + possibleLogLocation);
+ newWalIds.add(new FSWALIdentity(possibleLogLocation));
continue pathsLoop;
}
}
}
// didn't find a new location
LOG.error(
- String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
- newPaths.add(path);
+ String.format("WAL Path %s doesn't exist and couldn't find its new location", walId));
+ newWalIds.add(walId);
}
}
if (hasPathChanged) {
- if (newPaths.size() != queue.size()) { // this shouldn't happen
+ if (newWalIds.size() != queue.size()) { // this shouldn't happen
LOG.error("Recovery queue size is incorrect");
throw new IOException("Recovery queue size error");
}
@@ -117,8 +121,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
queue.clear();
- for (Path path : newPaths) {
- queue.add(path);
+ for (WALIdentity walId : newWalIds) {
+ queue.add(walId);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index b0d4db0..dbf1296 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
private final ReplicationQueueStorage replicationQueues;
public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+ PriorityBlockingQueue<WALIdentity> queue, RecoveredReplicationSource source,
ReplicationQueueStorage queueStorage) {
super(conf, walGroupId, queue, source);
this.source = source;
@@ -58,7 +59,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
- source.locateRecoveredPaths(queue);
+ source.locateRecoveredWalIds(queue);
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10fa50f..12c63fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -61,7 +61,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +88,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
- private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
+ private Map<String, PriorityBlockingQueue<WALIdentity>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup;
protected ReplicationQueueStorage queueStorage;
@@ -166,10 +168,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
this.manager = manager;
- this.fs = fs;
this.metrics = metrics;
this.clusterId = clusterId;
-
+ this.fs = fs;
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -191,9 +192,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
@Override
- public void enqueueLog(Path log) {
+ public void enqueueLog(WALIdentity log) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
- PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+ PriorityBlockingQueue<WALIdentity> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
@@ -300,7 +301,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.walEntryFilter = new ChainWALEntryFilter(filters);
}
- private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<WALIdentity> queue) {
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
@@ -328,9 +329,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
int queueSize = queues.get(walGroupId).size();
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
- Path currentPath = shipper.getCurrentPath();
+ WALIdentity currentPath = shipper.getCurrentWALIdentity();
try {
- fileSize = getFileSize(currentPath);
+ fileSize = getFileSize(((FSWALIdentity)currentPath).getPath());
} catch (IOException e) {
LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
fileSize = -1;
@@ -339,7 +340,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
statusBuilder.withPeerId(this.getPeerId())
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
- .withCurrentPath(currentPath)
+ .withCurrentWalId(currentPath)
.withCurrentPosition(shipper.getCurrentPosition())
.withFileSize(fileSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
@@ -361,12 +362,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
protected ReplicationSourceShipper createNewShipper(String walGroupId,
- PriorityBlockingQueue<Path> queue) {
+ PriorityBlockingQueue<WALIdentity> queue) {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);
}
private ReplicationSourceWALReader createNewWALReader(String walGroupId,
- PriorityBlockingQueue<Path> queue, long startPosition) {
+ PriorityBlockingQueue<WALIdentity> queue, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
: new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
@@ -374,7 +375,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final void uncaughtException(Thread t, Throwable e) {
RSRpcServices.exitIfOOME(e);
- LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+ LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentWALIdentity(),
+ e);
server.abort("Unexpected exception in " + t.getName(), e);
}
@@ -497,9 +499,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
initializeWALEntryFilter(peerClusterId);
// start workers
- for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+ for (Map.Entry<String, PriorityBlockingQueue<WALIdentity>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
- PriorityBlockingQueue<Path> queue = entry.getValue();
+ PriorityBlockingQueue<WALIdentity> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
}
@@ -593,11 +595,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
@Override
- public Path getCurrentPath() {
+ public WALIdentity getCurrentWALIdentity() {
// only for testing
for (ReplicationSourceShipper worker : workerThreads.values()) {
- if (worker.getCurrentPath() != null) {
- return worker.getCurrentPath();
+ if (worker.getCurrentWALIdentity() != null) {
+ return worker.getCurrentWALIdentity();
}
}
return null;
@@ -611,10 +613,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
/**
* Comparator used to compare logs together based on their start time
*/
- public static class LogsComparator implements Comparator<Path> {
+ public static class LogsComparator implements Comparator<WALIdentity> {
@Override
- public int compare(Path o1, Path o2) {
+ public int compare(WALIdentity o1, WALIdentity o2) {
return Long.compare(getTS(o1), getTS(o2));
}
@@ -628,7 +630,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
* @param p path to split
* @return start time
*/
- private static long getTS(Path p) {
+ private static long getTS(WALIdentity p) {
return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
}
}
@@ -642,7 +644,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
String walGroupId = entry.getKey();
ReplicationSourceShipper worker = entry.getValue();
long position = worker.getCurrentPosition();
- Path currentPath = worker.getCurrentPath();
+ WALIdentity currentPath = worker.getCurrentWALIdentity();
sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df7a8cc..3058fcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -60,7 +61,7 @@ public interface ReplicationSourceInterface {
* Add a log to the list of logs to replicate
* @param log path to the log to replicate
*/
- void enqueueLog(Path log);
+ void enqueueLog(WALIdentity log);
/**
* Add hfile names to the queue to be replicated.
@@ -95,7 +96,7 @@ public interface ReplicationSourceInterface {
* Get the current log that's replicated
* @return the current log
*/
- Path getCurrentPath();
+ WALIdentity getCurrentWALIdentity();
/**
* Get the queue id that the source is replicating to
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 20c1215..b948d7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -63,7 +63,9 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -114,7 +116,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
* is already synchronized on {@link #oldsources}. So no need synchronized on
* {@link #walsByIdRecoveredQueues}.</li>
- * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
+ * <li>Need synchronized on {@link #latestWalIds} to avoid the new open source miss new log.</li>
* <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
* to-be-removed peer.</li>
* </ul>
@@ -148,7 +150,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Configuration conf;
private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers
- private final Map<String, Path> latestPaths;
+ private final Map<String, WALIdentity> latestWalIds;
// Path to the wals directories
private final Path logDir;
// Path to the wal archive
@@ -216,7 +218,7 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
- this.latestPaths = new HashMap<>();
+ this.latestWalIds = new HashMap<>();
this.replicationForBulkLoadDataEnabled = conf.getBoolean(
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
@@ -365,22 +367,22 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface addSource(String peerId) throws IOException {
ReplicationPeer peer = replicationPeers.getPeer(peerId);
ReplicationSourceInterface src = createSource(peerId, peer);
- // synchronized on latestPaths to avoid missing the new log
- synchronized (this.latestPaths) {
+ // synchronized on latestWalIds to avoid missing the new log
+ synchronized (this.latestWalIds) {
this.sources.put(peerId, src);
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
this.walsById.put(peerId, walsByGroup);
// Add the latest wal to that source's queue
- if (!latestPaths.isEmpty()) {
- for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
- Path walPath = walPrefixAndPath.getValue();
+ if (!latestWalIds.isEmpty()) {
+ for (Map.Entry<String, WALIdentity> walPrefixAndId : latestWalIds.entrySet()) {
+ WALIdentity walId = walPrefixAndId.getValue();
NavigableSet<String> wals = new TreeSet<>();
- wals.add(walPath.getName());
- walsByGroup.put(walPrefixAndPath.getKey(), wals);
+ wals.add(walId.getName());
+ walsByGroup.put(walPrefixAndId.getKey(), wals);
// Abort RS and throw exception to make add peer failed
abortAndThrowIOExceptionWhenFail(
- () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
- src.enqueueLog(walPath);
+ () -> this.queueStorage.addWAL(server.getServerName(), peerId, walId.getName()));
+ src.enqueueLog(walId);
}
}
}
@@ -417,7 +419,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// walsById.
ReplicationSourceInterface toRemove;
Map<String, NavigableSet<String>> wals = new HashMap<>();
- synchronized (latestPaths) {
+ synchronized (latestWalIds) {
toRemove = sources.put(peerId, src);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
@@ -476,15 +478,15 @@ public class ReplicationSourceManager implements ReplicationListener {
" state or config changed. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
ReplicationSourceInterface src = createSource(peerId, peer);
- // synchronized on latestPaths to avoid missing the new log
- synchronized (this.latestPaths) {
+ // synchronized on latestWalIds to avoid missing the new log
+ synchronized (this.latestWalIds) {
ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
}
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
+ walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(new Path(this.logDir, wal))));
}
}
LOG.info("Startup replication source for " + src.getPeerId());
@@ -505,7 +507,7 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface replicationSource = createSource(queueId, peer);
this.oldsources.add(replicationSource);
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
+ walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(wal)));
}
toStartup.add(replicationSource);
}
@@ -617,7 +619,7 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
WALEntryBatch entryBatch) {
- String fileName = entryBatch.getLastWalPath().getName();
+ String fileName = entryBatch.getLastWalId().getName();
interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
@@ -735,11 +737,11 @@ public class ReplicationSourceManager implements ReplicationListener {
// public because of we call it in TestReplicationEmptyWALRecovery
@VisibleForTesting
- public void preLogRoll(Path newLog) throws IOException {
+ public void preLogRoll(WALIdentity newLog) throws IOException {
String logName = newLog.getName();
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
- // synchronized on latestPaths to avoid the new open source miss the new log
- synchronized (this.latestPaths) {
+ // synchronized on latestWalIds to avoid the new open source miss the new log
+ synchronized (this.latestWalIds) {
// Add log to queue storage
for (ReplicationSourceInterface source : this.sources.values()) {
// If record log to queue storage failed, abort RS and throw exception to make log roll
@@ -778,14 +780,14 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- // Add to latestPaths
- latestPaths.put(logPrefix, newLog);
+ // Add to latestWalIds
+ latestWalIds.put(logPrefix, newLog);
}
}
// public because of we call it in TestReplicationEmptyWALRecovery
@VisibleForTesting
- public void postLogRoll(Path newLog) throws IOException {
+ public void postLogRoll(WALIdentity newLog) throws IOException {
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources.values()) {
source.enqueueLog(newLog);
@@ -961,7 +963,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
oldsources.add(src);
for (String wal : walsSet) {
- src.enqueueLog(new Path(oldLogDir, wal));
+ src.enqueueLog(new FSWALIdentity(new Path(oldLogDir, wal)));
}
src.startup();
}
@@ -1038,16 +1040,16 @@ public class ReplicationSourceManager implements ReplicationListener {
}
@VisibleForTesting
- int getSizeOfLatestPath() {
- synchronized (latestPaths) {
- return latestPaths.size();
+ int getSizeOfLatestWalId() {
+ synchronized (latestWalIds) {
+ return latestWalIds.size();
}
}
@VisibleForTesting
- Set<Path> getLastestPath() {
- synchronized (latestPaths) {
- return Sets.newHashSet(latestPaths.values());
+ Set<WALIdentity> getLastestWalIds() {
+ synchronized (latestWalIds) {
+ return Sets.newHashSet(latestWalIds.values());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 5d6198e..8ecd5bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +56,14 @@ public class ReplicationSourceShipper extends Thread {
private final Configuration conf;
protected final String walGroupId;
- protected final PriorityBlockingQueue<Path> queue;
+ protected final PriorityBlockingQueue<WALIdentity> queue;
private final ReplicationSource source;
// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
private volatile long currentPosition = -1;
// Path of the current log
- private Path currentPath;
+ private WALIdentity currentWalId;
// Current state of the worker thread
private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;
@@ -76,7 +76,7 @@ public class ReplicationSourceShipper extends Thread {
private final int getEntriesTimeout;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, ReplicationSource source) {
+ PriorityBlockingQueue<WALIdentity> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
@@ -269,7 +269,7 @@ public class ReplicationSourceShipper extends Thread {
// record on zk, so let's call it. The last wal position maybe zero if end of file is true and
// there is no entry in the batch. It is OK because that the queue storage will ignore the zero
// position and the file will be removed soon in cleanOldLogs.
- if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
+ if (batch.isEndOfFile() || !batch.getLastWalId().equals(currentWalId) ||
batch.getLastWalPosition() != currentPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
updated = true;
@@ -278,10 +278,10 @@ public class ReplicationSourceShipper extends Thread {
// the only exception is for recovered queue, if we reach the end of the queue, then there will
// no more files so here the currentPath may be null.
if (batch.isEndOfFile()) {
- currentPath = entryReader.getCurrentPath();
+ currentWalId = entryReader.getCurrentWalId();
currentPosition = 0L;
} else {
- currentPath = batch.getLastWalPath();
+ currentWalId = batch.getLastWalId();
currentPosition = batch.getLastWalPosition();
}
return updated;
@@ -293,8 +293,8 @@ public class ReplicationSourceShipper extends Thread {
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
}
- Path getCurrentPath() {
- return entryReader.getCurrentPath();
+ WALIdentity getCurrentWALIdentity() {
+ return entryReader.getCurrentWalId();
}
long getCurrentPosition() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index 27b25c4..6c1c0b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
@@ -46,13 +46,13 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
}
@Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- manager.preLogRoll(newPath);
+ public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
+ manager.preLogRoll(newWalId);
}
@Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- manager.postLogRoll(newPath);
+ public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
+ manager.postLogRoll(newWalId);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b3bdb02..a0b2ecd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -28,15 +28,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -55,7 +56,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
class ReplicationSourceWALReader extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
- private final PriorityBlockingQueue<Path> logQueue;
+ private final PriorityBlockingQueue<WALIdentity> logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALEntryFilter filter;
@@ -89,7 +90,7 @@ class ReplicationSourceWALReader extends Thread {
* @param source replication source
*/
public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
- PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+ PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
this.logQueue = logQueue;
this.currentPosition = startPosition;
@@ -181,29 +182,29 @@ class ReplicationSourceWALReader extends Thread {
batch.getNbEntries() >= replicationBatchCountCapacity;
}
- protected static final boolean switched(WALEntryStream entryStream, Path path) {
- Path newPath = entryStream.getCurrentPath();
- return newPath == null || !path.getName().equals(newPath.getName());
+ protected static final boolean switched(WALEntryStream entryStream, WALIdentity walId) {
+ WALIdentity newWalId = entryStream.getCurrentWalIdentity();
+ return newWalId == null || !walId.equals(newWalId);
}
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
- Path currentPath = entryStream.getCurrentPath();
+ WALIdentity walId = entryStream.getCurrentWalIdentity();
if (!entryStream.hasNext()) {
// check whether we have switched a file
- if (currentPath != null && switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
+ if (walId != null && switched(entryStream, walId)) {
+ return WALEntryBatch.endOfFile(walId);
} else {
return null;
}
}
- if (currentPath != null) {
- if (switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
+ if (walId != null) {
+ if (switched(entryStream, walId)) {
+ return WALEntryBatch.endOfFile(walId);
}
} else {
// when reading from the entry stream first time we will enter here
- currentPath = entryStream.getCurrentPath();
+ walId = entryStream.getCurrentWalIdentity();
}
WALEntryBatch batch = createBatch(entryStream);
for (;;) {
@@ -217,7 +218,7 @@ class ReplicationSourceWALReader extends Thread {
}
boolean hasNext = entryStream.hasNext();
// always return if we have switched to a new file
- if (switched(entryStream, currentPath)) {
+ if (switched(entryStream, walId)) {
batch.setEndOfFile(true);
break;
}
@@ -248,7 +249,7 @@ class ReplicationSourceWALReader extends Thread {
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
logQueue.size() > 1 && this.eofAutoRecovery) {
try {
- if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+ if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
logQueue.remove();
currentPosition = 0;
@@ -259,11 +260,11 @@ class ReplicationSourceWALReader extends Thread {
}
}
- public Path getCurrentPath() {
- // if we've read some WAL entries, get the Path we read from
+ public WALIdentity getCurrentWalId() {
+ // if we've read some WAL entries, get the walId we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
if (batchQueueHead != null) {
- return batchQueueHead.getLastWalPath();
+ return batchQueueHead.getLastWalId();
}
// otherwise, we must be currently reading from the head of the log queue
return logQueue.peek();
@@ -280,7 +281,7 @@ class ReplicationSourceWALReader extends Thread {
}
protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
- return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+ return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentWalIdentity());
}
protected final Entry filterEntry(Entry entry) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
index 10d6cd5..bd0d83a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationStatus {
private final String peerId;
private final String walGroup;
- private final Path currentPath;
+ private final WALIdentity currentWalId;
private final int queueSize;
private final long ageOfLastShippedOp;
private final long replicationDelay;
@@ -34,7 +34,7 @@ public final class ReplicationStatus {
private ReplicationStatus(ReplicationStatusBuilder builder) {
this.peerId = builder.peerId;
this.walGroup = builder.walGroup;
- this.currentPath = builder.currentPath;
+ this.currentWalId = builder.currentWalId;
this.queueSize = builder.queueSize;
this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
this.replicationDelay = builder.replicationDelay;
@@ -70,8 +70,8 @@ public final class ReplicationStatus {
return replicationDelay;
}
- public Path getCurrentPath() {
- return currentPath;
+ public WALIdentity getCurrentWalId() {
+ return currentWalId;
}
public static ReplicationStatusBuilder newBuilder() {
@@ -81,7 +81,7 @@ public final class ReplicationStatus {
public static class ReplicationStatusBuilder {
private String peerId = "UNKNOWN";
private String walGroup = "UNKNOWN";
- private Path currentPath = new Path("UNKNOWN");
+ private WALIdentity currentWalId = null;
private int queueSize = -1;
private long ageOfLastShippedOp = -1;
private long replicationDelay = -1;
@@ -103,8 +103,8 @@ public final class ReplicationStatus {
return this;
}
- public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
- this.currentPath = currentPath;
+ public ReplicationStatusBuilder withCurrentWalId(WALIdentity currentWalId) {
+ this.currentWalId = currentWalId;
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 9edcc8a..5f33e73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -44,7 +44,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
private final SerialReplicationChecker checker;
public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
- PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+ PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
super(fs, conf, logQueue, startPosition, filter, source);
checker = new SerialReplicationChecker(conf, source);
@@ -53,22 +53,22 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
@Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
- Path currentPath = entryStream.getCurrentPath();
+ WALIdentity currentWalId = entryStream.getCurrentWalIdentity();
if (!entryStream.hasNext()) {
// check whether we have switched a file
- if (currentPath != null && switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
+ if (currentWalId != null && switched(entryStream, currentWalId)) {
+ return WALEntryBatch.endOfFile(currentWalId);
} else {
return null;
}
}
- if (currentPath != null) {
- if (switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
+ if (currentWalId != null) {
+ if (switched(entryStream, currentWalId)) {
+ return WALEntryBatch.endOfFile(currentWalId);
}
} else {
// when reading from the entry stream first time we will enter here
- currentPath = entryStream.getCurrentPath();
+ currentWalId = entryStream.getCurrentWalIdentity();
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch = createBatch(entryStream);
@@ -115,7 +115,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
}
boolean hasNext = entryStream.hasNext();
// always return if we have switched to a new file.
- if (switched(entryStream, currentPath)) {
+ if (switched(entryStream, currentWalId)) {
batch.setEndOfFile(true);
break;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..b651a9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -36,7 +36,7 @@ class WALEntryBatch {
private List<Entry> walEntries;
// last WAL that was read
- private Path lastWalPath;
+ private WALIdentity lastWalId;
// position in WAL of last entry in this batch
private long lastWalPosition = 0;
// number of distinct row keys in this batch
@@ -51,16 +51,16 @@ class WALEntryBatch {
private boolean endOfFile;
/**
- * @param lastWalPath Path of the WAL the last entry in this batch was read from
+ * @param lastWalId identity of the WAL the last entry in this batch was read from
*/
- WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+ WALEntryBatch(int maxNbEntries, WALIdentity lastWalId) {
this.walEntries = new ArrayList<>(maxNbEntries);
- this.lastWalPath = lastWalPath;
+ this.lastWalId = lastWalId;
}
- static WALEntryBatch endOfFile(Path lastWalPath) {
- WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
+ static WALEntryBatch endOfFile(WALIdentity lastWalId) {
+ WALEntryBatch batch = new WALEntryBatch(0, lastWalId);
batch.setLastWalPosition(-1L);
batch.setEndOfFile(true);
return batch;
@@ -78,10 +78,10 @@ class WALEntryBatch {
}
/**
- * @return the path of the last WAL that was read.
+ * @return the identity of the last WAL that was read.
*/
- public Path getLastWalPath() {
- return lastWalPath;
+ public WALIdentity getLastWalId() {
+ return lastWalId;
}
/**
@@ -160,7 +160,7 @@ class WALEntryBatch {
@Override
public String toString() {
- return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+ return "WALEntryBatch [walEntries=" + walEntries + ", lastWalId=" + lastWalId +
", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
endOfFile + "]";
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 0393af4..3d90153 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -33,9 +33,11 @@ import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -43,9 +45,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
- * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
- * dequeues it and starts reading from the next.
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link WALIdentity}, and
+ * continually iterates through all the WAL {@link Entry} in the queue. When it's done reading from
+ * a WAL, it dequeues it and starts reading from the next.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -53,7 +55,7 @@ class WALEntryStream implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
private Reader reader;
- private Path currentPath;
+ private WALIdentity currentWAlIdentity;
// cache of next entry for hasNext()
private Entry currentEntry;
// position for the current entry. As now we support peek, which means that the upper layer may
@@ -62,7 +64,7 @@ class WALEntryStream implements Closeable {
private long currentPositionOfEntry = 0;
// position after reading current entry
private long currentPositionOfReader = 0;
- private final PriorityBlockingQueue<Path> logQueue;
+ private final PriorityBlockingQueue<WALIdentity> logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALFileLengthProvider walFileLengthProvider;
@@ -72,7 +74,7 @@ class WALEntryStream implements Closeable {
/**
* Create an entry stream over the given queue at the given start position
- * @param logQueue the queue of WAL paths
+ * @param logQueue the queue of WAL identities
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
@@ -80,9 +82,9 @@ class WALEntryStream implements Closeable {
* @param metrics replication metrics
* @throws IOException
*/
- public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
- long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
- MetricsSource metrics) throws IOException {
+ public WALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, FileSystem fs,
+ Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
+ ServerName serverName, MetricsSource metrics) throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
@@ -135,16 +137,16 @@ class WALEntryStream implements Closeable {
}
/**
- * @return the {@link Path} of the current WAL
+ * @return the {@link WALIdentity} of the current WAL
*/
- public Path getCurrentPath() {
- return currentPath;
+ public WALIdentity getCurrentWalIdentity() {
+ return currentWAlIdentity;
}
- private String getCurrentPathStat() {
+ private String getCurrentWalIdStat() {
StringBuilder sb = new StringBuilder();
- if (currentPath != null) {
- sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+ if (currentWAlIdentity != null) {
+ sb.append("currently replicating from: ").append(currentWAlIdentity).append(" at position: ")
.append(currentPositionOfEntry).append("\n");
} else {
sb.append("no replication ongoing, waiting for new log");
@@ -157,7 +159,7 @@ class WALEntryStream implements Closeable {
* false)
*/
public void reset() throws IOException {
- if (reader != null && currentPath != null) {
+ if (reader != null && currentWAlIdentity != null) {
resetReader();
}
}
@@ -166,8 +168,8 @@ class WALEntryStream implements Closeable {
currentPositionOfEntry = position;
}
- private void setCurrentPath(Path path) {
- this.currentPath = path;
+ private void setCurrentWalId(WALIdentity walId) {
+ this.currentWAlIdentity = walId;
}
private void tryAdvanceEntry() throws IOException {
@@ -203,10 +205,10 @@ class WALEntryStream implements Closeable {
final long trailerSize = currentTrailerSize();
FileStatus stat = null;
try {
- stat = fs.getFileStatus(this.currentPath);
+ stat = fs.getFileStatus(((FSWALIdentity)this.currentWAlIdentity).getPath());
} catch (IOException exception) {
LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
- currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+ currentWAlIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat());
metrics.incrUnknownFileLengthForClosedWAL();
}
// Here we use currentPositionOfReader instead of currentPositionOfEntry.
@@ -222,7 +224,7 @@ class WALEntryStream implements Closeable {
LOG.debug(
"Reached the end of WAL file '{}'. It was not closed cleanly," +
" so we did not parse {} bytes of data. This is normally ok.",
- currentPath, skippedBytes);
+ currentWAlIdentity, skippedBytes);
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
@@ -230,7 +232,7 @@ class WALEntryStream implements Closeable {
LOG.warn(
"Processing end of WAL file '{}'. At position {}, which is too far away from" +
" reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
- currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
+ currentWAlIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat());
setPosition(0);
resetReader();
metrics.incrRestartedWALReading();
@@ -239,15 +241,15 @@ class WALEntryStream implements Closeable {
}
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
- (stat == null ? "N/A" : stat.getLen()));
+ LOG.trace("Reached the end of log " + this.currentWAlIdentity
+ + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
}
metrics.incrCompletedWAL();
return true;
}
private void dequeueCurrentLog() throws IOException {
- LOG.debug("Reached the end of log {}", currentPath);
+ LOG.debug("Reached the end of log {}", currentWAlIdentity);
closeReader();
logQueue.remove();
setPosition(0);
@@ -260,12 +262,13 @@ class WALEntryStream implements Closeable {
private boolean readNextEntryAndRecordReaderPosition() throws IOException {
Entry readEntry = reader.next();
long readerPos = reader.getPosition();
- OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+ OptionalLong fileLength =
+ walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWAlIdentity);
if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
// see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
// data, so we need to make sure that we do not read beyond the committed file length.
if (LOG.isDebugEnabled()) {
- LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+ LOG.debug("The provider tells us the valid length for " + currentWAlIdentity + " is " +
fileLength.getAsLong() + ", but we have advanced to " + readerPos);
}
resetReader();
@@ -297,16 +300,16 @@ class WALEntryStream implements Closeable {
// open a reader on the next log in queue
private boolean openNextLog() throws IOException {
- Path nextPath = logQueue.peek();
- if (nextPath != null) {
- openReader(nextPath);
+ WALIdentity nextWalId = logQueue.peek();
+ if (nextWalId != null) {
+ openReader((FSWALIdentity)nextWalId);
if (reader != null) {
return true;
}
} else {
// no more files in queue, this could happen for recovered queue, or for a wal group of a sync
// replication peer which has already been transited to DA or S.
- setCurrentPath(null);
+ setCurrentWalId(null);
}
return false;
}
@@ -336,38 +339,39 @@ class WALEntryStream implements Closeable {
return path;
}
- private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
+ private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe)
+ throws IOException {
// If the log was archived, continue reading from there
- Path archivedLog = getArchivedLog(path);
- if (!path.equals(archivedLog)) {
+ FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath()));
+ if (!walId.equals(archivedLog)) {
openReader(archivedLog);
} else {
throw fnfe;
}
}
- private void openReader(Path path) throws IOException {
+ private void openReader(FSWALIdentity walId) throws IOException {
try {
// Detect if this is a new file, if so get a new reader else
// reset the current reader so that we see the new data
- if (reader == null || !getCurrentPath().equals(path)) {
+ if (reader == null || !getCurrentWalIdentity().equals(walId)) {
closeReader();
- reader = WALFactory.createReader(fs, path, conf);
+ reader = WALFactory.createReader(fs, walId.getPath(), conf);
seek();
- setCurrentPath(path);
+ setCurrentWalId(walId);
} else {
resetReader();
}
} catch (FileNotFoundException fnfe) {
- handleFileNotFound(path, fnfe);
+ handleFileNotFound(walId, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) throw ioe;
- handleFileNotFound(path, (FileNotFoundException)ioe);
+ handleFileNotFound(walId, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
- LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
- recoverLease(conf, currentPath);
+ LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre);
+ recoverLease(conf, ((FSWALIdentity)currentWAlIdentity).getPath());
reader = null;
} catch (NullPointerException npe) {
// Workaround for race condition in HDFS-4380
@@ -402,8 +406,9 @@ class WALEntryStream implements Closeable {
seek();
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
- Path archivedLog = getArchivedLog(currentPath);
- if (!currentPath.equals(archivedLog)) {
+ FSWALIdentity archivedLog =
+ new FSWALIdentity(getArchivedLog(((FSWALIdentity) currentWAlIdentity).getPath()));
+ if (!currentWAlIdentity.equals(archivedLog)) {
openReader(archivedLog);
} else {
throw fnfe;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index 010fa69..d0b63cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.OptionalLong;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -30,5 +30,5 @@ import org.apache.yetus.audience.InterfaceAudience;
@FunctionalInterface
public interface WALFileLengthProvider {
- OptionalLong getLogFileSizeIfBeingWritten(Path path);
+ OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 75439fe..8dee012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -63,7 +63,29 @@ class DisabledWALProvider implements WALProvider {
if (null == providerId) {
providerId = "defaultDisabled";
}
- disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null);
+ final Path path = new Path(FSUtils.getWALRootDir(conf), providerId);
+ disabled = new DisabledWAL(new WALIdentity() {
+
+ @Override
+ public int compareTo(WALIdentity o) {
+ return 0;
+ }
+
+ @Override
+ public String getName() {
+ return path.getName();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+ }, conf, null);
}
@Override
@@ -90,14 +112,14 @@ class DisabledWALProvider implements WALProvider {
private static class DisabledWAL implements WAL {
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
- protected final Path path;
+ protected final WALIdentity walId;
protected final WALCoprocessorHost coprocessorHost;
protected final AtomicBoolean closed = new AtomicBoolean(false);
- public DisabledWAL(final Path path, final Configuration conf,
+ public DisabledWAL(final WALIdentity walId, final Configuration conf,
final List<WALActionsListener> listeners) {
this.coprocessorHost = new WALCoprocessorHost(this, conf);
- this.path = path;
+ this.walId = walId;
if (null != listeners) {
for(WALActionsListener listener : listeners) {
registerWALActionsListener(listener);
@@ -123,14 +145,14 @@ class DisabledWALProvider implements WALProvider {
}
for (WALActionsListener listener : listeners) {
try {
- listener.preLogRoll(path, path);
+ listener.preLogRoll(walId, walId);
} catch (IOException exception) {
LOG.debug("Ignoring exception from listener.", exception);
}
}
for (WALActionsListener listener : listeners) {
try {
- listener.postLogRoll(path, path);
+ listener.postLogRoll(walId, walId);
} catch (IOException exception) {
LOG.debug("Ignoring exception from listener.", exception);
}
@@ -243,7 +265,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+ public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) {
return OptionalLong.empty();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 7cd39ea..b02a4d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.wal;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;