Posted to issues@hbase.apache.org by "Enis Soztutar (JIRA)" <ji...@apache.org> on 2014/07/15 01:55:05 UTC

[jira] [Commented] (HBASE-11302) ReplicationSourceManager#sources is not thread safe

    [ https://issues.apache.org/jira/browse/HBASE-11302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061449#comment-14061449 ] 

Enis Soztutar commented on HBASE-11302:
---------------------------------------

This bug fix looks important for 0.98 as well. Cherry-picked to 0.98.4. [~andrew.purtell@gmail.com] FYI. 

> ReplicationSourceManager#sources is not thread safe
> ---------------------------------------------------
>
>                 Key: HBASE-11302
>                 URL: https://issues.apache.org/jira/browse/HBASE-11302
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>    Affects Versions: 0.99.0
>            Reporter: Qianxi Zhang
>            Assignee: Qianxi Zhang
>             Fix For: 0.99.0, 0.98.4
>
>         Attachments: HBase-11302-0.99 (1).patch, HBase-11302-0.99.diff
>
>
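> In ReplicationSourceManager, sources is used to record the peers. An entry can be removed from it in the removePeer method while it is being read in the preLogRoll method, and removePeer does not hold the lock that preLogRoll takes, so access to sources is not thread safe.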
> ReplicationSourceManager#296
> {code}
>   void preLogRoll(Path newLog) throws IOException {
>     synchronized (this.hlogsById) {
>       String name = newLog.getName();
>       for (ReplicationSourceInterface source : this.sources) {
>         try {
>           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
>         } catch (ReplicationException e) {
>           throw new IOException("Cannot add log to replication queue with id="
>               + source.getPeerClusterZnode() + ", filename=" + name, e);
>         }
>       }
>       for (SortedSet<String> hlogs : this.hlogsById.values()) {
>         if (this.sources.isEmpty()) {
>           // If there's no slaves, don't need to keep the old hlogs since
>           // we only consider the last one when a new slave comes in
>           hlogs.clear();
>         }
>         hlogs.add(name);
>       }
>     }
>     this.latestPath = newLog;
>   }
> {code}
> ReplicationSourceManager#392
> {code}
>   public void removePeer(String id) {
>     LOG.info("Closing the following queue " + id + ", currently have "
>         + sources.size() + " and another "
>         + oldsources.size() + " that were recovered");
>     String terminateMessage = "Replication stream was removed by a user";
>     ReplicationSourceInterface srcToRemove = null;
>     List<ReplicationSourceInterface> oldSourcesToDelete =
>         new ArrayList<ReplicationSourceInterface>();
>     // First close all the recovered sources for this peer
>     for (ReplicationSourceInterface src : oldsources) {
>       if (id.equals(src.getPeerClusterId())) {
>         oldSourcesToDelete.add(src);
>       }
>     }
>     for (ReplicationSourceInterface src : oldSourcesToDelete) {
>       src.terminate(terminateMessage);
>       closeRecoveredQueue((src));
>     }
>     LOG.info("Number of deleted recovered sources for " + id + ": "
>         + oldSourcesToDelete.size());
>     // Now look for the one on this cluster
>     for (ReplicationSourceInterface src : this.sources) {
>       if (id.equals(src.getPeerClusterId())) {
>         srcToRemove = src;
>         break;
>       }
>     }
>     if (srcToRemove == null) {
>       LOG.error("The queue we wanted to close is missing " + id);
>       return;
>     }
>     srcToRemove.terminate(terminateMessage);
>     this.sources.remove(srcToRemove);
>     deleteSource(id, true);
>   }
> {code}
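
One way to close a race like the one described above is to guard every read and write of the source list with the same lock. The following is only a minimal sketch under that assumption, not the attached patch: SourceRegistry, its lock field, logCount, and the plain string peer ids are hypothetical stand-ins for the real HBase types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical stand-in for ReplicationSourceManager; not the HBase class. */
class SourceRegistry {
  // Assumption of this sketch: one lock guards both collections.
  private final Object lock = new Object();
  private final List<String> sources = new ArrayList<>();
  private final SortedSet<String> logs = new TreeSet<>();

  void addSource(String peerId) {
    synchronized (lock) {
      sources.add(peerId);
    }
  }

  /** Mirrors preLogRoll: iterates over sources and records the new log. */
  List<String> preLogRoll(String logName) {
    synchronized (lock) {
      List<String> enqueued = new ArrayList<>();
      for (String peerId : sources) {
        enqueued.add(peerId + "/" + logName);
      }
      if (sources.isEmpty()) {
        // No peers: only the latest log needs to be kept.
        logs.clear();
      }
      logs.add(logName);
      return enqueued;
    }
  }

  /** Mirrors removePeer: lookup and removal happen under the same lock. */
  boolean removePeer(String peerId) {
    synchronized (lock) {
      return sources.remove(peerId);
    }
  }

  int logCount() {
    synchronized (lock) {
      return logs.size();
    }
  }

  public static void main(String[] args) {
    SourceRegistry registry = new SourceRegistry();
    registry.addSource("1");
    registry.addSource("2");
    System.out.println(registry.preLogRoll("log-a"));
    registry.removePeer("1");
    System.out.println(registry.preLogRoll("log-b"));
  }
}
```

Because removePeer and preLogRoll take the same lock, a peer is either fully present or fully gone for the duration of one log roll. A concurrent collection would make the iteration itself safe, but it would not make the isEmpty() check and the update of the log set atomic with respect to a removal.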


