You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/07 08:04:53 UTC
[11/16] hbase git commit: HBASE-17290 Potential loss of data for replication of bulk loaded hfiles
HBASE-17290 Potential loss of data for replication of bulk loaded hfiles
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5f631b96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5f631b96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5f631b96
Branch: refs/heads/hbase-12439
Commit: 5f631b9653a4bf86a2bebed58abed747c04b704f
Parents: 629b04f
Author: Ashish Singhi <as...@apache.org>
Authored: Fri Jan 6 16:15:49 2017 +0530
Committer: Ashish Singhi <as...@apache.org>
Committed: Fri Jan 6 16:18:20 2017 +0530
----------------------------------------------------------------------
.../hbase/replication/ReplicationQueues.java | 6 +-
.../replication/ReplicationQueuesZKImpl.java | 11 ++--
.../TableBasedReplicationQueuesImpl.java | 4 +-
.../hbase/regionserver/HRegionServer.java | 4 ++
.../regionserver/HFileReplicator.java | 2 +-
.../replication/regionserver/Replication.java | 55 +++++++----------
.../regionserver/ReplicationObserver.java | 62 ++++++++++++++++++++
.../regionserver/ReplicationSource.java | 11 ++--
.../ReplicationSourceInterface.java | 6 +-
.../regionserver/ReplicationSourceManager.java | 4 +-
.../cleaner/TestReplicationHFileCleaner.java | 9 +--
.../replication/ReplicationSourceDummy.java | 3 +-
.../replication/TestReplicationStateBasic.java | 33 ++++++-----
13 files changed, 140 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 0ae27d0..be5a590 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.SortedSet;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;
@@ -144,10 +145,11 @@ public interface ReplicationQueues {
/**
* Add new hfile references to the queue.
* @param peerId peer cluster id to which the hfiles need to be replicated
- * @param files list of hfile references to be added
+ * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+ * will be added in the queue }
* @throws ReplicationException if fails to add a hfile reference
*/
- void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+ void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
/**
* Remove hfile references from the queue.
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 7c548d9..1de1315 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -319,16 +320,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
@Override
- public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+ public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+ throws ReplicationException {
String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
boolean debugEnabled = LOG.isDebugEnabled();
if (debugEnabled) {
- LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+ LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
}
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
- int size = files.size();
+ int size = pairs.size();
for (int i = 0; i < size; i++) {
- listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+ listOfOps.add(ZKUtilOp.createAndFailSilent(
+ ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
HConstants.EMPTY_BYTE_ARRAY));
}
if (debugEnabled) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
index 28b9bdf..1023e0d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -307,7 +308,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
}
@Override
- public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+ public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+ throws ReplicationException {
// TODO
throw new NotImplementedException();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 853d699..3c9d54f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFa
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -524,6 +525,9 @@ public class HRegionServer extends HasThread implements
checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf);
FSUtils.setupShortCircuitRead(this.conf);
+
+ Replication.decorateRegionServerConfiguration(this.conf);
+
// Disable usage of meta replicas in the regionserver
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index 256c24c..35aa1fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -380,7 +380,7 @@ public class HFileReplicator {
} catch (FileNotFoundException e1) {
// This will mean that the hfile does not exists any where in source cluster FS. So we
// cannot do anything here just log and continue.
- LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+ LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+ ". Hence ignoring this hfile from replication..",
e1);
continue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 5f87690..d3f9ba2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -32,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -44,8 +44,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -236,34 +235,6 @@ public class Replication extends WALActionsListener.Base implements
scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
}
- @Override
- public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
- final WALEdit edit) throws IOException {
- NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
- if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
- TableName tableName = logKey.getTablename();
- for (Cell c : edit.getCells()) {
- // Only check for bulk load events
- if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
- BulkLoadDescriptor bld = null;
- try {
- bld = WALEdit.getBulkLoadDescriptor(c);
- } catch (IOException e) {
- LOG.error("Failed to get bulk load events information from the wal file.", e);
- throw e;
- }
-
- for (StoreDescriptor s : bld.getStoresList()) {
- byte[] fam = s.getFamilyName().toByteArray();
- if (scopes.containsKey(fam)) {
- addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
- }
- }
- }
- }
- }
- }
-
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
@@ -298,10 +269,10 @@ public class Replication extends WALActionsListener.Base implements
}
}
- private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
- TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+ void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
+ throws IOException {
try {
- replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
+ this.replicationManager.addHFileRefs(tableName, family, pairs);
} catch (ReplicationException e) {
LOG.error("Failed to add hfile references in the replication queue.", e);
throw new IOException(e);
@@ -337,6 +308,22 @@ public class Replication extends WALActionsListener.Base implements
}
}
+ /**
+ * This method modifies the region server's configuration in order to inject replication-related
+ * features
+ * @param conf region server configurations
+ */
+ public static void decorateRegionServerConfiguration(Configuration conf) {
+ if (isReplicationForBulkLoadDataEnabled(conf)) {
+ String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
+ String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
+ if (!plugins.contains(rsCoprocessorClass)) {
+ conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+ plugins + "," + rsCoprocessorClass);
+ }
+ }
+ }
+
/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
new file mode 100644
index 0000000..03046b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An Observer to facilitate replication operations
+ */
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationObserver extends BaseRegionObserver {
+ private static final Log LOG = LogFactory.getLog(ReplicationObserver.class);
+
+ @Override
+ public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+ final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+ RegionCoprocessorEnvironment env = ctx.getEnvironment();
+ Configuration c = env.getConfiguration();
+ if (pairs == null || pairs.isEmpty()
+ || !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
+ LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
+ + "data replication.");
+ return;
+ }
+ HRegionServer rs = (HRegionServer) env.getRegionServerServices();
+ Replication rep = (Replication) rs.getReplicationSourceService();
+ rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3eeb4b8..7a229eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
@@ -253,7 +254,7 @@ public class ReplicationSource extends Thread
}
@Override
- public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
@@ -266,8 +267,8 @@ public class ReplicationSource extends Thread
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
- this.replicationQueues.addHFileRefs(peerId, files);
- metrics.incrSizeOfHFileRefsQueue(files.size());
+ this.replicationQueues.addHFileRefs(peerId, pairs);
+ metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
+ Bytes.toString(family) + " to peer id " + peerId);
@@ -275,8 +276,8 @@ public class ReplicationSource extends Thread
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
// data
- this.replicationQueues.addHFileRefs(peerId, files);
- metrics.incrSizeOfHFileRefsQueue(files.size());
+ this.replicationQueues.addHFileRefs(peerId, pairs);
+ metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 7f4a9f7..8d5451c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Interface that defines a replication source
@@ -112,10 +113,11 @@ public interface ReplicationSourceInterface {
* Add hfile names to the queue to be replicated.
* @param tableName Name of the table these files belongs to
* @param family Name of the family these files belong to
- * @param files files whose names needs to be added to the queue to be replicated
+ * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+ * will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
*/
- void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ef4093e..5b574da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -846,10 +846,10 @@ public class ReplicationSourceManager implements ReplicationListener {
return stats.toString();
}
- public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
for (ReplicationSourceInterface source : this.sources) {
- source.addHFileRefs(tableName, family, files);
+ source.addHFileRefs(tableName, family, pairs);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index fc3e516..817cfb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -138,8 +139,8 @@ public class TestReplicationHFileCleaner {
+ "for it in the queue.",
cleaner.isFileDeletable(fs.getFileStatus(file)));
- List<String> files = new ArrayList<String>(1);
- files.add(file.getName());
+ List<Pair<Path, Path>> files = new ArrayList<>(1);
+ files.add(new Pair<Path, Path>(null, file));
// 4. Add the file to hfile-refs queue
rq.addHFileRefs(peerId, files);
// 5. Assert file should not be deletable
@@ -166,8 +167,8 @@ public class TestReplicationHFileCleaner {
f.setPath(notDeletablefile);
files.add(f);
- List<String> hfiles = new ArrayList<>(1);
- hfiles.add(notDeletablefile.getName());
+ List<Pair<Path, Path>> hfiles = new ArrayList<>(1);
+ hfiles.add(new Pair<Path, Path>(null, notDeletablefile));
// 2. Add one file to hfile-refs queue
rq.addHFileRefs(peerId, hfiles);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index abe484e..57e54d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -93,7 +94,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
- public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
throws ReplicationException {
return;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fcab105..f8be9a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -25,7 +25,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
@@ -202,10 +204,10 @@ public abstract class TestReplicationStateBasic {
rq1.init(server1);
rqc.init();
- List<String> files1 = new ArrayList<String>(3);
- files1.add("file_1");
- files1.add("file_2");
- files1.add("file_3");
+ List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+ files1.add(new Pair<Path, Path>(null, new Path("file_1")));
+ files1.add(new Pair<Path, Path>(null, new Path("file_2")));
+ files1.add(new Pair<Path, Path>(null, new Path("file_3")));
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
@@ -213,13 +215,16 @@ public abstract class TestReplicationStateBasic {
rq1.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
- List<String> files2 = new ArrayList<>(files1);
- String removedString = files2.remove(0);
- rq1.removeHFileRefs(ID_ONE, files2);
+ List<String> hfiles2 = new ArrayList<>();
+ for (Pair<Path, Path> p : files1) {
+ hfiles2.add(p.getSecond().getName());
+ }
+ String removedString = hfiles2.remove(0);
+ rq1.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
- files2 = new ArrayList<>(1);
- files2.add(removedString);
- rq1.removeHFileRefs(ID_ONE, files2);
+ hfiles2 = new ArrayList<>(1);
+ hfiles2.add(removedString);
+ rq1.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
rp.unregisterPeer(ID_ONE);
}
@@ -235,10 +240,10 @@ public abstract class TestReplicationStateBasic {
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
rq1.addPeerToHFileRefs(ID_TWO);
- List<String> files1 = new ArrayList<String>(3);
- files1.add("file_1");
- files1.add("file_2");
- files1.add("file_3");
+ List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+ files1.add(new Pair<Path, Path>(null, new Path("file_1")));
+ files1.add(new Pair<Path, Path>(null, new Path("file_2")));
+ files1.add(new Pair<Path, Path>(null, new Path("file_3")));
rq1.addHFileRefs(ID_ONE, files1);
rq1.addHFileRefs(ID_TWO, files1);
assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());