You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by "Qianxi Zhang (JIRA)" <ji...@apache.org> on 2014/06/06 13:48:02 UTC
[jira] [Created] (HBASE-11302) ReplicationSourceManager is not
thread safe
Qianxi Zhang created HBASE-11302:
------------------------------------
Summary: ReplicationSourceManager is not thread safe
Key: HBASE-11302
URL: https://issues.apache.org/jira/browse/HBASE-11302
Project: HBase
Issue Type: Bug
Components: Replication
Affects Versions: 0.99.0
Reporter: Qianxi Zhang
In ReplicationSourceManager, sources is used to record the peers. It could be removed in removePeer method, and read in preLogRoll method. it is not thread safe.
ReplicationSourceManager#296
{code}
void preLogRoll(Path newLog) throws IOException {
synchronized (this.hlogsById) {
String name = newLog.getName();
for (ReplicationSourceInterface source : this.sources) {
try {
this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
} catch (ReplicationException e) {
throw new IOException("Cannot add log to replication queue with id="
+ source.getPeerClusterZnode() + ", filename=" + name, e);
}
}
for (SortedSet<String> hlogs : this.hlogsById.values()) {
if (this.sources.isEmpty()) {
// If there's no slaves, don't need to keep the old hlogs since
// we only consider the last one when a new slave comes in
hlogs.clear();
}
hlogs.add(name);
}
}
this.latestPath = newLog;
}
{code}
ReplicationSourceManager#392
{code}
public void removePeer(String id) {
LOG.info("Closing the following queue " + id + ", currently have "
+ sources.size() + " and another "
+ oldsources.size() + " that were recovered");
String terminateMessage = "Replication stream was removed by a user";
ReplicationSourceInterface srcToRemove = null;
List<ReplicationSourceInterface> oldSourcesToDelete =
new ArrayList<ReplicationSourceInterface>();
// First close all the recovered sources for this peer
for (ReplicationSourceInterface src : oldsources) {
if (id.equals(src.getPeerClusterId())) {
oldSourcesToDelete.add(src);
}
}
for (ReplicationSourceInterface src : oldSourcesToDelete) {
src.terminate(terminateMessage);
closeRecoveredQueue((src));
}
LOG.info("Number of deleted recovered sources for " + id + ": "
+ oldSourcesToDelete.size());
// Now look for the one on this cluster
for (ReplicationSourceInterface src : this.sources) {
if (id.equals(src.getPeerClusterId())) {
srcToRemove = src;
break;
}
}
if (srcToRemove == null) {
LOG.error("The queue we wanted to close is missing " + id);
return;
}
srcToRemove.terminate(terminateMessage);
this.sources.remove(srcToRemove);
deleteSource(id, true);
}
{code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)