You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/08/05 15:08:15 UTC

[GitHub] [hbase] virajjasani commented on a change in pull request #2198: HBASE-24817 Allow configuring WALEntry filters on ReplicationSource

virajjasani commented on a change in pull request #2198:
URL: https://github.com/apache/hbase/pull/2198#discussion_r465794580



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
##########
@@ -295,7 +356,40 @@ protected void stopServiceThreads() {
     }
   }
 
-  // Test HBASE-20497
+  /**
+   * Deadend Endpoint. Does nothing.
+   */
+  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
+    private final UUID uuid = UUID.randomUUID();
+
+    @Override public void init(Context context) throws IOException {
+      this.ctx = context;
+      return;

Review comment:
       nit: this `return;` is redundant

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
##########
@@ -108,16 +107,100 @@ public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
+  /**
+   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
+   */
+  @Test
+  public void testDefaultSkipsMetaWAL() throws IOException {
+    ReplicationSource rs = new ReplicationSource();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+    Mockito.when(peerConfig.getReplicationEndpointImpl()).
+      thenReturn(DoNothingReplicationEndpoint.class.getName());
+    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    String queueId = "qid";
+    RegionServerServices rss =
+      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+      p -> OptionalLong.empty(), new MetricsSource(queueId));
+    try {
+      rs.startup();
+      assertTrue(rs.isSourceActive());
+      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
+      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+      rs.enqueueLog(new Path("a.1"));
+      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
+    } finally {
+      rs.terminate("Done");
+      rss.stop("Done");
+    }
+  }
+
+  /**
+   * Test that we filter out meta edits, etc.
+   */
+  @Test
+  public void testWALEntryFilter() throws IOException {
+    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
+    // instance and init it.
+    ReplicationSource rs = new ReplicationSource();
+    UUID uuid = UUID.randomUUID();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+    Mockito.when(peerConfig.getReplicationEndpointImpl()).
+      thenReturn(DoNothingReplicationEndpoint.class.getName());
+    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    String queueId = "qid";
+    RegionServerServices rss =
+      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+      uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+    try {
+      rs.startup();
+      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
+      WALEntryFilter wef = rs.getWalEntryFilter();
+      // Test non-system WAL edit.
+      WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
+        TableName.valueOf("test"), -1), new WALEdit());
+      assertTrue(wef.filter(e) == e);

Review comment:
       nit: if you like, this could be `assertSame(e, wef.filter(e))`

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
##########
@@ -49,5 +47,5 @@
    * @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
    *         the entry to be skipped for replication.
    */
-  public Entry filter(Entry entry);
+  Entry filter(Entry entry);

Review comment:
       For system tables we return a null Entry. Would it be good to consider the return type `Optional<Entry>` here?
   Maybe as a follow-up task?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org