You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/09/11 02:11:58 UTC
hbase git commit: HBASE-16086 TableCfWALEntryFilter and
ScopeWALEntryFilter should not redundantly iterate over cells (Vincent Poon)
Repository: hbase
Updated Branches:
refs/heads/master cc2a40a78 -> 80d8b2100
HBASE-16086 TableCfWALEntryFilter and ScopeWALEntryFilter should not redundantly iterate over cells (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/80d8b210
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/80d8b210
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/80d8b210
Branch: refs/heads/master
Commit: 80d8b2100d9f4dc2a01ea6bdbded6ec52d7e4263
Parents: cc2a40a
Author: chenheng <ch...@apache.org>
Authored: Sun Sep 11 09:55:08 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Sun Sep 11 09:55:08 2016 +0800
----------------------------------------------------------------------
.../hbase/replication/BulkLoadCellFilter.java | 81 ++++++++++++
.../hbase/replication/ChainWALEntryFilter.java | 38 +++++-
.../hbase/replication/ScopeWALEntryFilter.java | 94 ++++----------
.../replication/TableCfWALEntryFilter.java | 124 +++++++------------
.../hadoop/hbase/replication/WALCellFilter.java | 41 ++++++
.../TestReplicationWALEntryFilters.java | 12 +-
6 files changed, 231 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java
new file mode 100644
index 0000000..3599d10
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+import com.google.common.base.Predicate;
+
+public class BulkLoadCellFilter {
+ private static final Log LOG = LogFactory.getLog(BulkLoadCellFilter.class);
+
+ /**
+ * Filters the bulk load cell using the supplied predicate.
+ * @param cell The WAL cell to filter.
+ * @param famPredicate Returns true if the given family should be removed.
+ * @return The filtered cell.
+ */
+ public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
+ byte[] fam;
+ BulkLoadDescriptor bld = null;
+ try {
+ bld = WALEdit.getBulkLoadDescriptor(cell);
+ } catch (IOException e) {
+ LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+ return cell;
+ }
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ // Copy the StoreDescriptor list and update the copy, as storesList is an unmodifiable list
+ List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+ Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+ boolean anyStoreRemoved = false;
+ while (copiedStoresListIterator.hasNext()) {
+ StoreDescriptor sd = copiedStoresListIterator.next();
+ fam = sd.getFamilyName().toByteArray();
+ if (famPredicate.apply(fam)) {
+ copiedStoresListIterator.remove();
+ anyStoreRemoved = true;
+ }
+ }
+
+ if (!anyStoreRemoved) {
+ return cell;
+ } else if (copiedStoresList.isEmpty()) {
+ return null;
+ }
+ BulkLoadDescriptor.Builder newDesc =
+ BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+ .setEncodedRegionName(bld.getEncodedRegionName())
+ .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+ newDesc.addAllStores(copiedStoresList);
+ BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+ return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+ cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 6a3981a..1d67faa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -34,9 +35,11 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
public class ChainWALEntryFilter implements WALEntryFilter {
private final WALEntryFilter[] filters;
+ private WALCellFilter[] cellFilters;
public ChainWALEntryFilter(WALEntryFilter...filters) {
this.filters = filters;
+ initCellFilters();
}
public ChainWALEntryFilter(List<WALEntryFilter> filters) {
@@ -49,8 +52,18 @@ public class ChainWALEntryFilter implements WALEntryFilter {
rawFilters.add(filter);
}
}
-
this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
+ initCellFilters();
+ }
+
+ public void initCellFilters() {
+ ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
+ for (WALEntryFilter filter : filters) {
+ if (filter instanceof WALCellFilter) {
+ cellFilters.add((WALCellFilter) filter);
+ }
+ }
+ this.cellFilters = cellFilters.toArray(new WALCellFilter[cellFilters.size()]);
}
@Override
@@ -61,7 +74,30 @@ public class ChainWALEntryFilter implements WALEntryFilter {
}
entry = filter.filter(entry);
}
+ filterCells(entry);
return entry;
}
+ private void filterCells(Entry entry) {
+ if (entry == null || cellFilters.length == 0) {
+ return;
+ }
+ ArrayList<Cell> cells = entry.getEdit().getCells();
+ int size = cells.size();
+ for (int i = size - 1; i >= 0; i--) {
+ Cell cell = cells.get(i);
+ for (WALCellFilter filter : cellFilters) {
+ cell = filter.filterCell(entry, cell);
+ if (cell != null) {
+ cells.set(i, cell);
+ } else {
+ cells.remove(i);
+ break;
+ }
+ }
+ }
+ if (cells.size() < size / 2) {
+ cells.trimToSize();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 28a83dd..b084a04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,29 +18,24 @@
package org.apache.hadoop.hbase.replication;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import java.util.NavigableMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import com.google.common.base.Predicate;
+
/**
* Keeps KVs that are scoped other than local
*/
@InterfaceAudience.Private
-public class ScopeWALEntryFilter implements WALEntryFilter {
- private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
+public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
+
+ BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
@Override
public Entry filter(Entry entry) {
@@ -48,72 +43,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
if (scopes == null || scopes.isEmpty()) {
return null;
}
- ArrayList<Cell> cells = entry.getEdit().getCells();
- int size = cells.size();
- byte[] fam;
- for (int i = size - 1; i >= 0; i--) {
- Cell cell = cells.get(i);
- // If a bulk load entry has a scope then that means user has enabled replication for bulk load
- // hfiles.
- // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
- // cannot refactor into one now, can revisit and see if any way to unify them.
+ return entry;
+ }
+
+ @Override
+ public Cell filterCell(Entry entry, Cell cell) {
+ final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ byte[] fam = CellUtil.cloneFamily(cell);
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
- if (filteredBulkLoadEntryCell != null) {
- cells.set(i, filteredBulkLoadEntryCell);
- } else {
- cells.remove(i);
- }
+ cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] fam) {
+ return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
+ }
+ });
} else {
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- fam = CellUtil.cloneFamily(cell);
if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
- cells.remove(i);
+ return null;
}
}
- }
- if (cells.size() < size / 2) {
- cells.trimToSize();
- }
- return entry;
- }
-
- private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
- byte[] fam;
- BulkLoadDescriptor bld = null;
- try {
- bld = WALEdit.getBulkLoadDescriptor(cell);
- } catch (IOException e) {
- LOG.warn("Failed to get bulk load events information from the WAL file.", e);
- return cell;
- }
- List<StoreDescriptor> storesList = bld.getStoresList();
- // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
- List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
- Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
- boolean anyStoreRemoved = false;
- while (copiedStoresListIterator.hasNext()) {
- StoreDescriptor sd = copiedStoresListIterator.next();
- fam = sd.getFamilyName().toByteArray();
- if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
- copiedStoresListIterator.remove();
- anyStoreRemoved = true;
- }
- }
-
- if (!anyStoreRemoved) {
- return cell;
- } else if (copiedStoresList.isEmpty()) {
- return null;
- }
- BulkLoadDescriptor.Builder newDesc =
- BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
- .setEncodedRegionName(bld.getEncodedRegionName())
- .setBulkloadSeqNum(bld.getBulkloadSeqNum());
- newDesc.addAllStores(copiedStoresList);
- BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
- return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
- cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ return cell;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index f10849b..d890e3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.replication;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -29,16 +26,17 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-public class TableCfWALEntryFilter implements WALEntryFilter {
+import com.google.common.base.Predicate;
+
+public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
- private final ReplicationPeer peer;
+ private ReplicationPeer peer;
+ private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
public TableCfWALEntryFilter(ReplicationPeer peer) {
this.peer = peer;
@@ -47,91 +45,57 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
TableName tabName = entry.getKey().getTablename();
- ArrayList<Cell> cells = entry.getEdit().getCells();
- Map<TableName, List<String>> tableCFs = null;
-
- try {
- tableCFs = this.peer.getTableCFs();
- } catch (IllegalArgumentException e) {
- LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
- ", degenerate as if it's not configured by keeping tableCFs==null");
- }
- int size = cells.size();
+ Map<TableName, List<String>> tableCFs = getTableCfs();
// If null means user has explicitly not configured any table CFs so all the tables data are
// applicable for replication
- if (tableCFs == null) {
- return entry;
- }
- // return null(prevent replicating) if logKey's table isn't in this peer's
- // replicable table list
+ if (tableCFs == null) return entry;
+
if (!tableCFs.containsKey(tabName)) {
return null;
- } else {
- List<String> cfs = tableCFs.get(tabName);
- for (int i = size - 1; i >= 0; i--) {
- Cell cell = cells.get(i);
- // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so
- // cannot refactor into one now, can revisit and see if any way to unify them.
- // Filter bulk load entries separately
- if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
- if (filteredBulkLoadEntryCell != null) {
- cells.set(i, filteredBulkLoadEntryCell);
- } else {
- cells.remove(i);
- }
- } else {
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()))) {
- cells.remove(i);
- }
- }
- }
- }
- if (cells.size() < size/2) {
- cells.trimToSize();
}
+
return entry;
}
- private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
- byte[] fam;
- BulkLoadDescriptor bld = null;
- try {
- bld = WALEdit.getBulkLoadDescriptor(cell);
- } catch (IOException e) {
- LOG.warn("Failed to get bulk load events information from the WAL file.", e);
- return cell;
- }
- List<StoreDescriptor> storesList = bld.getStoresList();
- // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
- List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
- Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
- boolean anyStoreRemoved = false;
- while (copiedStoresListIterator.hasNext()) {
- StoreDescriptor sd = copiedStoresListIterator.next();
- fam = sd.getFamilyName().toByteArray();
- if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
- copiedStoresListIterator.remove();
- anyStoreRemoved = true;
+ @Override
+ public Cell filterCell(final Entry entry, Cell cell) {
+ final Map<TableName, List<String>> tableCfs = getTableCfs();
+ if (tableCfs == null) return cell;
+ TableName tabName = entry.getKey().getTablename();
+ List<String> cfs = tableCfs.get(tabName);
+ // ignore(remove) kv if its cf isn't in the replicable cf list
+ // (null cfs means all cfs of this table are replicable)
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] fam) {
+ if (tableCfs != null) {
+ List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+ if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ } else {
+ if ((cfs != null) && !cfs.contains(
+ Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
+ return null;
}
}
+ return cell;
+ }
- if (!anyStoreRemoved) {
- return cell;
- } else if (copiedStoresList.isEmpty()) {
- return null;
+ Map<TableName, List<String>> getTableCfs() {
+ Map<TableName, List<String>> tableCFs = null;
+ try {
+ tableCFs = this.peer.getTableCFs();
+ } catch (IllegalArgumentException e) {
+ LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+ ", degenerate as if it's not configured by keeping tableCFs==null");
}
- BulkLoadDescriptor.Builder newDesc =
- BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
- .setEncodedRegionName(bld.getEncodedRegionName())
- .setBulkloadSeqNum(bld.getBulkloadSeqNum());
- newDesc.addAllStores(copiedStoresList);
- BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
- return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
- cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ return tableCFs;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
new file mode 100644
index 0000000..78b3ed4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * A filter for WAL entry cells before being sent over to replication.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALCellFilter {
+
+ /**
+ * Applies the filter, possibly returning a different Cell instance.
+ * If null is returned, the cell will be skipped.
+ * @param entry Entry which contains the cell
+ * @param cell Cell to filter
+ * @return a (possibly modified) Cell to use. Returning null will cause the cell
+ * to be skipped for replication.
+ */
+ public Cell filterCell(Entry entry, Cell cell);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index c906d6a..04d9232 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -78,7 +78,7 @@ public class TestReplicationWALEntryFilters {
@Test
public void testScopeWALEntryFilter() {
- ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+ WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());
Entry userEntry = createEntry(null, a, b);
Entry userEntryA = createEntry(null, a);
@@ -201,14 +201,14 @@ public class TestReplicationWALEntryFilters {
when(peer.getTableCFs()).thenReturn(null);
Entry userEntry = createEntry(null, a, b, c);
- TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+ WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new TableCfWALEntryFilter(peer);
+ filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// table bar
@@ -216,7 +216,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new TableCfWALEntryFilter(peer);
+ filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// table foo:a
@@ -224,7 +224,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new TableCfWALEntryFilter(peer);
+ filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
// table foo:a,c
@@ -232,7 +232,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new TableCfWALEntryFilter(peer);
+ filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
}