Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/13 22:14:32 UTC

[2/2] hbase git commit: HBASE-21246 Introduce WALIdentity to identify WALs instead of a Path

HBASE-21246 Introduce WALIdentity to identify WALs instead of a Path

Builds on top of tyu's original idea.

Signed-off-by: Josh Elser <el...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c738e157
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c738e157
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c738e157

Branch: refs/heads/HBASE-20952
Commit: c738e1575f37000c6feff00362399312740b3c74
Parents: ebfc04d
Author: Ankit Singhal <an...@gmail.com>
Authored: Thu Dec 13 16:59:23 2018 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 13 17:10:28 2018 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/FSWALIdentity.java  | 78 ++++++++++++++++
 .../apache/hadoop/hbase/wal/WALIdentity.java    | 39 ++++++++
 .../regionserver/ReplicationStatusTmpl.jamon    |  4 +-
 .../hadoop/hbase/coprocessor/WALObserver.java   | 14 +--
 .../hbase/regionserver/wal/AbstractFSWAL.java   | 24 ++---
 .../regionserver/wal/WALActionsListener.java    | 26 +++---
 .../regionserver/wal/WALCoprocessorHost.java    | 18 ++--
 .../RecoveredReplicationSource.java             | 40 +++++----
 .../RecoveredReplicationSourceShipper.java      |  7 +-
 .../regionserver/ReplicationSource.java         | 44 ++++-----
 .../ReplicationSourceInterface.java             |  5 +-
 .../regionserver/ReplicationSourceManager.java  | 64 +++++++-------
 .../regionserver/ReplicationSourceShipper.java  | 18 ++--
 .../ReplicationSourceWALActionListener.java     | 10 +--
 .../ReplicationSourceWALReader.java             | 39 ++++----
 .../regionserver/ReplicationStatus.java         | 16 ++--
 .../SerialReplicationSourceWALReader.java       | 20 ++---
 .../replication/regionserver/WALEntryBatch.java | 22 ++---
 .../regionserver/WALEntryStream.java            | 93 +++++++++++---------
 .../regionserver/WALFileLengthProvider.java     |  4 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java   | 36 ++++++--
 .../apache/hadoop/hbase/wal/FSHLogProvider.java |  2 +-
 .../coprocessor/SampleRegionWALCoprocessor.java |  6 +-
 .../hbase/fs/TestBlockReorderMultiBlocks.java   |  4 +-
 .../wal/AbstractTestLogRollPeriod.java          | 18 ++--
 .../hbase/regionserver/wal/TestLogRolling.java  | 15 ++--
 .../wal/TestWALActionsListener.java             |  5 +-
 .../replication/ReplicationSourceDummy.java     | 12 +--
 .../replication/TestMasterReplication.java      |  4 +-
 .../replication/TestMultiSlaveReplication.java  |  5 +-
 .../TestReplicationEmptyWALRecovery.java        | 13 ++-
 .../TestReplicationMetricsforUI.java            |  8 +-
 .../master/TestRecoverStandbyProcedure.java     |  1 +
 .../regionserver/TestReplicationSource.java     |  6 +-
 .../TestReplicationSourceManager.java           | 26 +++---
 .../regionserver/TestWALEntryStream.java        | 51 +++++------
 36 files changed, 490 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
new file mode 100644
index 0000000..d12a1cf
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Distributed filesystem-backed implementation of {@link WALIdentity} that wraps a {@link Path}.
+ */
+@InterfaceAudience.Private
+public class FSWALIdentity implements WALIdentity {
+  private String name;
+  private Path path;
+
+  public FSWALIdentity(String name) {
+    this.path = new Path(name);
+    this.name = path.getName();
+  }
+
+  public FSWALIdentity(Path path) {
+    this.path = path;
+    if (path != null) {
+      this.name = path.getName();
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the {@link Path} that this WALIdentity wraps
+   */
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public int compareTo(WALIdentity o) {
+    FSWALIdentity that = (FSWALIdentity)o;
+    return this.path.compareTo(that.getPath());
+  }
+
+  @Override
+  public String toString() {
+    return this.path.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof FSWALIdentity)) {
+      return false;
+    }
+    FSWALIdentity that = (FSWALIdentity) obj;
+    return this.path.equals(that.getPath());
+  }
+  @Override
+  public int hashCode() {
+    return this.path.hashCode();
+  }
+}

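For illustration (not part of this patch), a minimal usage sketch of the FSWALIdentity class added above; the example class name and WAL path are made up:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.FSWALIdentity;

public class FSWALIdentityExample {
  public static void main(String[] args) {
    Path walPath = new Path("/hbase/WALs/server1/server1.1544736012345");
    FSWALIdentity fromPath = new FSWALIdentity(walPath);
    FSWALIdentity fromString = new FSWALIdentity(walPath.toString());

    // getName() returns only the final path component of the wrapped Path.
    System.out.println(fromPath.getName());                  // server1.1544736012345
    // equals(), hashCode() and compareTo() all delegate to the underlying Path.
    System.out.println(fromPath.equals(fromString));         // true
    System.out.println(fromPath.compareTo(fromString) == 0); // true
  }
}
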
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
new file mode 100644
index 0000000..fa7d2fa
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This interface identifies a WAL in both stream-based and distributed
+ * filesystem-based environments.
+ * See the {@link #getName()} method.
+ */
+@InterfaceAudience.Private
+public interface WALIdentity extends Comparable<WALIdentity> {
+
+  /**
+   * A WALIdentity uniquely identifies a WAL stored in its WALProvider.
+   * The name can be thought of as a human-readable, serialized form of the WALIdentity.
+   *
+   * The same value should be returned across calls to this method.
+   *
+   * @return the name of the WAL
+   */
+  String getName();
+}

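For illustration (not part of this patch), a hypothetical stream-backed WALIdentity implementation, sketched only to show that the abstraction is not tied to a filesystem Path; the class and its semantics are assumptions, not code from this change:

package org.apache.hadoop.hbase.wal;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class StreamWALIdentity implements WALIdentity {
  // Name of a stream/partition in some log service; uniqueness is assumed to be
  // guaranteed by the WALProvider that hands these identities out.
  private final String name;

  public StreamWALIdentity(String name) {
    this.name = name;
  }

  @Override
  public String getName() {
    return name;
  }

  @Override
  public int compareTo(WALIdentity other) {
    return name.compareTo(other.getName());
  }

  @Override
  public boolean equals(Object obj) {
    return obj instanceof StreamWALIdentity && name.equals(((StreamWALIdentity) obj).name);
  }

  @Override
  public int hashCode() {
    return name.hashCode();
  }
}
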
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
index 7dc1c7f..e1aceea 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -71,7 +71,7 @@
                  <tr>
                      <td><% entry.getValue().getPeerId() %></td>
                      <td><% entry.getValue().getWalGroup() %></td>
-                     <td><% entry.getValue().getCurrentPath() %> </td>
+                     <td><% entry.getValue().getCurrentWalId() %> </td>
                      <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
                      <td><% entry.getValue().getQueueSize() %></td>
                      <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
@@ -96,7 +96,7 @@
                  <tr>
                      <td><% entry.getValue().getPeerId() %></td>
                      <td><% entry.getValue().getWalGroup() %></td>
-                     <td><% entry.getValue().getCurrentPath() %> </td>
+                     <td><% entry.getValue().getCurrentWalId() %> </td>
                      <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
                      <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
                  </tr>

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
index b2fa7ca..436f3c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java
@@ -21,10 +21,10 @@ package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -92,18 +92,18 @@ public interface WALObserver {
 
   /**
    * Called before rolling the current WAL
-   * @param oldPath the path of the current wal that we are replacing
-   * @param newPath the path of the wal we are going to create
+   * @param oldPath the identity of the current wal that we are replacing
+   * @param newPath the identity of the wal we are going to create
    */
   default void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-      Path oldPath, Path newPath) throws IOException {}
+      WALIdentity oldPath, WALIdentity newPath) throws IOException {}
 
   /**
    * Called after rolling the current WAL
-   * @param oldPath the path of the wal that we replaced
-   * @param newPath the path of the wal we have created and now is the current
+   * @param oldPath the identity of the wal that we replaced
+   * @param newPath the identity of the wal we have created and now is the current
    */
   default void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-      Path oldPath, Path newPath) throws IOException {}
+      WALIdentity oldPath, WALIdentity newPath) throws IOException {}
 }
 

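For illustration (not part of this patch), a minimal WALObserver coprocessor written against the new WALIdentity-based hooks above; the class name and the logging it does are assumptions:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingWALObserver implements WALCoprocessor, WALObserver {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingWALObserver.class);

  @Override
  public Optional<WALObserver> getWALObserver() {
    return Optional.of(this);
  }

  @Override
  public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
      WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
    // oldWalId can be null when the very first WAL of a regionserver is created.
    LOG.info("WAL rolled from {} to {}", oldWalId, newWalId);
  }
}
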
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 7915ac3..ab58b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -65,9 +65,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
 import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
@@ -544,11 +546,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
       throws IOException {
-    coprocessorHost.preWALRoll(oldPath, newPath);
+    coprocessorHost.preWALRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
 
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
-        i.preLogRoll(oldPath, newPath);
+        i.preLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
       }
     }
   }
@@ -560,11 +562,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       throws IOException {
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
-        i.postLogRoll(oldPath, newPath);
+        i.postLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
       }
     }
 
-    coprocessorHost.postWALRoll(oldPath, newPath);
+    coprocessorHost.postWALRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath));
   }
 
   // public only until class moves to o.a.h.h.wal
@@ -650,7 +652,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // Tell our listeners that a log is going to be archived.
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
-        i.preLogArchive(p, newPath);
+        i.preLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath));
       }
     }
     LOG.info("Archiving " + p + " to " + newPath);
@@ -660,7 +662,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // Tell our listeners that a log has been archived.
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
-        i.postLogArchive(p, newPath);
+        i.postLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath));
       }
     }
   }
@@ -836,7 +838,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         // Tell our listeners that a log is going to be archived.
         if (!this.listeners.isEmpty()) {
           for (WALActionsListener i : this.listeners) {
-            i.preLogArchive(file.getPath(), p);
+            i.preLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p));
           }
         }
 
@@ -846,7 +848,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         // Tell our listeners that a log was archived.
         if (!this.listeners.isEmpty()) {
           for (WALActionsListener i : this.listeners) {
-            i.postLogArchive(file.getPath(), p);
+            i.postLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p));
           }
         }
       }
@@ -994,11 +996,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
    */
   @Override
-  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+  public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) {
     rollWriterLock.lock();
     try {
-      Path currentPath = getOldPath();
-      if (path.equals(currentPath)) {
+      FSWALIdentity currentPath = new FSWALIdentity(getOldPath());
+      if (walId.equals(currentPath)) {
         W writer = this.writer;
         return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 13ffac7..f1c293b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -35,32 +35,32 @@ public interface WALActionsListener {
   /**
    * The WAL is going to be rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
-   * @param oldPath the path to the old wal
-   * @param newPath the path to the new wal
+   * @param oldWalId the identity of the old wal
+   * @param newWalId the identity of the new wal
    */
-  default void preLogRoll(Path oldPath, Path newPath) throws IOException {}
+  default void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
 
   /**
    * The WAL has been rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
-   * @param oldPath the path to the old wal
-   * @param newPath the path to the new wal
+   * @param oldWalId the identity of the old wal
+   * @param newWalId the identity of the new wal
    */
-  default void postLogRoll(Path oldPath, Path newPath) throws IOException {}
+  default void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
 
   /**
    * The WAL is going to be archived.
-   * @param oldPath the path to the old wal
-   * @param newPath the path to the new wal
+   * @param oldWalId the identity of the old wal
+   * @param newWalId the identity of the new wal
    */
-  default void preLogArchive(Path oldPath, Path newPath) throws IOException {}
+  default void preLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
 
   /**
    * The WAL has been archived.
-   * @param oldPath the path to the old wal
-   * @param newPath the path to the new wal
+   * @param oldWalId the identity of the old wal
+   * @param newWalId the identity of the new wal
    */
-  default void postLogArchive(Path oldPath, Path newPath) throws IOException {}
+  default void postLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {}
 
   /**
    * A request was made that the WAL be rolled.

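For illustration (not part of this patch), a minimal WALActionsListener built against the new WALIdentity-based callbacks above; the class name is made up:

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ArchiveLoggingListener implements WALActionsListener {
  private static final Logger LOG = LoggerFactory.getLogger(ArchiveLoggingListener.class);

  @Override
  public void postLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
    // The listener now receives opaque WAL identities rather than filesystem Paths.
    LOG.info("Archived {} as {}", oldWalId.getName(), newWalId.getName());
  }
}
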
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
index 40d6d0f..de8a1bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.WALObserver;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -170,28 +170,28 @@ public class WALCoprocessorHost
 
   /**
    * Called before rolling the current WAL
-   * @param oldPath the path of the current wal that we are replacing
-   * @param newPath the path of the wal we are going to create
+   * @param oldWalId the identity of the current wal that we are replacing
+   * @param newWalId the identity of the wal we are going to create
    */
-  public void preWALRoll(Path oldPath, Path newPath) throws IOException {
+  public void preWALRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
       @Override
       protected void call(WALObserver observer) throws IOException {
-        observer.preWALRoll(this, oldPath, newPath);
+        observer.preWALRoll(this, oldWalId, newWalId);
       }
     });
   }
 
   /**
    * Called after rolling the current WAL
-   * @param oldPath the path of the wal that we replaced
-   * @param newPath the path of the wal we have created and now is the current
+   * @param oldWalId the identity of the wal that we replaced
+   * @param newWalId the identity of the wal we have created and now is the current
    */
-  public void postWALRoll(Path oldPath, Path newPath) throws IOException {
+  public void postWALRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
     execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
       @Override
       protected void call(WALObserver observer) throws IOException {
-        observer.postWALRoll(this, oldPath, newPath);
+        observer.postWALRoll(this, oldWalId, newWalId);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index f1bb538..4bb1fe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,17 +60,19 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   @Override
   protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
-      PriorityBlockingQueue<Path> queue) {
+      PriorityBlockingQueue<WALIdentity> queue) {
     return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
   }
 
-  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
+  public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) throws IOException {
     boolean hasPathChanged = false;
-    PriorityBlockingQueue<Path> newPaths =
-        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
-    pathsLoop: for (Path path : queue) {
-      if (fs.exists(path)) { // still in same location, don't need to do anything
-        newPaths.add(path);
+    PriorityBlockingQueue<WALIdentity> newWalIds =
+        new PriorityBlockingQueue<WALIdentity>(queueSizePerGroup, new LogsComparator());
+    pathsLoop: for (WALIdentity walId : queue) {
+      if (fs.exists(((FSWALIdentity) walId).getPath())) {
+        // still in same location, don't need to
+        // do anything
+        newWalIds.add(walId);
         continue;
       }
       // Path changed - try to find the right path.
@@ -76,8 +80,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
       if (server instanceof ReplicationSyncUp.DummyServer) {
         // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
         // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
-        Path newPath = getReplSyncUpPath(path);
-        newPaths.add(newPath);
+        Path newPath = getReplSyncUpPath(((FSWALIdentity)walId).getPath());
+        newWalIds.add(new FSWALIdentity(newPath));
         continue;
       } else {
         // See if Path exists in the dead RS folder (there could be a chain of failures
@@ -89,27 +93,27 @@ public class RecoveredReplicationSource extends ReplicationSource {
           final Path deadRsDirectory =
               new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
                   .getServerName()));
-          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
-              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
+          Path[] locs = new Path[] { new Path(deadRsDirectory, walId.getName()), new Path(
+              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walId.getName()) };
           for (Path possibleLogLocation : locs) {
             LOG.info("Possible location " + possibleLogLocation.toUri().toString());
             if (manager.getFs().exists(possibleLogLocation)) {
               // We found the right new location
-              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
-              newPaths.add(possibleLogLocation);
+              LOG.info("Log " + walId + " still exists at " + possibleLogLocation);
+              newWalIds.add(new FSWALIdentity(possibleLogLocation));
               continue pathsLoop;
             }
           }
         }
         // didn't find a new location
         LOG.error(
-          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
-        newPaths.add(path);
+          String.format("WAL Path %s doesn't exist and couldn't find its new location", walId));
+        newWalIds.add(walId);
       }
     }
 
     if (hasPathChanged) {
-      if (newPaths.size() != queue.size()) { // this shouldn't happen
+      if (newWalIds.size() != queue.size()) { // this shouldn't happen
         LOG.error("Recovery queue size is incorrect");
         throw new IOException("Recovery queue size error");
       }
@@ -117,8 +121,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
       // since this is a recovered queue with no new incoming logs,
       // there shouldn't be any concurrency issues
       queue.clear();
-      for (Path path : newPaths) {
-        queue.add(path);
+      for (WALIdentity walId : newWalIds) {
+        queue.add(walId);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index b0d4db0..dbf1296 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   private final ReplicationQueueStorage replicationQueues;
 
   public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+      PriorityBlockingQueue<WALIdentity> queue, RecoveredReplicationSource source,
       ReplicationQueueStorage queueStorage) {
     super(conf, walGroupId, queue, source);
     this.source = source;
@@ -58,7 +59,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
     int numRetries = 0;
     while (numRetries <= maxRetriesMultiplier) {
       try {
-        source.locateRecoveredPaths(queue);
+        source.locateRecoveredWalIds(queue);
         break;
       } catch (IOException e) {
         LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10fa50f..12c63fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -61,7 +61,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +88,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
   // Queues of logs to process, entry in format of walGroupId->queue,
   // each presents a queue for one wal group
-  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
+  private Map<String, PriorityBlockingQueue<WALIdentity>> queues = new HashMap<>();
   // per group queue size, keep no more than this number of logs in each wal group
   protected int queueSizePerGroup;
   protected ReplicationQueueStorage queueStorage;
@@ -166,10 +168,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.queueStorage = queueStorage;
     this.replicationPeer = replicationPeer;
     this.manager = manager;
-    this.fs = fs;
     this.metrics = metrics;
     this.clusterId = clusterId;
-
+    this.fs = fs;
     this.queueId = queueId;
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -191,9 +192,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public void enqueueLog(Path log) {
+  public void enqueueLog(WALIdentity log) {
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    PriorityBlockingQueue<WALIdentity> queue = queues.get(logPrefix);
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
       // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
@@ -300,7 +301,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.walEntryFilter = new ChainWALEntryFilter(filters);
   }
 
-  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<WALIdentity> queue) {
     ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
@@ -328,9 +329,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
       int queueSize = queues.get(walGroupId).size();
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
-      Path currentPath = shipper.getCurrentPath();
+      WALIdentity currentPath = shipper.getCurrentWALIdentity();
       try {
-        fileSize = getFileSize(currentPath);
+        fileSize = getFileSize(((FSWALIdentity)currentPath).getPath());
       } catch (IOException e) {
         LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
         fileSize = -1;
@@ -339,7 +340,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       statusBuilder.withPeerId(this.getPeerId())
           .withQueueSize(queueSize)
           .withWalGroup(walGroupId)
-          .withCurrentPath(currentPath)
+          .withCurrentWalId(currentPath)
           .withCurrentPosition(shipper.getCurrentPosition())
           .withFileSize(fileSize)
           .withAgeOfLastShippedOp(ageOfLastShippedOp)
@@ -361,12 +362,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   protected ReplicationSourceShipper createNewShipper(String walGroupId,
-      PriorityBlockingQueue<Path> queue) {
+      PriorityBlockingQueue<WALIdentity> queue) {
     return new ReplicationSourceShipper(conf, walGroupId, queue, this);
   }
 
   private ReplicationSourceWALReader createNewWALReader(String walGroupId,
-      PriorityBlockingQueue<Path> queue, long startPosition) {
+      PriorityBlockingQueue<WALIdentity> queue, long startPosition) {
     return replicationPeer.getPeerConfig().isSerial()
       ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
       : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
@@ -374,7 +375,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   protected final void uncaughtException(Thread t, Throwable e) {
     RSRpcServices.exitIfOOME(e);
-    LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+    LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentWALIdentity(),
+      e);
     server.abort("Unexpected exception in " + t.getName(), e);
   }
 
@@ -497,9 +499,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
     initializeWALEntryFilter(peerClusterId);
     // start workers
-    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+    for (Map.Entry<String, PriorityBlockingQueue<WALIdentity>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();
-      PriorityBlockingQueue<Path> queue = entry.getValue();
+      PriorityBlockingQueue<WALIdentity> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
   }
@@ -593,11 +595,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public Path getCurrentPath() {
+  public WALIdentity getCurrentWALIdentity() {
     // only for testing
     for (ReplicationSourceShipper worker : workerThreads.values()) {
-      if (worker.getCurrentPath() != null) {
-        return worker.getCurrentPath();
+      if (worker.getCurrentWALIdentity() != null) {
+        return worker.getCurrentWALIdentity();
       }
     }
     return null;
@@ -611,10 +613,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
   /**
    * Comparator used to compare logs together based on their start time
    */
-  public static class LogsComparator implements Comparator<Path> {
+  public static class LogsComparator implements Comparator<WALIdentity> {
 
     @Override
-    public int compare(Path o1, Path o2) {
+    public int compare(WALIdentity o1, WALIdentity o2) {
       return Long.compare(getTS(o1), getTS(o2));
     }
 
@@ -628,7 +630,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
      * @param p path to split
      * @return start time
      */
-    private static long getTS(Path p) {
+    private static long getTS(WALIdentity p) {
       return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
     }
   }
@@ -642,7 +644,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       String walGroupId = entry.getKey();
       ReplicationSourceShipper worker = entry.getValue();
       long position = worker.getCurrentPosition();
-      Path currentPath = worker.getCurrentPath();
+      WALIdentity currentPath = worker.getCurrentWALIdentity();
       sb.append("walGroup [").append(walGroupId).append("]: ");
       if (currentPath != null) {
         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")

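For illustration (not part of this patch), a sketch of how the per-group queue of WALIdentity above is ordered by LogsComparator; it assumes WAL names end in ".<startTimeMillis>" as produced by AbstractFSWALProvider, and the example class name is made up:

import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WALIdentity;

public class WalQueueOrderingExample {
  public static void main(String[] args) {
    PriorityBlockingQueue<WALIdentity> queue =
        new PriorityBlockingQueue<>(11, new ReplicationSource.LogsComparator());
    queue.add(new FSWALIdentity("/hbase/WALs/server1/server1.1544736099999"));
    queue.add(new FSWALIdentity("/hbase/WALs/server1/server1.1544736000000"));
    // The WAL whose name embeds the smallest start timestamp is polled (and replicated) first.
    System.out.println(queue.poll().getName());  // server1.1544736000000
  }
}
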
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df7a8cc..3058fcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -60,7 +61,7 @@ public interface ReplicationSourceInterface {
    * Add a log to the list of logs to replicate
    * @param log path to the log to replicate
    */
-  void enqueueLog(Path log);
+  void enqueueLog(WALIdentity log);
 
   /**
    * Add hfile names to the queue to be replicated.
@@ -95,7 +96,7 @@ public interface ReplicationSourceInterface {
    * Get the current log that's replicated
    * @return the current log
    */
-  Path getCurrentPath();
+  WALIdentity getCurrentWALIdentity();
 
   /**
    * Get the queue id that the source is replicating to

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 20c1215..b948d7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -63,7 +63,9 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -114,7 +116,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
  * is already synchronized on {@link #oldsources}. So no need synchronized on
  * {@link #walsByIdRecoveredQueues}.</li>
- * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
+ * <li>Need synchronized on {@link #latestWalIds} to avoid a newly opened source missing a new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
  * to-be-removed peer.</li>
  * </ul>
@@ -148,7 +150,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Configuration conf;
   private final FileSystem fs;
   // The paths to the latest log of each wal group, for new coming peers
-  private final Map<String, Path> latestPaths;
+  private final Map<String, WALIdentity> latestWalIds;
   // Path to the wals directories
   private final Path logDir;
   // Path to the wal archive
@@ -216,7 +218,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setNameFormat("ReplicationExecutor-%d");
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
-    this.latestPaths = new HashMap<>();
+    this.latestWalIds = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
     this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
@@ -365,22 +367,22 @@ public class ReplicationSourceManager implements ReplicationListener {
   ReplicationSourceInterface addSource(String peerId) throws IOException {
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
     ReplicationSourceInterface src = createSource(peerId, peer);
-    // synchronized on latestPaths to avoid missing the new log
-    synchronized (this.latestPaths) {
+    // synchronized on latestWalIds to avoid missing the new log
+    synchronized (this.latestWalIds) {
       this.sources.put(peerId, src);
       Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
       this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
-      if (!latestPaths.isEmpty()) {
-        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
-          Path walPath = walPrefixAndPath.getValue();
+      if (!latestWalIds.isEmpty()) {
+        for (Map.Entry<String, WALIdentity> walPrefixAndId : latestWalIds.entrySet()) {
+          WALIdentity walId = walPrefixAndId.getValue();
           NavigableSet<String> wals = new TreeSet<>();
-          wals.add(walPath.getName());
-          walsByGroup.put(walPrefixAndPath.getKey(), wals);
+          wals.add(walId.getName());
+          walsByGroup.put(walPrefixAndId.getKey(), wals);
           // Abort RS and throw exception to make add peer failed
           abortAndThrowIOExceptionWhenFail(
-            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
-          src.enqueueLog(walPath);
+            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walId.getName()));
+          src.enqueueLog(walId);
         }
       }
     }
@@ -417,7 +419,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     // walsById.
     ReplicationSourceInterface toRemove;
     Map<String, NavigableSet<String>> wals = new HashMap<>();
-    synchronized (latestPaths) {
+    synchronized (latestWalIds) {
       toRemove = sources.put(peerId, src);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
@@ -476,15 +478,15 @@ public class ReplicationSourceManager implements ReplicationListener {
       " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
     ReplicationSourceInterface src = createSource(peerId, peer);
-    // synchronized on latestPaths to avoid missing the new log
-    synchronized (this.latestPaths) {
+    // synchronized on latestWalIds to avoid missing the new log
+    synchronized (this.latestWalIds) {
       ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
       if (toRemove != null) {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
         toRemove.terminate(terminateMessage);
       }
       for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
+        walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(new Path(this.logDir, wal))));
       }
     }
     LOG.info("Startup replication source for " + src.getPeerId());
@@ -505,7 +507,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         ReplicationSourceInterface replicationSource = createSource(queueId, peer);
         this.oldsources.add(replicationSource);
         for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
+          walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(wal)));
         }
         toStartup.add(replicationSource);
       }
@@ -617,7 +619,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
       WALEntryBatch entryBatch) {
-    String fileName = entryBatch.getLastWalPath().getName();
+    String fileName = entryBatch.getLastWalId().getName();
     interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
       source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
     cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
@@ -735,11 +737,11 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   // public because of we call it in TestReplicationEmptyWALRecovery
   @VisibleForTesting
-  public void preLogRoll(Path newLog) throws IOException {
+  public void preLogRoll(WALIdentity newLog) throws IOException {
     String logName = newLog.getName();
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
-    // synchronized on latestPaths to avoid the new open source miss the new log
-    synchronized (this.latestPaths) {
+    // synchronized on latestWalIds to avoid a newly opened source missing the new log
+    synchronized (this.latestWalIds) {
       // Add log to queue storage
       for (ReplicationSourceInterface source : this.sources.values()) {
         // If record log to queue storage failed, abort RS and throw exception to make log roll
@@ -778,14 +780,14 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
       }
 
-      // Add to latestPaths
-      latestPaths.put(logPrefix, newLog);
+      // Add to latestWalIds
+      latestWalIds.put(logPrefix, newLog);
     }
   }
 
   // public because of we call it in TestReplicationEmptyWALRecovery
   @VisibleForTesting
-  public void postLogRoll(Path newLog) throws IOException {
+  public void postLogRoll(WALIdentity newLog) throws IOException {
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
@@ -961,7 +963,7 @@ public class ReplicationSourceManager implements ReplicationListener {
             }
             oldsources.add(src);
             for (String wal : walsSet) {
-              src.enqueueLog(new Path(oldLogDir, wal));
+              src.enqueueLog(new FSWALIdentity(new Path(oldLogDir, wal)));
             }
             src.startup();
           }
@@ -1038,16 +1040,16 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   @VisibleForTesting
-  int getSizeOfLatestPath() {
-    synchronized (latestPaths) {
-      return latestPaths.size();
+  int getSizeOfLatestWalId() {
+    synchronized (latestWalIds) {
+      return latestWalIds.size();
     }
   }
 
   @VisibleForTesting
-  Set<Path> getLastestPath() {
-    synchronized (latestPaths) {
-      return Sets.newHashSet(latestPaths.values());
+  Set<WALIdentity> getLastestWalIds() {
+    synchronized (latestWalIds) {
+      return Sets.newHashSet(latestWalIds.values());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 5d6198e..8ecd5bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,14 +56,14 @@ public class ReplicationSourceShipper extends Thread {
 
   private final Configuration conf;
   protected final String walGroupId;
-  protected final PriorityBlockingQueue<Path> queue;
+  protected final PriorityBlockingQueue<WALIdentity> queue;
   private final ReplicationSource source;
 
   // Last position in the log that we sent to ZooKeeper
   // It will be accessed by the stats thread so make it volatile
   private volatile long currentPosition = -1;
   // Path of the current log
-  private Path currentPath;
+  private WALIdentity currentWalId;
   // Current state of the worker thread
   private volatile WorkerState state;
   protected ReplicationSourceWALReader entryReader;
@@ -76,7 +76,7 @@ public class ReplicationSourceShipper extends Thread {
   private final int getEntriesTimeout;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, ReplicationSource source) {
+      PriorityBlockingQueue<WALIdentity> queue, ReplicationSource source) {
     this.conf = conf;
     this.walGroupId = walGroupId;
     this.queue = queue;
@@ -269,7 +269,7 @@ public class ReplicationSourceShipper extends Thread {
     // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
     // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
     // position and the file will be removed soon in cleanOldLogs.
-    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
+    if (batch.isEndOfFile() || !batch.getLastWalId().equals(currentWalId) ||
       batch.getLastWalPosition() != currentPosition) {
       source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
       updated = true;
@@ -278,10 +278,10 @@ public class ReplicationSourceShipper extends Thread {
     // the only exception is for recovered queue, if we reach the end of the queue, then there will
     // no more files so here the currentPath may be null.
     if (batch.isEndOfFile()) {
-      currentPath = entryReader.getCurrentPath();
+      currentWalId = entryReader.getCurrentWalId();
       currentPosition = 0L;
     } else {
-      currentPath = batch.getLastWalPath();
+      currentWalId = batch.getLastWalId();
       currentPosition = batch.getLastWalPosition();
     }
     return updated;
@@ -293,8 +293,8 @@ public class ReplicationSourceShipper extends Thread {
       name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
   }
 
-  Path getCurrentPath() {
-    return entryReader.getCurrentPath();
+  WALIdentity getCurrentWALIdentity() {
+    return entryReader.getCurrentWalId();
   }
 
   long getCurrentPosition() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index 27b25c4..6c1c0b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -46,13 +46,13 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
   }
 
   @Override
-  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-    manager.preLogRoll(newPath);
+  public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
+    manager.preLogRoll(newWalId);
   }
 
   @Override
-  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-    manager.postLogRoll(newPath);
+  public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {
+    manager.postLogRoll(newWalId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b3bdb02..a0b2ecd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -28,15 +28,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -55,7 +56,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
 class ReplicationSourceWALReader extends Thread {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
 
-  private final PriorityBlockingQueue<Path> logQueue;
+  private final PriorityBlockingQueue<WALIdentity> logQueue;
   private final FileSystem fs;
   private final Configuration conf;
   private final WALEntryFilter filter;
@@ -89,7 +90,7 @@ class ReplicationSourceWALReader extends Thread {
    * @param source replication source
    */
   public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter,
       ReplicationSource source) {
     this.logQueue = logQueue;
     this.currentPosition = startPosition;
@@ -181,29 +182,29 @@ class ReplicationSourceWALReader extends Thread {
       batch.getNbEntries() >= replicationBatchCountCapacity;
   }
 
-  protected static final boolean switched(WALEntryStream entryStream, Path path) {
-    Path newPath = entryStream.getCurrentPath();
-    return newPath == null || !path.getName().equals(newPath.getName());
+  protected static final boolean switched(WALEntryStream entryStream, WALIdentity walId) {
+    WALIdentity newWalId = entryStream.getCurrentWalIdentity();
+    return newWalId == null || !walId.equals(newWalId);
   }
 
   protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
-    Path currentPath = entryStream.getCurrentPath();
+    WALIdentity walId = entryStream.getCurrentWalIdentity();
     if (!entryStream.hasNext()) {
       // check whether we have switched a file
-      if (currentPath != null && switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
+      if (walId != null && switched(entryStream, walId)) {
+        return WALEntryBatch.endOfFile(walId);
       } else {
         return null;
       }
     }
-    if (currentPath != null) {
-      if (switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
+    if (walId != null) {
+      if (switched(entryStream, walId)) {
+        return WALEntryBatch.endOfFile(walId);
       }
     } else {
       // when reading from the entry stream for the first time, we will enter here
-      currentPath = entryStream.getCurrentPath();
+      walId = entryStream.getCurrentWalIdentity();
     }
     WALEntryBatch batch = createBatch(entryStream);
     for (;;) {
@@ -217,7 +218,7 @@ class ReplicationSourceWALReader extends Thread {
       }
       boolean hasNext = entryStream.hasNext();
       // always return if we have switched to a new file
-      if (switched(entryStream, currentPath)) {
+      if (switched(entryStream, walId)) {
         batch.setEndOfFile(true);
         break;
       }
@@ -248,7 +249,7 @@ class ReplicationSourceWALReader extends Thread {
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
       logQueue.size() > 1 && this.eofAutoRecovery) {
       try {
-        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+        if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
           logQueue.remove();
           currentPosition = 0;
@@ -259,11 +260,11 @@ class ReplicationSourceWALReader extends Thread {
     }
   }
 
-  public Path getCurrentPath() {
-    // if we've read some WAL entries, get the Path we read from
+  public WALIdentity getCurrentWalId() {
+    // if we've read some WAL entries, get the walId we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
     if (batchQueueHead != null) {
-      return batchQueueHead.getLastWalPath();
+      return batchQueueHead.getLastWalId();
     }
     // otherwise, we must be currently reading from the head of the log queue
     return logQueue.peek();
@@ -280,7 +281,7 @@ class ReplicationSourceWALReader extends Thread {
   }
 
   protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
-    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentWalIdentity());
   }
 
   protected final Entry filterEntry(Entry entry) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
index 10d6cd5..bd0d83a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 public final class ReplicationStatus {
   private final String peerId;
   private final String walGroup;
-  private final Path currentPath;
+  private final WALIdentity currentWalId;
   private final int queueSize;
   private final long ageOfLastShippedOp;
   private final long replicationDelay;
@@ -34,7 +34,7 @@ public final class ReplicationStatus {
   private ReplicationStatus(ReplicationStatusBuilder builder) {
     this.peerId = builder.peerId;
     this.walGroup = builder.walGroup;
-    this.currentPath = builder.currentPath;
+    this.currentWalId = builder.currentWalId;
     this.queueSize = builder.queueSize;
     this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
     this.replicationDelay = builder.replicationDelay;
@@ -70,8 +70,8 @@ public final class ReplicationStatus {
     return replicationDelay;
   }
 
-  public Path getCurrentPath() {
-    return currentPath;
+  public WALIdentity getCurrentWalId() {
+    return currentWalId;
   }
 
   public static ReplicationStatusBuilder newBuilder() {
@@ -81,7 +81,7 @@ public final class ReplicationStatus {
   public static class ReplicationStatusBuilder {
     private String peerId = "UNKNOWN";
     private String walGroup = "UNKNOWN";
-    private Path currentPath = new Path("UNKNOWN");
+    private WALIdentity currentWalId = null;
     private int queueSize = -1;
     private long ageOfLastShippedOp = -1;
     private long replicationDelay = -1;
@@ -103,8 +103,8 @@ public final class ReplicationStatus {
       return this;
     }
 
-    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
-      this.currentPath = currentPath;
+    public ReplicationStatusBuilder withCurrentWalId(WALIdentity currentWalId) {
+      this.currentWalId = currentWalId;
       return this;
     }
 

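With ReplicationStatus now carrying a WALIdentity, producing a status looks roughly like the sketch below. Only withCurrentWalId() comes from this patch; build() and the other with* setters are assumed to be the existing builder methods, and the class name and WAL path are made up:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
  import org.apache.hadoop.hbase.wal.FSWALIdentity;

  public class ReplicationStatusSketch {
    static ReplicationStatus statusFor(Path currentWal) {
      ReplicationStatus.ReplicationStatusBuilder builder = ReplicationStatus.newBuilder();
      // The shipper reports the WAL it is currently reading as a WALIdentity.
      builder.withCurrentWalId(new FSWALIdentity(currentWal));
      return builder.build();  // build() assumed unchanged by this patch
    }
  }

Note that the builder's default is now a null identity rather than a Path("UNKNOWN") sentinel, so anything rendering getCurrentWalId() has to tolerate null.
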
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 9edcc8a..5f33e73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -21,11 +21,11 @@ import java.io.IOException;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -44,7 +44,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   private final SerialReplicationChecker checker;
 
   public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter,
       ReplicationSource source) {
     super(fs, conf, logQueue, startPosition, filter, source);
     checker = new SerialReplicationChecker(conf, source);
@@ -53,22 +53,22 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   @Override
   protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
-    Path currentPath = entryStream.getCurrentPath();
+    WALIdentity currentWalId = entryStream.getCurrentWalIdentity();
     if (!entryStream.hasNext()) {
       // check whether we have switched a file
-      if (currentPath != null && switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
+      if (currentWalId != null && switched(entryStream, currentWalId)) {
+        return WALEntryBatch.endOfFile(currentWalId);
       } else {
         return null;
       }
     }
-    if (currentPath != null) {
-      if (switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
+    if (currentWalId != null) {
+      if (switched(entryStream, currentWalId)) {
+        return WALEntryBatch.endOfFile(currentWalId);
       }
     } else {
       // when reading from the entry stream for the first time, we will enter here
-      currentPath = entryStream.getCurrentPath();
+      currentWalId = entryStream.getCurrentWalIdentity();
     }
     long positionBefore = entryStream.getPosition();
     WALEntryBatch batch = createBatch(entryStream);
@@ -115,7 +115,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
       }
       boolean hasNext = entryStream.hasNext();
       // always return if we have switched to a new file.
-      if (switched(entryStream, currentPath)) {
+      if (switched(entryStream, currentWalId)) {
         batch.setEndOfFile(true);
         break;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..b651a9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -36,7 +36,7 @@ class WALEntryBatch {
 
   private List<Entry> walEntries;
   // last WAL that was read
-  private Path lastWalPath;
+  private WALIdentity lastWalId;
   // position in WAL of last entry in this batch
   private long lastWalPosition = 0;
   // number of distinct row keys in this batch
@@ -51,16 +51,16 @@ class WALEntryBatch {
   private boolean endOfFile;
 
   /**
-   * @param lastWalPath Path of the WAL the last entry in this batch was read from
+   * @param lastWalId identity of the WAL the last entry in this batch was read from
    */
-  WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+  WALEntryBatch(int maxNbEntries, WALIdentity lastWalId) {
     this.walEntries = new ArrayList<>(maxNbEntries);
-    this.lastWalPath = lastWalPath;
+    this.lastWalId = lastWalId;
   }
 
 
-  static WALEntryBatch endOfFile(Path lastWalPath) {
-    WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
+  static WALEntryBatch endOfFile(WALIdentity lastWalId) {
+    WALEntryBatch batch = new WALEntryBatch(0, lastWalId);
     batch.setLastWalPosition(-1L);
     batch.setEndOfFile(true);
     return batch;
@@ -78,10 +78,10 @@ class WALEntryBatch {
   }
 
   /**
-   * @return the path of the last WAL that was read.
+   * @return the identity of the last WAL that was read.
    */
-  public Path getLastWalPath() {
-    return lastWalPath;
+  public WALIdentity getLastWalId() {
+    return lastWalId;
   }
 
   /**
@@ -160,7 +160,7 @@ class WALEntryBatch {
 
   @Override
   public String toString() {
-    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalId=" + lastWalId +
       ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
       nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
       endOfFile + "]";

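A compact sketch of how the readers above build and consume batches with the new WALIdentity-typed API. WALEntryBatch is package-private, so this lives in org.apache.hadoop.hbase.replication.regionserver; the WAL path is made up, and FSWALIdentity equality is assumed to be value-based on the underlying Path, which the shipper's getLastWalId().equals(...) check relies on:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.wal.FSWALIdentity;
  import org.apache.hadoop.hbase.wal.WALIdentity;

  class WALEntryBatchSketch {
    static void sketch() {
      WALIdentity lastWalId = new FSWALIdentity(
          new Path("/hbase/WALs/rs1,16020,1544741963000/rs1%2C16020%2C1544741963000.1544742000000"));
      // A normal batch carries the WALIdentity it was read from plus a capacity hint.
      WALEntryBatch batch = new WALEntryBatch(64, lastWalId);
      assert batch.getLastWalId().equals(lastWalId);
      // The end-of-file sentinel the readers return once the stream has switched WALs.
      WALEntryBatch eof = WALEntryBatch.endOfFile(lastWalId);
      assert eof.isEndOfFile() && eof.getLastWalPosition() == -1L;
    }
  }
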
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 0393af4..3d90153 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -33,9 +33,11 @@ import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -43,9 +45,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
- * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
- * dequeues it and starts reading from the next.
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s, and
+ * continually iterates through all the WAL {@link Entry} in the queue. When it's done reading from
+ * a WAL, it dequeues it and starts reading from the next.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -53,7 +55,7 @@ class WALEntryStream implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
 
   private Reader reader;
-  private Path currentPath;
+  private WALIdentity currentWalIdentity;
   // cache of next entry for hasNext()
   private Entry currentEntry;
   // position for the current entry. As now we support peek, which means that the upper layer may
@@ -62,7 +64,7 @@ class WALEntryStream implements Closeable {
   private long currentPositionOfEntry = 0;
   // position after reading current entry
   private long currentPositionOfReader = 0;
-  private final PriorityBlockingQueue<Path> logQueue;
+  private final PriorityBlockingQueue<WALIdentity> logQueue;
   private final FileSystem fs;
   private final Configuration conf;
   private final WALFileLengthProvider walFileLengthProvider;
@@ -72,7 +74,7 @@ class WALEntryStream implements Closeable {
 
   /**
    * Create an entry stream over the given queue at the given start position
-   * @param logQueue the queue of WAL paths
+   * @param logQueue the queue of WAL identities
    * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
    * @param conf {@link Configuration} to use to create {@link Reader} for this stream
    * @param startPosition the position in the first WAL to start reading at
@@ -80,9 +82,9 @@ class WALEntryStream implements Closeable {
    * @param metrics replication metrics
    * @throws IOException
    */
-  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
-      MetricsSource metrics) throws IOException {
+  public WALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, FileSystem fs,
+      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
+      ServerName serverName, MetricsSource metrics) throws IOException {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
@@ -135,16 +137,16 @@ class WALEntryStream implements Closeable {
   }
 
   /**
-   * @return the {@link Path} of the current WAL
+   * @return the {@link WALIdentity} of the current WAL
    */
-  public Path getCurrentPath() {
-    return currentPath;
+  public WALIdentity getCurrentWalIdentity() {
+    return currentWalIdentity;
   }
 
-  private String getCurrentPathStat() {
+  private String getCurrentWalIdStat() {
     StringBuilder sb = new StringBuilder();
-    if (currentPath != null) {
-      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+    if (currentWalIdentity != null) {
+      sb.append("currently replicating from: ").append(currentWalIdentity).append(" at position: ")
           .append(currentPositionOfEntry).append("\n");
     } else {
       sb.append("no replication ongoing, waiting for new log");
@@ -157,7 +159,7 @@ class WALEntryStream implements Closeable {
    * false)
    */
   public void reset() throws IOException {
-    if (reader != null && currentPath != null) {
+    if (reader != null && currentWalIdentity != null) {
       resetReader();
     }
   }
@@ -166,8 +168,8 @@ class WALEntryStream implements Closeable {
     currentPositionOfEntry = position;
   }
 
-  private void setCurrentPath(Path path) {
-    this.currentPath = path;
+  private void setCurrentWalId(WALIdentity walId) {
+    this.currentWalIdentity = walId;
   }
 
   private void tryAdvanceEntry() throws IOException {
@@ -203,10 +205,10 @@ class WALEntryStream implements Closeable {
     final long trailerSize = currentTrailerSize();
     FileStatus stat = null;
     try {
-      stat = fs.getFileStatus(this.currentPath);
+      stat = fs.getFileStatus(((FSWALIdentity)this.currentWalIdentity).getPath());
     } catch (IOException exception) {
       LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
-        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+        currentWalIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat());
       metrics.incrUnknownFileLengthForClosedWAL();
     }
     // Here we use currentPositionOfReader instead of currentPositionOfEntry.
@@ -222,7 +224,7 @@ class WALEntryStream implements Closeable {
           LOG.debug(
             "Reached the end of WAL file '{}'. It was not closed cleanly," +
               " so we did not parse {} bytes of data. This is normally ok.",
-            currentPath, skippedBytes);
+            currentWalIdentity, skippedBytes);
           metrics.incrUncleanlyClosedWALs();
           metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
         }
@@ -230,7 +232,7 @@ class WALEntryStream implements Closeable {
         LOG.warn(
           "Processing end of WAL file '{}'. At position {}, which is too far away from" +
             " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
-          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
+          currentWalIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat());
         setPosition(0);
         resetReader();
         metrics.incrRestartedWALReading();
@@ -239,15 +241,15 @@ class WALEntryStream implements Closeable {
       }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
-        (stat == null ? "N/A" : stat.getLen()));
+      LOG.trace("Reached the end of log " + this.currentWalIdentity
+          + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
     }
     metrics.incrCompletedWAL();
     return true;
   }
 
   private void dequeueCurrentLog() throws IOException {
-    LOG.debug("Reached the end of log {}", currentPath);
+    LOG.debug("Reached the end of log {}", currentWalIdentity);
     closeReader();
     logQueue.remove();
     setPosition(0);
@@ -260,12 +262,13 @@ class WALEntryStream implements Closeable {
   private boolean readNextEntryAndRecordReaderPosition() throws IOException {
     Entry readEntry = reader.next();
     long readerPos = reader.getPosition();
-    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+    OptionalLong fileLength =
+        walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWalIdentity);
     if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
       // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
       // data, so we need to make sure that we do not read beyond the committed file length.
       if (LOG.isDebugEnabled()) {
-        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+        LOG.debug("The provider tells us the valid length for " + currentWalIdentity + " is " +
             fileLength.getAsLong() + ", but we have advanced to " + readerPos);
       }
       resetReader();
@@ -297,16 +300,16 @@ class WALEntryStream implements Closeable {
 
   // open a reader on the next log in queue
   private boolean openNextLog() throws IOException {
-    Path nextPath = logQueue.peek();
-    if (nextPath != null) {
-      openReader(nextPath);
+    WALIdentity nextWalId = logQueue.peek();
+    if (nextWalId != null) {
+      openReader((FSWALIdentity)nextWalId);
       if (reader != null) {
         return true;
       }
     } else {
       // no more files in queue, this could happen for recovered queue, or for a WAL group of a sync
       // replication peer which has already been transitioned to DA or S.
-      setCurrentPath(null);
+      setCurrentWalId(null);
     }
     return false;
   }
@@ -336,38 +339,39 @@ class WALEntryStream implements Closeable {
     return path;
   }
 
-  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
+  private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe)
+      throws IOException {
     // If the log was archived, continue reading from there
-    Path archivedLog = getArchivedLog(path);
-    if (!path.equals(archivedLog)) {
+    FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath()));
+    if (!walId.equals(archivedLog)) {
       openReader(archivedLog);
     } else {
       throw fnfe;
     }
   }
 
-  private void openReader(Path path) throws IOException {
+  private void openReader(FSWALIdentity walId) throws IOException {
     try {
       // Detect if this is a new file, if so get a new reader else
       // reset the current reader so that we see the new data
-      if (reader == null || !getCurrentPath().equals(path)) {
+      if (reader == null || !getCurrentWalIdentity().equals(walId)) {
         closeReader();
-        reader = WALFactory.createReader(fs, path, conf);
+        reader = WALFactory.createReader(fs, walId.getPath(), conf);
         seek();
-        setCurrentPath(path);
+        setCurrentWalId(walId);
       } else {
         resetReader();
       }
     } catch (FileNotFoundException fnfe) {
-      handleFileNotFound(path, fnfe);
+      handleFileNotFound(walId, fnfe);
     }  catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
       if (!(ioe instanceof FileNotFoundException)) throw ioe;
-      handleFileNotFound(path, (FileNotFoundException)ioe);
+      handleFileNotFound(walId, (FileNotFoundException)ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
-      LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
-      recoverLease(conf, currentPath);
+      LOG.warn("Try to recover the WAL lease " + currentWalIdentity, lnre);
+      recoverLease(conf, ((FSWALIdentity)currentWalIdentity).getPath());
       reader = null;
     } catch (NullPointerException npe) {
       // Workaround for race condition in HDFS-4380
@@ -402,8 +406,9 @@ class WALEntryStream implements Closeable {
       seek();
     } catch (FileNotFoundException fnfe) {
       // If the log was archived, continue reading from there
-      Path archivedLog = getArchivedLog(currentPath);
-      if (!currentPath.equals(archivedLog)) {
+      FSWALIdentity archivedLog =
+          new FSWALIdentity(getArchivedLog(((FSWALIdentity) currentWalIdentity).getPath()));
+      if (!currentWalIdentity.equals(archivedLog)) {
         openReader(archivedLog);
       } else {
         throw fnfe;

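Putting the pieces together, a minimal usage sketch of the stream with the new queue type. The constructor signature is the one introduced above; hasNext()/next() and the Closeable contract are assumed unchanged by this patch, and the ServerName/MetricsSource are whatever the owning ReplicationSource already supplies (WALEntryStream is package-private, so this belongs in org.apache.hadoop.hbase.replication.regionserver):

  import java.io.IOException;
  import java.util.OptionalLong;
  import java.util.concurrent.PriorityBlockingQueue;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.wal.FSWALIdentity;
  import org.apache.hadoop.hbase.wal.WALIdentity;

  class WALEntryStreamSketch {
    // Counts the entries of a single, fully closed WAL. Nothing is being written,
    // so the length provider lambda simply reports "unknown" for every identity.
    static long countEntries(FileSystem fs, Configuration conf, ServerName serverName,
        MetricsSource metrics, Path closedWal) throws IOException {
      PriorityBlockingQueue<WALIdentity> logQueue = new PriorityBlockingQueue<>();
      logQueue.add(new FSWALIdentity(closedWal));
      long count = 0;
      try (WALEntryStream stream = new WALEntryStream(logQueue, fs, conf, 0L,
          walId -> OptionalLong.empty(), serverName, metrics)) {
        while (stream.hasNext()) {
          stream.next();
          count++;
        }
      }
      return count;
    }
  }
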
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index 010fa69..d0b63cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.OptionalLong;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -30,5 +30,5 @@ import org.apache.yetus.audience.InterfaceAudience;
 @FunctionalInterface
 public interface WALFileLengthProvider {
 
-  OptionalLong getLogFileSizeIfBeingWritten(Path path);
+  OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId);
 }

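Because the interface stays a single-method @FunctionalInterface, callers can keep handing in lambdas; only the parameter type changes from Path to WALIdentity. Two hedged examples (class and helper names are made up for illustration):

  import java.util.OptionalLong;
  import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
  import org.apache.hadoop.hbase.wal.WALIdentity;

  public class WalLengthProviders {
    // Every WAL in the queue is already closed: never report a live length.
    public static final WALFileLengthProvider NOTHING_BEING_WRITTEN = walId -> OptionalLong.empty();

    // Caps reads of one live WAL at a known committed length, mirroring how replication
    // avoids reading past data the writer has acknowledged.
    public static WALFileLengthProvider cappedAt(WALIdentity liveWal, long committedLength) {
      return walId -> liveWal.equals(walId)
          ? OptionalLong.of(committedLength)
          : OptionalLong.empty();
    }
  }
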
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 75439fe..8dee012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -63,7 +63,29 @@ class DisabledWALProvider implements WALProvider {
     if (null == providerId) {
       providerId = "defaultDisabled";
     }
-    disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null);
+    final Path path = new Path(FSUtils.getWALRootDir(conf), providerId);
+    disabled = new DisabledWAL(new WALIdentity() {
+
+      @Override
+      public int compareTo(WALIdentity o) {
+        return 0;
+      }
+
+      @Override
+      public String getName() {
+        return path.getName();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        return obj instanceof WALIdentity;
+      }
+
+      @Override
+      public int hashCode() {
+        return 0;
+      }
+    }, conf, null);
   }
 
   @Override
@@ -90,14 +112,14 @@ class DisabledWALProvider implements WALProvider {
 
   private static class DisabledWAL implements WAL {
     protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
-    protected final Path path;
+    protected final WALIdentity walId;
     protected final WALCoprocessorHost coprocessorHost;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
 
-    public DisabledWAL(final Path path, final Configuration conf,
+    public DisabledWAL(final WALIdentity walId, final Configuration conf,
         final List<WALActionsListener> listeners) {
       this.coprocessorHost = new WALCoprocessorHost(this, conf);
-      this.path = path;
+      this.walId = walId;
       if (null != listeners) {
         for(WALActionsListener listener : listeners) {
           registerWALActionsListener(listener);
@@ -123,14 +145,14 @@ class DisabledWALProvider implements WALProvider {
         }
         for (WALActionsListener listener : listeners) {
           try {
-            listener.preLogRoll(path, path);
+            listener.preLogRoll(walId, walId);
           } catch (IOException exception) {
             LOG.debug("Ignoring exception from listener.", exception);
           }
         }
         for (WALActionsListener listener : listeners) {
           try {
-            listener.postLogRoll(path, path);
+            listener.postLogRoll(walId, walId);
           } catch (IOException exception) {
             LOG.debug("Ignoring exception from listener.", exception);
           }
@@ -243,7 +265,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+    public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) {
       return OptionalLong.empty();
     }
   }

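The DisabledWAL can get away with a degenerate identity because it never needs to tell WALs apart. The file-backed code paths, by contrast, lean on WALIdentity having value semantics: switched(), openReader() and the shipper's position tracking all compare identities with equals(). A sketch of the expected behaviour, assuming FSWALIdentity equates and orders identities by their underlying Path (the paths below are made up):

  import java.util.concurrent.PriorityBlockingQueue;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.wal.FSWALIdentity;
  import org.apache.hadoop.hbase.wal.WALIdentity;

  public class WalIdentitySketch {
    public static void main(String[] args) {
      Path older = new Path("/hbase/WALs/rs1,16020,1544741963000/rs1%2C16020%2C1544741963000.1544742000000");
      Path newer = new Path("/hbase/WALs/rs1,16020,1544741963000/rs1%2C16020%2C1544741963000.1544742060000");
      WALIdentity w1 = new FSWALIdentity(older);
      WALIdentity w2 = new FSWALIdentity(newer);

      // Value equality: two identities built from the same Path should be equal, otherwise the
      // archived-log fallback above could not tell a renamed WAL from a missing one.
      System.out.println(w1.equals(new FSWALIdentity(older)));  // expected: true

      // WALIdentity is Comparable, so the replication log queue keeps WALs in creation order,
      // just as it did when it held Paths.
      PriorityBlockingQueue<WALIdentity> logQueue = new PriorityBlockingQueue<>();
      logQueue.add(w2);
      logQueue.add(w1);
      System.out.println(logQueue.peek().getName());  // expected: the older WAL's name
    }
  }
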
http://git-wip-us.apache.org/repos/asf/hbase/blob/c738e157/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 7cd39ea..b02a4d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;