You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/10/01 17:05:24 UTC

[hbase] branch master updated: HBASE-25091 Move LogsComparator from ReplicationSource to AbstractFSWALProvider.WALStartTimeComparator (#2449)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b91a15  HBASE-25091 Move LogsComparator from ReplicationSource to AbstractFSWALProvider.WALStartTimeComparator (#2449)
3b91a15 is described below

commit 3b91a15183482a243a443509dfc7385fac856beb
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Thu Oct 1 10:04:58 2020 -0700

    HBASE-25091 Move LogsComparator from ReplicationSource to AbstractFSWALProvider.WALStartTimeComparator (#2449)
    
    Give the comparator a more descriptive name, a better location,
    and make it work even when passed hbase:meta WAL files.
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../regionserver/RecoveredReplicationSource.java   |  4 +-
 .../regionserver/ReplicationSource.java            | 29 +---------
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    | 36 +++++++++++--
 .../apache/hadoop/hbase/wal/TestWALProvider.java   | 62 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 33 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 00aa026..46cf851 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -64,8 +64,8 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
     boolean hasPathChanged = false;
-    PriorityBlockingQueue<Path> newPaths =
-        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+    PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
+      new AbstractFSWALProvider.WALStartTimeComparator());
     pathsLoop: for (Path path : queue) {
       if (fs.exists(path)) { // still in same location, don't need to do anything
         newPaths.add(path);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8212073..4b034f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -24,7 +24,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -253,7 +252,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
     PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
     if (queue == null) {
-      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
+      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
+        new AbstractFSWALProvider.WALStartTimeComparator());
       // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
       // the shipper may quit immediately
       queue.put(wal);
@@ -759,31 +759,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return !this.server.isStopped() && this.sourceRunning;
   }
 
-  /**
-   * Comparator used to compare logs together based on their start time
-   */
-  public static class LogsComparator implements Comparator<Path> {
-
-    @Override
-    public int compare(Path o1, Path o2) {
-      return Long.compare(getTS(o1), getTS(o2));
-    }
-
-    /**
-     * <p>
-     * Split a path to get the start time
-     * </p>
-     * <p>
-     * For example: 10.20.20.171%3A60020.1277499063250
-     * </p>
-     * @param p path to split
-     * @return start time
-     */
-    private static long getTS(Path p) {
-      return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
-    }
-  }
-
   public ReplicationQueueInfo getReplicationQueueInfo() {
     return replicationQueueInfo;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 94ae704..6f9c87b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static org.apache.commons.lang3.StringUtils.isNumeric;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -418,6 +420,36 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     return p != null && p.endsWith(META_WAL_PROVIDER_ID);
   }
 
+  /**
+   * Comparator that orders WAL file paths by the start time encoded in the file name.
+   * Compares start times and nothing else, so two distinct paths may compare as equal.
+   */
+  public static class WALStartTimeComparator implements Comparator<Path> {
+    @Override
+    public int compare(Path o1, Path o2) {
+      return Long.compare(getTS(o1), getTS(o2));
+    }
+
+    /**
+     * Extract the start time from the dot-separated name of a WAL file.
+     * For example: 10.20.20.171%3A60020.1277499063250
+     * A meta WAL adds a '.meta' suffix and a synchronous replication WAL a '.syncrep' suffix;
+     * when the last component is not numeric, the one before it is parsed instead.
+     * @param p path to split
+     * @return start time
+     */
+    private static long getTS(Path p) {
+      String name = p.getName();
+      String [] splits = name.split("\\.");
+      String ts = splits[splits.length - 1];
+      if (!isNumeric(ts)) {
+        // It's a '.meta' or a '.syncrep' suffix; the start time is the component before it.
+        ts = splits[splits.length - 2];
+      }
+      return Long.parseLong(ts);
+    }
+  }
+
   public static boolean isArchivedLogFile(Path p) {
     String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
     return p.toString().contains(oldLog);
@@ -545,8 +577,4 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   public static String getWALPrefixFromWALName(String name) {
     return getWALNameGroupFromWALName(name, 1);
   }
-
-  public static long getWALStartTimeFromWALName(String name) {
-    return Long.parseLong(getWALNameGroupFromWALName(name, 2));
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java
new file mode 100644
index 0000000..bc06147
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Comparator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class})
+public class TestWALProvider {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALProvider.class);
+
+  /**
+   * Meta and non-meta WAL paths must order by start time only; a '.meta' suffix is ignored.
+   */
+  @Test
+  public void testWALStartTimeComparator() throws IOException {
+    Path metaPath1 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
+      "f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
+      "localhost%2C59908%2C1600304600425.meta.1600304604319.meta");
+    Path metaPath2 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
+      "f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
+      "localhost%2C59908%2C1600304600425.meta.1600304604320.meta");
+    Path path3 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
+      "f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
+      "localhost%2C59908%2C1600304600425.1600304604321");
+    Path metaPath4 = new Path("hdfs://localhost:59875/user/stack/test-data/" +
+      "f4cb8ffa-6ff7-59a6-f167-6cc00f24899a/WALs/localhost,59908,1600304600425/" +
+      "localhost%2C59908%2C1600304600425.meta.1600304604321.meta");
+    Comparator<Path> c = new AbstractFSWALProvider.WALStartTimeComparator();
+    assertTrue(c.compare(metaPath1, metaPath1) == 0);
+    assertTrue(c.compare(metaPath2, metaPath2) == 0);
+    assertTrue(c.compare(metaPath1, metaPath2) < 0);
+    assertTrue(c.compare(metaPath2, metaPath1) > 0);
+    assertTrue(c.compare(metaPath2, path3) < 0);
+    assertTrue(c.compare(path3, metaPath4) == 0);
+  }
+}