Posted to issues@hbase.apache.org by "Ted Yu (JIRA)" <ji...@apache.org> on 2014/06/06 19:06:02 UTC
[jira] [Assigned] (HBASE-11302) ReplicationSourceManager#sources is not thread safe
[ https://issues.apache.org/jira/browse/HBASE-11302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ted Yu reassigned HBASE-11302:
------------------------------
Assignee: Qianxi Zhang
> ReplicationSourceManager#sources is not thread safe
> ---------------------------------------------------
>
> Key: HBASE-11302
> URL: https://issues.apache.org/jira/browse/HBASE-11302
> Project: HBase
> Issue Type: Bug
> Components: Replication
> Affects Versions: 0.99.0, 0.98.2
> Reporter: Qianxi Zhang
> Assignee: Qianxi Zhang
> Fix For: 0.99.0
>
> Attachments: HBase-11302-0.99.diff
>
>
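> In ReplicationSourceManager, sources is used to record the peers. An entry can be removed from it in the removePeer method while the list is being read in the preLogRoll method. preLogRoll holds the hlogsById lock during its iteration, but removePeer modifies sources without taking that lock, so the list is not thread safe.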
> ReplicationSourceManager#296
> {code}
> void preLogRoll(Path newLog) throws IOException {
>   synchronized (this.hlogsById) {
>     String name = newLog.getName();
>     for (ReplicationSourceInterface source : this.sources) {
>       try {
>         this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
>       } catch (ReplicationException e) {
>         throw new IOException("Cannot add log to replication queue with id="
>           + source.getPeerClusterZnode() + ", filename=" + name, e);
>       }
>     }
>     for (SortedSet<String> hlogs : this.hlogsById.values()) {
>       if (this.sources.isEmpty()) {
>         // If there's no slaves, don't need to keep the old hlogs since
>         // we only consider the last one when a new slave comes in
>         hlogs.clear();
>       }
>       hlogs.add(name);
>     }
>   }
>   this.latestPath = newLog;
> }
> {code}
> ReplicationSourceManager#392
> {code}
> public void removePeer(String id) {
>   LOG.info("Closing the following queue " + id + ", currently have "
>     + sources.size() + " and another "
>     + oldsources.size() + " that were recovered");
>   String terminateMessage = "Replication stream was removed by a user";
>   ReplicationSourceInterface srcToRemove = null;
>   List<ReplicationSourceInterface> oldSourcesToDelete =
>     new ArrayList<ReplicationSourceInterface>();
>   // First close all the recovered sources for this peer
>   for (ReplicationSourceInterface src : oldsources) {
>     if (id.equals(src.getPeerClusterId())) {
>       oldSourcesToDelete.add(src);
>     }
>   }
>   for (ReplicationSourceInterface src : oldSourcesToDelete) {
>     src.terminate(terminateMessage);
>     closeRecoveredQueue((src));
>   }
>   LOG.info("Number of deleted recovered sources for " + id + ": "
>     + oldSourcesToDelete.size());
>   // Now look for the one on this cluster
>   for (ReplicationSourceInterface src : this.sources) {
>     if (id.equals(src.getPeerClusterId())) {
>       srcToRemove = src;
>       break;
>     }
>   }
>   if (srcToRemove == null) {
>     LOG.error("The queue we wanted to close is missing " + id);
>     return;
>   }
>   srcToRemove.terminate(terminateMessage);
>   this.sources.remove(srcToRemove);
>   deleteSource(id, true);
> }
> {code}
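
To illustrate the hazard described above, here is a minimal, self-contained sketch. It is not code from HBase or from the attached patch. It assumes that sources is backed by a plain java.util.ArrayList (the quoted snippets do not show its declaration), uses String in place of ReplicationSourceInterface, and simulates the removePeer call landing in the middle of the preLogRoll loop on a single thread so the outcome is deterministic. The class name SourcesSketch and the method iterateWhileRemoving are hypothetical. Swapping in java.util.concurrent.CopyOnWriteArrayList is shown as one possible mitigation, not necessarily the one the patch takes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SourcesSketch {

  /**
   * Mimics preLogRoll iterating over sources while a removePeer-style
   * removal happens part-way through the loop.
   *
   * @return true if the iteration completed, false if it failed with
   *         ConcurrentModificationException
   */
  public static boolean iterateWhileRemoving(List<String> sources, String peerToRemove) {
    try {
      for (String source : sources) {
        // Stand-in for a removePeer call that lands mid-iteration.
        // After the first pass this is a no-op because the peer is gone.
        sources.remove(peerToRemove);
      }
      return true;
    } catch (ConcurrentModificationException e) {
      // ArrayList iterators are fail-fast: a structural change made
      // outside the iterator is detected on the next call to next().
      return false;
    }
  }

  public static void main(String[] args) {
    // Assumption: this is how sources behaves if it is a plain ArrayList.
    List<String> plain =
        new ArrayList<>(Arrays.asList("peer1", "peer2", "peer3"));
    // Possible mitigation: iterators work on an immutable snapshot.
    List<String> copyOnWrite =
        new CopyOnWriteArrayList<>(Arrays.asList("peer1", "peer2", "peer3"));

    System.out.println("ArrayList completed: "
        + iterateWhileRemoving(plain, "peer1"));
    System.out.println("CopyOnWriteArrayList completed: "
        + iterateWhileRemoving(copyOnWrite, "peer1"));
  }
}
```

In this sketch the ArrayList run stops with a ConcurrentModificationException, while the CopyOnWriteArrayList run finishes because its iterator reads a snapshot taken when the loop started. With real threads the failure is timing dependent, and an unsynchronized ArrayList also gives no visibility guarantee for the removal, so a clean run does not prove the code is safe. CopyOnWriteArrayList copies the backing array on every write, which is usually acceptable for a list that changes as rarely as a set of replication peers; guarding both methods with the same lock would be another option.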