You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/01/31 20:58:41 UTC

hbase git commit: HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration

Repository: hbase
Updated Branches:
  refs/heads/master ae2179730 -> 5ebaadf1a


HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5ebaadf1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5ebaadf1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5ebaadf1

Branch: refs/heads/master
Commit: 5ebaadf1a6d8388f3c5633fb76ecfc8c0adc2da2
Parents: ae21797
Author: Geoffrey <gj...@salesforce.com>
Authored: Thu Jan 26 17:06:28 2017 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Tue Jan 31 12:58:37 2017 -0800

----------------------------------------------------------------------
 .../master/replication/ReplicationManager.java  | 28 ++++++++++-
 .../replication/BaseReplicationEndpoint.java    | 16 +++++++
 .../replication/TestReplicationEndpoint.java    | 50 ++++++++++++++++++--
 3 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5ebaadf1/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
index 139a380..c86f33c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
@@ -27,9 +27,11 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -68,9 +70,10 @@ public class ReplicationManager {
   }
 
   public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
+      throws ReplicationException, IOException {
     checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
       peerConfig.getTableCFsMap());
+    checkConfiguredWALEntryFilters(peerConfig);
     replicationPeers.registerPeer(peerId, peerConfig);
     replicationPeers.peerConnected(peerId);
   }
@@ -98,9 +101,10 @@ public class ReplicationManager {
   }
 
   public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
+      throws ReplicationException, IOException {
     checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
       peerConfig.getTableCFsMap());
+    checkConfiguredWALEntryFilters(peerConfig);
     this.replicationPeers.updatePeerConfig(peerId, peerConfig);
   }
 
@@ -146,5 +150,25 @@ public class ReplicationManager {
             "Table-cfs config conflict with namespaces config in peer");
       }
     }
+
+
+  }
+
+  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
+      throws IOException {
+    String filterCSV = peerConfig.getConfiguration().
+        get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+    if (filterCSV != null && !filterCSV.isEmpty()){
+      String [] filters = filterCSV.split(",");
+      for (String filter : filters) {
+        try {
+          Class clazz = Class.forName(filter);
+          Object o = clazz.newInstance();
+        } catch (Exception e) {
+          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
+              " could not be created. Failing add/update " + "peer operation.", e);
+        }
+      }
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ebaadf1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index 48f3ac5..cf141c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -39,6 +39,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService
   implements ReplicationEndpoint {
 
   private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class);
+  public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
+      = "hbase.replication.source.custom.walentryfilters";
   protected Context ctx;
 
   @Override
@@ -76,6 +78,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService
     if (tableCfFilter != null) {
       filters.add(tableCfFilter);
     }
+    if (ctx != null && ctx.getPeerConfig() != null) {
+      String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+      if (filterNameCSV != null && !filterNameCSV.isEmpty()) {
+        String[] filterNames = filterNameCSV.split(",");
+        for (String filterName : filterNames) {
+          try {
+            Class<?> clazz = Class.forName(filterName);
+            filters.add((WALEntryFilter) clazz.newInstance());
+          } catch (Exception e) {
+            LOG.error("Unable to create WALEntryFilter " + filterName, e);
+          }
+        }
+      }
+    }
     return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ebaadf1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index f4f5f24..5e8d569 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -225,6 +225,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       public boolean evaluate() throws Exception {
         return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
       }
+
       @Override
       public String explainFailure() throws Exception {
         String failure = "Failed to replicate all edits, expected = " + numEdits
@@ -239,9 +240,13 @@ public class TestReplicationEndpoint extends TestReplicationBase {
 
   @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
-    admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
+    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
+    //test that we can create mutliple WALFilters reflectively
+    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
+        EverythingPassesWALEntryFilter.class.getName() +
+            "," + EverythingPassesWALEntryFilterSubclass.class.getName());
+    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
     // now replicate some data.
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
       doPut(connection, Bytes.toBytes("row1"));
@@ -257,9 +262,31 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     });
 
     Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
+    //make sure our reflectively created filter is in the filter chain
+    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
     admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
   }
 
+  @Test (timeout=120000, expected=IOException.class)
+  public void testWALEntryFilterAddValidation() throws Exception {
+    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
+    //test that we can create mutliple WALFilters reflectively
+    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
+        "IAmNotARealWalEntryFilter");
+    admin.addPeer("testWALEntryFilterAddValidation", rpc);
+  }
+
+  @Test (timeout=120000, expected=IOException.class)
+  public void testWALEntryFilterUpdateValidation() throws Exception {
+    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
+    //test that we can create mutliple WALFilters reflectively
+    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
+        "IAmNotARealWalEntryFilter");
+    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
+  }
+
 
   @Test
   public void testMetricsSourceBaseSourcePassthrough(){
@@ -488,4 +515,21 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       });
     }
   }
+
+  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
+    private static boolean passedEntry = false;
+    @Override
+    public Entry filter(Entry entry) {
+      passedEntry = true;
+      return entry;
+    }
+
+    public static boolean hasPassedAnEntry(){
+      return passedEntry;
+    }
+  }
+
+  public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
+
+  }
 }