You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by "Qianxi Zhang (JIRA)" <ji...@apache.org> on 2014/06/06 13:48:02 UTC

[jira] [Created] (HBASE-11302) ReplicationSourceManager is not thread safe

Qianxi Zhang created HBASE-11302:
------------------------------------

             Summary: ReplicationSourceManager is not thread safe
                 Key: HBASE-11302
                 URL: https://issues.apache.org/jira/browse/HBASE-11302
             Project: HBase
          Issue Type: Bug
          Components: Replication
    Affects Versions: 0.99.0
            Reporter: Qianxi Zhang


In ReplicationSourceManager, sources is used to record the peers. It could be removed in removePeer method, and read in preLogRoll method. it is not thread safe.
ReplicationSourceManager#296
{code}
  void preLogRoll(Path newLog) throws IOException {

    synchronized (this.hlogsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> hlogs : this.hlogsById.values()) {
        if (this.sources.isEmpty()) {
          // If there's no slaves, don't need to keep the old hlogs since
          // we only consider the last one when a new slave comes in
          hlogs.clear();
        }
        hlogs.add(name);
      }
    }

    this.latestPath = newLog;
  }
{code}

ReplicationSourceManager#392
{code}
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    ReplicationSourceInterface srcToRemove = null;
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      src.terminate(terminateMessage);
      closeRecoveredQueue((src));
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove = src;
        break;
      }
    }
    if (srcToRemove == null) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    srcToRemove.terminate(terminateMessage);
    this.sources.remove(srcToRemove);
    deleteSource(id, true);
  }
{code}




--
This message was sent by Atlassian JIRA
(v6.2#6252)