Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/19 14:50:17 UTC

svn commit: r1387563 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/replication/ tes...

Author: mbautin
Date: Wed Sep 19 12:50:17 2012
New Revision: 1387563

URL: http://svn.apache.org/viewvc?rev=1387563&view=rev
Log:
[master] [89-fb] Remove push-style replication

Author: mbautin

Summary: Removing the "push-style" replication (pushing edits from the RS to the destination cluster) because we are using a different replication approach.

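For context: the push path being removed shipped WAL edits from a region server on the master cluster straight to a region server on the slave cluster through the replicateLogEntries() RPC that this commit deletes from HRegionInterface. A minimal sketch of that call path, assuming an existing Configuration conf; the sink address and the WAL-reading helper are hypothetical:

    // Hedged sketch of the removed push path (not the replacement approach).
    // 'conf' is assumed; the sink address below is hypothetical.
    HConnection conn = HConnectionManager.getConnection(conf);
    HRegionInterface sink =
        conn.getHRegionConnection(new HServerAddress("10.0.0.1:60020"));
    HLog.Entry[] entries = readEntriesFromWal(); // hypothetical helper
    // Contract of the removed RPC: entries are durable on the slave
    // cluster if this call returns without an exception.
    sink.replicateLogEntries(entries);
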
Test Plan: Unit tests

Reviewers: kannan, nspiegelberg, mycnyc, amirshim

Reviewed By: kannan

CC: Liyin, Karthik

Differential Revision: https://reviews.facebook.net/D5445

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Sep 19 12:50:17 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
@@ -39,7 +38,6 @@ import org.apache.hadoop.hbase.client.Ro
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.io.MapWritable;
 
 /**
@@ -376,17 +374,6 @@ public interface HRegionInterface extend
       byte[] regionName, byte[] familyName, boolean assignSeqNum) throws IOException;
 
   /**
-   * Replicates the given entries. The guarantee is that the given entries
-   * will be durable on the slave cluster if this method returns without
-   * any exception.
-   * hbase.replication has to be set to true for this to work.
-   *
-   * @param entries entries to replicate
-   * @throws IOException
-   */
-  public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
-
-  /**
    * Closes the specified region.
    * @param hri region to be closed
    * @param reportWhenCompleted whether to report to master

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Sep 19 12:50:17 2012
@@ -129,7 +129,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -311,10 +310,6 @@ public class HRegionServer implements HR
 
   private final String machineName;
 
-  // Replication-related attributes
-  private Replication replicationHandler;
-  // End of replication
-
   private final AtomicLong globalMemstoreSize = new AtomicLong(0);
 
   // reference to the Thrift Server.
@@ -1276,18 +1271,14 @@ public class HRegionServer implements HR
         "running at " + this.serverInfo.getServerName() +
         " because logdir " + logdir.toString() + " exists");
     }
-    this.replicationHandler = new Replication(this.conf, this.zooKeeperWrapper,
-        this.serverInfo, this.fs, oldLogDir, stopRequested);
     HLog log = instantiateHLog(logdir, oldLogDir);
-    this.replicationHandler.addLogEntryVisitor(log);
     return log;
   }
 
   // instantiate
   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
     return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
-      this.replicationHandler.getReplicationManager(),
-        this.serverInfo.getServerAddress().toString());
+        null, this.serverInfo.getServerAddress().toString());
   }
 
 
@@ -1492,8 +1483,6 @@ public class HRegionServer implements HR
       }
     }
 
-    this.replicationHandler.startReplicationServices();
-
     if (this.server == null) {
       // Start Server to handle client requests.
       this.server = HBaseRPC.getServer(this,
@@ -1664,9 +1653,6 @@ public class HRegionServer implements HR
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.hlogRoller);
     this.compactSplitThread.join();
-    if (replicationHandler != null) {
-      this.replicationHandler.join();
-    }
   }
 
   private boolean getMaster() {
@@ -3353,11 +3339,6 @@ public class HRegionServer implements HR
     }
   }
 
-  @Override
-  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
-    this.replicationHandler.replicateLogEntries(entries);
-  }
-
   /**
    * Do class main.
    * @param args

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Wed Sep 19 12:50:17 2012
@@ -1,162 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-/**
- * Replication serves as an umbrella over the replication setup and
- * is used by HRegionServer.
- */
-public class Replication implements LogEntryVisitor {
-
-  private final boolean replication;
-  private final ReplicationSourceManager replicationManager;
-  private boolean replicationMaster;
-  private final AtomicBoolean replicating = new AtomicBoolean(true);
-  private final ReplicationZookeeperWrapper zkHelper;
-  private final Configuration conf;
-  private final AtomicBoolean  stopRequested;
-  private ReplicationSink replicationSink;
-
-  /**
-   * Instantiate the replication management (if rep is enabled).
-   * @param conf conf to use
-   * @param zkWrapper the ZooKeeperWrapper of this region server
-   * @param hsi the info of this region server
-   * @param fs handle to the filesystem
-   * @param oldLogDir directory where logs are archived
-   * @param stopRequested boolean that tells us if we are shutting down
-   * @throws IOException
-   */
-  public Replication(Configuration conf, ZooKeeperWrapper zkWrapper,
-      HServerInfo hsi, FileSystem fs, Path oldLogDir,
-      AtomicBoolean stopRequested) throws IOException {
-    this.conf = conf;
-    this.stopRequested = stopRequested;
-    this.replication =
-        conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
-    if (replication) {
-      this.zkHelper = new ReplicationZookeeperWrapper(zkWrapper, conf,
-        this.replicating, hsi.getServerName());
-      this.replicationMaster = zkHelper.isReplicationMaster();
-      this.replicationManager = this.replicationMaster ?
-        new ReplicationSourceManager(zkHelper, conf, stopRequested,
-          fs, this.replicating, oldLogDir) : null;
-    } else {
-      replicationManager = null;
-      zkHelper = null;
-    }
-  }
-
-  /**
-   * Join with the replication threads
-   */
-  public void join() {
-    if (this.replication) {
-      if (this.replicationMaster) {
-        this.replicationManager.join();
-      }
-      this.zkHelper.deleteOwnRSZNode();
-    }
-  }
-
-  /**
-   * Pass the list of log entries down to the sink
-   * @param entries list of entries to replicate
-   * @throws IOException
-   */
-  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
-    if (this.replication && !this.replicationMaster) {
-      this.replicationSink.replicateEntries(entries);
-    }
-  }
-
-  /**
-   * If replication is enabled, starts the replication source manager
-   * on a replication master, or the replication sink otherwise.
-   * @throws IOException
-   */
-  public void startReplicationServices() throws IOException {
-    if (this.replication) {
-      if (this.replicationMaster) {
-        this.replicationManager.init();
-      } else {
-        this.replicationSink =
-            new ReplicationSink(this.conf, this.stopRequested);
-      }
-    }
-  }
-
-  /**
-   * Get the replication sources manager
-   * @return the manager if replication is enabled, else returns null
-   */
-  public ReplicationSourceManager getReplicationManager() {
-    return replicationManager;
-  }
-
-  @Override
-  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-                                       WALEdit logEdit) {
-    NavigableMap<byte[], Integer> scopes =
-        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-    byte[] family;
-    for (KeyValue kv : logEdit.getKeyValues()) {
-      family = kv.getFamily();
-      int scope = info.getTableDesc().getFamily(family).getScope();
-      if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
-          !scopes.containsKey(family)) {
-        scopes.put(family, scope);
-      }
-    }
-    if (!scopes.isEmpty()) {
-      logEdit.setScopes(scopes);
-    }
-  }
-
-  /**
-   * Add this class as a log entry visitor for HLog if replication is enabled
-   * @param hlog the log to register this visitor on
-   */
-  public void addLogEntryVisitor(HLog hlog) {
-    if (replication) {
-      hlog.addLogEntryVisitor(this);
-    }
-  }
-}

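The visitLogEntryBeforeWrite() hook deleted above is what tagged outgoing WAL edits with per-family replication scopes. As a rough illustration (the table descriptor htd with a replicated family "f", scope 1, and a local-only family "norep", scope 0, is an assumption), the scope map it built would contain only "f":

    // Hedged illustration of the scope map the removed visitor built.
    // 'htd' (an HTableDescriptor) and its families are assumptions.
    NavigableMap<byte[], Integer> scopes =
        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (HColumnDescriptor family : htd.getFamilies()) {
      if (family.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
        scopes.put(family.getName(), family.getScope());
      }
    }
    // scopes now maps "f" -> 1; "norep" is absent, so its KVs would be
    // dropped later by ReplicationSource.removeNonReplicableEdits().
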
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Sep 19 12:50:17 2012
@@ -1,648 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.hbase.util.HasThread;
-
-/**
- * Class that handles the source of a replication stream.
- * Currently does not handle more than 1 slave cluster.
- * For each slave cluster it selects a random subset of peers
- * using a replication ratio. For example, if the replication ratio is 0.1
- * and the slave cluster has 100 region servers, 10 will be selected.
- * <p/>
- * A stream is considered down when we cannot contact a region server on the
- * peer cluster for more than 55 seconds by default.
- * <p/>
- *
- */
-public class ReplicationSource extends HasThread
-    implements ReplicationSourceInterface {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
-  // Lock to manage when a HLog is changing path (archiving)
-  private final ReentrantLock pathLock = new ReentrantLock();
-  // Queue of logs to process
-  private PriorityBlockingQueue<Path> queue;
-  // container of entries to replicate
-  private HLog.Entry[] entriesArray;
-  private HConnection conn;
-  // Helper class for zookeeper
-  private ReplicationZookeeperWrapper zkHelper;
-  private Configuration conf;
-  // ratio of region servers to choose from a slave cluster
-  private float ratio;
-  private Random random;
-  // should we replicate or not?
-  private AtomicBoolean replicating;
-  // id of the peer cluster this source replicates to
-  private String peerClusterId;
-  // The manager of all sources to which we ping back our progress
-  private ReplicationSourceManager manager;
-  // Should we stop everything?
-  private AtomicBoolean stop;
-  // List of chosen sinks (region servers)
-  private List<HServerAddress> currentPeers;
-  // How long should we sleep for each retry
-  private long sleepForRetries;
-  // Max size in bytes of entriesArray
-  private long replicationQueueSizeCapacity;
-  // Max number of entries in entriesArray
-  private int replicationQueueNbCapacity;
-  // Our reader for the current log
-  private HLog.Reader reader;
-  // Current position in the log
-  private long position = 0;
-  // Path of the current log
-  private volatile Path currentPath;
-  private FileSystem fs;
-  // id of this cluster
-  private byte clusterId;
-  // total number of edits we replicated
-  private long totalReplicatedEdits = 0;
-  // The znode we currently play with
-  private String peerClusterZnode;
-  // Indicates if this queue is recovered (and will be deleted when depleted)
-  private boolean queueRecovered;
-  // Maximum number of retries before taking bold actions
-  private long maxRetriesMultiplier;
-  // Current number of entries that we need to replicate
-  private int currentNbEntries = 0;
-  // Indicates if this particular source is running
-  private volatile boolean running = true;
-
-  /**
-   * Instantiation method used by region servers
-   *
-   * @param conf configuration to use
-   * @param fs file system to use
-   * @param manager replication manager to ping to
-   * @param stopper     the atomic boolean to use to stop the regionserver
-   * @param replicating the atomic boolean that starts/stops replication
-   * @param peerClusterZnode the name of our znode
-   * @throws IOException
-   */
-  public void init(final Configuration conf,
-                   final FileSystem fs,
-                   final ReplicationSourceManager manager,
-                   final AtomicBoolean stopper,
-                   final AtomicBoolean replicating,
-                   final String peerClusterZnode)
-      throws IOException {
-    this.stop = stopper;
-    this.conf = conf;
-    this.replicationQueueSizeCapacity =
-        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
-    this.replicationQueueNbCapacity =
-        this.conf.getInt("replication.source.nb.capacity", 25000);
-    this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
-    for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
-      this.entriesArray[i] = new HLog.Entry();
-    }
-    this.maxRetriesMultiplier =
-        this.conf.getLong("replication.source.maxretriesmultiplier", 10);
-    this.queue =
-        new PriorityBlockingQueue<Path>(
-            conf.getInt("hbase.regionserver.maxlogs", 32),
-            new LogsComparator());
-    this.conn = HConnectionManager.getConnection(conf);
-    this.zkHelper = manager.getRepZkWrapper();
-    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
-    this.currentPeers = new ArrayList<HServerAddress>();
-    this.random = new Random();
-    this.replicating = replicating;
-    this.manager = manager;
-    this.sleepForRetries =
-        this.conf.getLong("replication.source.sleepforretries", 1000);
-    this.fs = fs;
-    this.clusterId = Byte.valueOf(zkHelper.getClusterId());
-
-    // Finally look if this is a recovered queue
-    this.checkIfQueueRecovered(peerClusterZnode);
-  }
-
-  // The passed znode will be either the id of the peer cluster or
-  // the handling story of that queue in the form of id-startcode-*
-  private void checkIfQueueRecovered(String peerClusterZnode) {
-    String[] parts = peerClusterZnode.split("-");
-    this.queueRecovered = parts.length != 1;
-    this.peerClusterId = this.queueRecovered ?
-        parts[0] : peerClusterZnode;
-    this.peerClusterZnode = peerClusterZnode;
-  }
-
-  /**
-   * Select a number of peers at random using the ratio. Minimum 1.
-   */
-  private void chooseSinks() {
-    this.currentPeers.clear();
-    List<HServerAddress> addresses =
-        this.zkHelper.getPeersAddresses(peerClusterId);
-    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
-    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-    LOG.info("Getting " + nbPeers +
-        " rs from peer cluster # " + peerClusterId);
-    for (int i = 0; i < nbPeers; i++) {
-      HServerAddress address;
-      // Make sure we get one address that we don't already have
-      do {
-        address = addresses.get(this.random.nextInt(addresses.size()));
-      } while (setOfAddr.contains(address));
-      LOG.info("Choosing peer " + address);
-      setOfAddr.add(address);
-    }
-    this.currentPeers.addAll(setOfAddr);
-  }
-
-  @Override
-  public void enqueueLog(Path log) {
-    this.queue.put(log);
-  }
-
-  @Override
-  public void logArchived(Path oldPath, Path newPath) {
-    // in sync with queue polling
-    this.pathLock.lock();
-    try {
-      if (oldPath.equals(this.currentPath)) {
-        this.currentPath = newPath;
-        LOG.debug("Current log moved, changing currentPath to " + newPath);
-        return;
-      }
-
-      boolean present = this.queue.remove(oldPath);
-      LOG.debug("old log was " + (present ?
-          "present, changing the queue" : "already processed"));
-      if (present) {
-        this.queue.add(newPath);
-      }
-    } finally {
-      this.pathLock.unlock();
-    }
-  }
-
-  @Override
-  public void run() {
-    connectToPeers();
-    // We were stopped while looping to connect to sinks, just abort
-    if (this.stop.get()) {
-      return;
-    }
-    // If this is recovered, the queue is already full and the first log
-    // normally has a position (unless the RS failed between 2 logs)
-    if (this.queueRecovered) {
-      this.position = this.zkHelper.getHLogRepPosition(
-          this.peerClusterZnode, this.queue.peek().getName());
-    }
-    int sleepMultiplier = 1;
-    // Loop until we close down
-    while (!stop.get() && this.running) {
-
-      // In sync with logArchived
-      this.pathLock.lock();
-      try {
-        // Get a new path
-        if (!getNextPath()) {
-          if (sleepForRetries("No log to process", sleepMultiplier)) {
-            sleepMultiplier++;
-          }
-          continue;
-        }
-        // Open a reader on it
-        if (!openReader(sleepMultiplier)) {
-          continue;
-        }
-      } finally {
-        this.pathLock.unlock();
-      }
-
-      // If we got a null reader but didn't continue, then sleep and continue
-      if (this.reader == null) {
-        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-
-      boolean gotIOE = false; // TODO this is a hack for HDFS-1057
-      currentNbEntries = 0;
-      try {
-        if(readAllEntriesToReplicateOrNextFile()) {
-          continue;
-        }
-      } catch (IOException ioe) {
-        LOG.warn(peerClusterZnode + " Got: ", ioe);
-        gotIOE = true;
-        if (ioe.getCause() instanceof EOFException) {
-
-          boolean considerDumping = false;
-          if (this.queueRecovered) {
-            try {
-              FileStatus stat = this.fs.getFileStatus(this.currentPath);
-              if (stat.getLen() == 0) {
-                LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
-              }
-              considerDumping = true;
-            } catch (IOException e) {
-              LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
-            }
-          } else if (currentNbEntries != 0) {
-            LOG.warn(peerClusterZnode + " Got EOF while reading, " +
-                "looks like this file is broken? " + currentPath);
-            considerDumping = true;
-            currentNbEntries = 0;
-          }
-
-          if (considerDumping &&
-              sleepMultiplier == this.maxRetriesMultiplier &&
-              processEndOfFile()) {
-            continue;
-          }
-        }
-      } finally {
-        try {
-          // if currentPath is null, processEndOfFile was called, so there is no position to record
-          if (this.currentPath != null && !gotIOE) {
-            this.position = this.reader.getPosition();
-          }
-          if (this.reader != null) {
-            this.reader.close();
-          }
-        } catch (IOException e) {
-          gotIOE = true;
-          LOG.warn("Unable to finalize the tailing of a file", e);
-        }
-      }
-
-      // If we didn't get anything to replicate, or if we hit a IOE,
-      // wait a bit and retry.
-      // But if we need to stop, don't bother sleeping
-      if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
-        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      sleepMultiplier = 1;
-      shipEdits();
-
-    }
-    LOG.debug("Source exiting " + peerClusterId);
-  }
-
-  /**
-   * Read all the entries from the current log files and retain those
-   * that need to be replicated. Else, process the end of the current file.
-   * @return true if we got nothing and went to the next file, false if we got
-   * entries
-   * @throws IOException
-   */
-  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
-    long seenEntries = 0;
-    if (this.position != 0) {
-      this.reader.seek(this.position);
-    }
-    HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
-    while (entry != null) {
-      WALEdit edit = entry.getEdit();
-      seenEntries++;
-      // Remove all KVs that should not be replicated
-      removeNonReplicableEdits(edit);
-      HLogKey logKey = entry.getKey();
-      // Don't replicate catalog entries, WALEdits that contain nothing to
-      // replicate, or anything at all while replication is disabled
-      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
-          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
-          edit.size() != 0 && replicating.get()) {
-        logKey.setClusterId(this.clusterId);
-        currentNbEntries++;
-      }
-      // Stop if too many entries or too big
-      if ((this.reader.getPosition() - this.position)
-          >= this.replicationQueueSizeCapacity ||
-          currentNbEntries >= this.replicationQueueNbCapacity) {
-        break;
-      }
-      entry = this.reader.next(entriesArray[currentNbEntries]);
-    }
-    LOG.debug("currentNbEntries:" + currentNbEntries +
-        " and seenEntries:" + seenEntries +
-        " and size: " + (this.reader.getPosition() - this.position));
-    // If we didn't get anything and the queue has an object, it means we
-    // hit the end of the file for sure
-    return seenEntries == 0 && processEndOfFile();
-  }
-
-  private void connectToPeers() {
-    // Connect to peer cluster first, unless we have to stop
-    while (!this.stop.get() && this.currentPeers.size() == 0) {
-      try {
-        chooseSinks();
-        Thread.sleep(this.sleepForRetries);
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted while trying to connect to sinks", e);
-      }
-    }
-  }
-
-  /**
-   * Poll for the next path
-   * @return true if a path was obtained, false if not
-   */
-  protected boolean getNextPath() {
-    try {
-      if (this.currentPath == null) {
-        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while reading edits", e);
-    }
-    return this.currentPath != null;
-  }
-
-  /**
-   * Open a reader on the current path
-   *
-   * @param sleepMultiplier by how many times the default sleeping time is augmented
-   * @return true if we should continue with that file, false if we are over with it
-   */
-  protected boolean openReader(int sleepMultiplier) {
-    try {
-      LOG.info("Opening log for replication " + this.currentPath.getName() + " at " + this.position);
-      this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
-    } catch (IOException ioe) {
-      this.reader = null;
-      LOG.warn(peerClusterZnode + " Got: ", ioe);
-      // TODO Need a better way to determine if a file is really gone
-      // TODO without scanning the whole logs dir
-      if (sleepMultiplier == this.maxRetriesMultiplier) {
-        LOG.warn("Waited too long for this file, considering dumping");
-        return !processEndOfFile();
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Do the sleeping logic
-   * @param msg Why we sleep
-   * @param sleepMultiplier by how many times the default sleeping time is augmented
-   * @return true if sleepMultiplier is below maxRetriesMultiplier
-   */
-  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
-    try {
-      LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
-      Thread.sleep(this.sleepForRetries * sleepMultiplier);
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
-    }
-    return sleepMultiplier < maxRetriesMultiplier;
-  }
-
-  /**
-   * We only want KVs that are scoped other than local
-   * @param edit the WALEdit to filter for replication
-   */
-  protected void removeNonReplicableEdits(WALEdit edit) {
-    NavigableMap<byte[], Integer> scopes = edit.getScopes();
-    List<KeyValue> kvs = edit.getKeyValues();
-    for (int i = 0; i < edit.size(); i++) {
-      KeyValue kv = kvs.get(i);
-      // The scope will be null or empty if
-      // there's nothing to replicate in that WALEdit
-      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
-        kvs.remove(i);
-        i--;
-      }
-    }
-  }
-
-  /**
-   * Do the shipping logic
-   */
-  protected void shipEdits() {
-    int sleepMultiplier = 1;
-    while (!stop.get()) {
-      try {
-        HRegionInterface rrs = getRS();
-        LOG.debug("Replicating " + currentNbEntries);
-        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
-        this.manager.logPositionAndCleanOldLogs(this.currentPath,
-            this.peerClusterZnode, this.position, queueRecovered);
-        this.totalReplicatedEdits += currentNbEntries;
-        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
-        break;
-
-      } catch (IOException ioe) {
-        LOG.warn("Unable to replicate because ", ioe);
-        try {
-          boolean down;
-          do {
-            down = isSlaveDown();
-            if (down) {
-              LOG.debug("The region server we tried to ping didn't answer, " +
-                  "sleeping " + sleepForRetries + " times " + sleepMultiplier);
-              Thread.sleep(this.sleepForRetries * sleepMultiplier);
-              if (sleepMultiplier < maxRetriesMultiplier) {
-                sleepMultiplier++;
-              } else {
-                chooseSinks();
-              }
-            }
-          } while (!stop.get() && down);
-        } catch (InterruptedException e) {
-          LOG.debug("Interrupted while trying to contact the peer cluster");
-        }
-
-      }
-    }
-  }
-
-  /**
-   * If the queue isn't empty, switch to the next one
-   * Else if this is a recovered queue, it means we're done!
-   * Else we'll just continue to try reading the log file
-   * @return true if we're done with the current file, false if we should
-   * continue trying to read from it
-   */
-  protected boolean processEndOfFile() {
-    this.pathLock.lock();
-    try {
-      if (this.queue.size() != 0) {
-        this.currentPath = null;
-        this.position = 0;
-        return true;
-      } else if (this.queueRecovered) {
-        this.manager.closeRecoveredQueue(this);
-        this.abort();
-        return true;
-      }
-    } finally {
-      this.pathLock.unlock();
-    }
-    return false;
-  }
-
-  public void startup() {
-    String n = Thread.currentThread().getName();
-    Thread.UncaughtExceptionHandler handler =
-        new Thread.UncaughtExceptionHandler() {
-          public void uncaughtException(final Thread t, final Throwable e) {
-            LOG.fatal("Set stop flag in " + t.getName(), e);
-            abort();
-          }
-        };
-    Threads.setDaemonThreadRunning(
-        this, n + ".replicationSource," + clusterId, handler);
-  }
-
-  /**
-   * Hastily stop the replication, then wait for shutdown
-   */
-  private void abort() {
-    LOG.info("abort");
-    this.running = false;
-    terminate();
-  }
-
-  public void terminate() {
-    LOG.info("terminate");
-    Threads.shutdown(this, this.sleepForRetries);
-  }
-
-  /**
-   * Get a new region server at random from this peer
-   * @return a region server connection chosen at random from the peer cluster
-   * @throws IOException
-   */
-  private HRegionInterface getRS() throws IOException {
-    if (this.currentPeers.size() == 0) {
-      throw new IOException(this.peerClusterZnode + " has 0 region servers");
-    }
-    HServerAddress address =
-        currentPeers.get(random.nextInt(this.currentPeers.size()));
-    return this.conn.getHRegionConnection(address);
-  }
-
-  /**
-   * Check if the slave is down by trying to establish a connection
-   * @return true if down, false if up
-   * @throws InterruptedException
-   */
-  public boolean isSlaveDown() throws InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    Thread pingThread = new Thread() {
-      public void run() {
-        try {
-          HRegionInterface rrs = getRS();
-          // Dummy call which should fail
-          rrs.getHServerInfo();
-          latch.countDown();
-        } catch (IOException ex) {
-          LOG.info("Slave cluster looks down: " + ex.getMessage());
-        }
-      }
-    };
-    pingThread.start();
-    // await returns true if countDown happened
-    boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
-    pingThread.interrupt();
-    return down;
-  }
-
-  /**
-   * Get the znode of the peer cluster that the source is replicating to
-   *
-   * @return peer cluster znode
-   */
-  public String getPeerClusterZnode() {
-    return this.peerClusterZnode;
-  }
-
-  /**
-   * Get the path of the current HLog
-   * @return current hlog's path
-   */
-  public Path getCurrentPath() {
-    return this.currentPath;
-  }
-
-  /**
-   * Comparator used to order logs by their start time
-   */
-  public static class LogsComparator implements Comparator<Path> {
-
-    @Override
-    public int compare(Path o1, Path o2) {
-      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      return true;
-    }
-
-    /**
-     * Split a path to get the start time
-     * For example: 10.20.20.171%3A60020.1277499063250
-     * @param p path to split
-     * @return start time
-     */
-    private long getTS(Path p) {
-      String[] parts = p.getName().split("\\.");
-      return Long.parseLong(parts[parts.length-1]);
-    }
-  }
-
-}

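Two small numeric notes on the source deleted above: chooseSinks() picks ceil(n * ratio) sinks, so with the default ratio of 0.1 and 100 region servers in the slave cluster it picks 10. LogsComparator orders queued HLogs by the timestamp suffix of the file name; a hedged standalone sketch of that parsing, using the example name from its javadoc:

    // Hedged standalone sketch of LogsComparator.getTS().
    String name = "10.20.20.171%3A60020.1277499063250";
    String[] parts = name.split("\\.");
    long ts = Long.parseLong(parts[parts.length - 1]); // 1277499063250
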
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Wed Sep 19 12:50:17 2012
@@ -1,87 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Interface that defines a replication source
- */
-public interface ReplicationSourceInterface {
-
-  /**
-   * Initializer for the source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
-   * @param stopper the stopper object for this region server
-   * @param replicating the status of the replication on this cluster
-   * @param peerClusterId the id of the peer cluster
-   * @throws IOException
-   */
-  public void init(final Configuration conf,
-                   final FileSystem fs,
-                   final ReplicationSourceManager manager,
-                   final AtomicBoolean stopper,
-                   final AtomicBoolean replicating,
-                   final String peerClusterId) throws IOException;
-
-  /**
-   * Add a log to the list of logs to replicate
-   * @param log path to the log to replicate
-   */
-  public void enqueueLog(Path log);
-
-  /**
-   * Get the current log that's replicated
-   * @return the current log
-   */
-  public Path getCurrentPath();
-
-  /**
-   * Start the replication
-   */
-  public void startup();
-
-  /**
-   * End the replication
-   */
-  public void terminate();
-
-  /**
-   * Get the znode of the peer cluster that the source is replicating to
-   *
-   * @return peer cluster znode
-   */
-  public String getPeerClusterZnode();
-
-  /**
-   * Notify this source that a log was archived
-   * @param oldPath old path of the log
-   * @param newPath new path of the log (archive)
-   */
-  public void logArchived(Path oldPath, Path newPath);
-}

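The interface deleted above implies a simple source lifecycle, which ReplicationSourceManager (next diff) drives: init the source, enqueue logs, start it, and terminate it on shutdown. A hedged sketch, where conf, fs, manager, stop and replicating are assumed to exist and the log path is hypothetical:

    // Hedged lifecycle sketch; all variables except 'src' are assumptions.
    ReplicationSourceInterface src = new ReplicationSource();
    src.init(conf, fs, manager, stop, replicating, "1"); // "1" = peer id
    src.enqueueLog(new Path("/hbase/.logs/rs1/10.0.0.1%3A60020.1277499063250"));
    src.startup();
    // ... later, on region server shutdown:
    src.terminate();
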
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Sep 19 12:50:17 2012
@@ -1,353 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class is responsible for managing all the replication
- * sources. There are two classes of sources:
- * <li> Normal sources are persistent and one per peer cluster</li>
- * <li> Old sources are recovered from a failed region server; their
- * only goal is to finish replicating the HLog queue it had up in ZK</li>
- *
- * When a region server dies, this class uses a watcher to get notified and it
- * tries to grab a lock in order to transfer all of its queues into local
- * old sources.
- */
-public class ReplicationSourceManager implements LogActionsListener {
-
-  private static final Log LOG =
-      LogFactory.getLog(ReplicationSourceManager.class);
-  // List of all the sources that read this RS's logs
-  private final List<ReplicationSourceInterface> sources;
-  // List of all the sources we got from died RSs
-  private final List<ReplicationSourceInterface> oldsources;
-  // Indicates if we are currently replicating
-  private final AtomicBoolean replicating;
-  // Helper for zookeeper
-  private final ReplicationZookeeperWrapper zkHelper;
-  // Indicates if the region server is closing
-  private final AtomicBoolean stopper;
-  // All logs we are currently tracking
-  private final SortedSet<String> hlogs;
-  private final Configuration conf;
-  private final FileSystem fs;
-  // The path to the latest log we saw, for new coming sources
-  private Path latestPath;
-  // List of all the other region servers in this cluster
-  private final List<String> otherRegionServers;
-  // Path to the hlog archive
-  private final Path oldLogDir;
-
-  /**
-   * Creates a replication manager and sets the watch on all the other
-   * registered region servers
-   * @param zkHelper the zk helper for replication
-   * @param conf the configuration to use
-   * @param stopper the stopper object for this region server
-   * @param fs the file system to use
-   * @param replicating the status of the replication on this cluster
-   * @param oldLogDir the directory where old logs are archived
-   */
-  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
-                                  final Configuration conf,
-                                  final AtomicBoolean stopper,
-                                  final FileSystem fs,
-                                  final AtomicBoolean replicating,
-                                  final Path oldLogDir) {
-    this.sources = new ArrayList<ReplicationSourceInterface>();
-    this.replicating = replicating;
-    this.zkHelper = zkHelper;
-    this.stopper = stopper;
-    this.hlogs = new TreeSet<String>();
-    this.oldsources = new ArrayList<ReplicationSourceInterface>();
-    this.conf = conf;
-    this.fs = fs;
-    this.oldLogDir = oldLogDir;
-    List<String> otherRSs =
-        this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
-    this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
-  }
-
-  /**
-   * Provide the id of the peer and the current position in the log, and
-   * this method will record, for this region server, the replication
-   * status in zookeeper. It will also clean old logs from the queue.
-   * @param log Path to the log currently being replicated
-   * @param id id of the peer cluster
-   * @param position current location in the log
-   * @param queueRecovered indicates if this queue comes from another region server
-   */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
-    String key = log.getName();
-    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
-    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-    synchronized (this.hlogs) {
-      if (!queueRecovered && this.hlogs.first() != key) {
-        SortedSet<String> hlogSet = this.hlogs.headSet(key);
-        LOG.info("Removing " + hlogSet.size() +
-            " logs in the list: " + hlogSet);
-        for (String hlog : hlogSet) {
-          this.zkHelper.removeLogFromList(hlog.toString(), id);
-        }
-        hlogSet.clear();
-      }
-    }
-  }
-
-  /**
-   * Adds a normal source per registered peer cluster and tries to process all
-   * old region server hlog queues
-   */
-  public void init() throws IOException {
-    for (String id : this.zkHelper.getPeerClusters().keySet()) {
-      ReplicationSourceInterface src = addSource(id);
-      src.startup();
-    }
-    List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
-    synchronized (otherRegionServers) {
-      LOG.info("Current list of replicators: " + currentReplicators
-          + " other RSs: " + otherRegionServers);
-    }
-    // Look if there's anything to process after a restart
-    for (String rs : currentReplicators) {
-      synchronized (otherRegionServers) {
-        if (!this.otherRegionServers.contains(rs)) {
-          transferQueues(rs);
-        }
-      }
-    }
-  }
-
-  /**
-   * Add a new normal source to this region server
-   * @param id the id of the peer cluster
-   * @return the created source
-   * @throws IOException
-   */
-  public ReplicationSourceInterface addSource(String id) throws IOException {
-    ReplicationSourceInterface src =
-        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-    this.sources.add(src);
-    synchronized (this.hlogs) {
-      if (this.hlogs.size() > 0) {
-        this.zkHelper.addLogToList(this.hlogs.first(),
-            this.sources.get(0).getPeerClusterZnode());
-        src.enqueueLog(this.latestPath);
-      }
-    }
-    return src;
-  }
-
-  /**
-   * Terminate the replication on this region server
-   */
-  public void join() {
-    if (this.sources.size() == 0) {
-      this.zkHelper.deleteOwnRSZNode();
-    }
-    for (ReplicationSourceInterface source : this.sources) {
-      source.terminate();
-    }
-  }
-
-  /**
-   * Get a copy of the hlogs tracked on this rs
-   * @return a sorted set of hlog names
-   */
-  protected SortedSet<String> getHLogs() {
-    return new TreeSet(this.hlogs);
-  }
-
-  /**
-   * Get a list of all the normal sources of this rs
-   * @return list of all sources
-   */
-  public List<ReplicationSourceInterface> getSources() {
-    return this.sources;
-  }
-
-  @Override
-  public void logRolled(Path newLog) {
-    if (this.sources.size() > 0) {
-      this.zkHelper.addLogToList(newLog.getName(),
-          this.sources.get(0).getPeerClusterZnode());
-    }
-    synchronized (this.hlogs) {
-      this.hlogs.add(newLog.getName());
-    }
-    this.latestPath = newLog;
-    for (ReplicationSourceInterface source : this.sources) {
-      source.enqueueLog(newLog);
-    }
-  }
-
-  @Override
-  public void logArchived(Path oldPath, Path newPath) {
-    for (ReplicationSourceInterface source : this.sources) {
-      source.logArchived(oldPath, newPath);
-    }
-  }
-
-  /**
-   * Get the ZK helper of this manager
-   * @return the helper
-   */
-  public ReplicationZookeeperWrapper getRepZkWrapper() {
-    return zkHelper;
-  }
-
-  /**
-   * Factory method to create a replication source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
-   * @param stopper the stopper object for this region server
-   * @param replicating the status of the replication on this cluster
-   * @param peerClusterId the id of the peer cluster
-   * @return the created source
-   * @throws IOException
-   */
-  public ReplicationSourceInterface getReplicationSource(
-      final Configuration conf,
-      final FileSystem fs,
-      final ReplicationSourceManager manager,
-      final AtomicBoolean stopper,
-      final AtomicBoolean replicating,
-      final String peerClusterId) throws IOException {
-    ReplicationSourceInterface src;
-    try {
-      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
-          ReplicationSource.class.getCanonicalName()));
-      src = (ReplicationSourceInterface) c.newInstance();
-    } catch (Exception e) {
-      LOG.warn("Passed replication source implementation throws errors, " +
-          "defaulting to ReplicationSource", e);
-      src = new ReplicationSource();
-
-    }
-    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
-    return src;
-  }
-
-  /**
-   * Transfer all the queues of the specified region server to this one.
-   * First it tries to grab a lock and if it works it will move the
-   * znodes and finally will delete the old znodes.
-   *
-   * It creates one old source per queue of the old rs.
-   * @param rsZnode
-   */
-  public void transferQueues(String rsZnode) {
-    // We try to lock that rs' queue directory
-    if (this.stopper.get()) {
-      LOG.info("Not transferring queue since we are shutting down");
-      return;
-    }
-    if (!this.zkHelper.lockOtherRS(rsZnode)) {
-      return;
-    }
-    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-    SortedMap<String, SortedSet<String>> newQueues =
-        this.zkHelper.copyQueuesFromRS(rsZnode);
-    if (newQueues == null || newQueues.size() == 0) {
-      return;
-    }
-    this.zkHelper.deleteRsQueues(rsZnode);
-
-    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
-      String peerId = entry.getKey();
-      try {
-        ReplicationSourceInterface src = getReplicationSource(this.conf,
-            this.fs, this, this.stopper, this.replicating, peerId);
-        this.oldsources.add(src);
-        for (String hlog : entry.getValue()) {
-          src.enqueueLog(new Path(this.oldLogDir, hlog));
-        }
-        src.startup();
-      } catch (IOException e) {
-        // TODO manage it
-        LOG.error("Failed creating a source", e);
-      }
-    }
-  }
-
-  /**
-   * Clear the references to the specified old source
-   * @param src source to clear
-   */
-  public void closeRecoveredQueue(ReplicationSourceInterface src) {
-    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
-    this.oldsources.remove(src);
-    this.zkHelper.deleteSource(src.getPeerClusterZnode());
-  }
-
-  /**
-   * Watcher used to be notified of another region server's death
-   * in the local cluster. It initiates the process to transfer the queues
-   * if it is able to grab the lock.
-   */
-  public class OtherRegionServerWatcher implements Watcher {
-    @Override
-    public void process(WatchedEvent watchedEvent) {
-      LOG.info(" event " + watchedEvent);
-      if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
-          watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
-        return;
-      }
-
-      List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
-      if (newRsList == null) {
-        return;
-      } else {
-        synchronized (otherRegionServers) {
-          otherRegionServers.clear();
-          otherRegionServers.addAll(newRsList);
-        }
-      }
-      if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
-        LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
-        String[] rsZnodeParts = watchedEvent.getPath().split("/");
-        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
-      }
-    }
-  }
-
-}

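getReplicationSource() in the manager deleted above made the source implementation pluggable through the replication.replicationsource.implementation key; that is also how the ReplicationSourceDummy test class (next diff) was wired in. A minimal sketch, assuming an existing Configuration conf:

    // Hedged sketch: selecting a custom source via the removed factory.
    // 'conf' is assumed; ReplicationSourceDummy is the test class below.
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());
    // The factory instantiated the class reflectively and fell back to
    // ReplicationSource if construction failed.
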
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Wed Sep 19 12:50:17 2012
@@ -1,77 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Source that does nothing at all, helpful to test ReplicationSourceManager
- */
-public class ReplicationSourceDummy implements ReplicationSourceInterface {
-
-  ReplicationSourceManager manager;
-  String peerClusterId;
-  Path currentPath;
-
-  @Override
-  public void init(Configuration conf, FileSystem fs,
-                   ReplicationSourceManager manager, AtomicBoolean stopper,
-                   AtomicBoolean replicating, String peerClusterId)
-      throws IOException {
-    this.manager = manager;
-    this.peerClusterId = peerClusterId;
-  }
-
-  @Override
-  public void enqueueLog(Path log) {
-    this.currentPath = log;
-  }
-
-  @Override
-  public Path getCurrentPath() {
-    return this.currentPath;
-  }
-
-  @Override
-  public void startup() {
-
-  }
-
-  @Override
-  public void terminate() {
-
-  }
-
-  @Override
-  public String getPeerClusterZnode() {
-    return peerClusterId;
-  }
-
-  @Override
-  public void logArchived(Path oldPath, Path newPath) {
-  }
-}

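For context: the dummy source deleted above is selected through configuration rather than constructed directly; TestReplicationSourceManager, further down in this diff, sets the "replication.replicationsource.implementation" key to its class name. Below is a minimal sketch of how a manager can instantiate such a pluggable source reflectively; the newSource() helper and its ReplicationSource default are illustrative assumptions, not code from this patch.

    // Hedged sketch: newSource() and the ReplicationSource default are
    // assumptions for illustration; only the config key comes from the tests.
    static ReplicationSourceInterface newSource(Configuration conf)
        throws Exception {
      Class<?> clazz = conf.getClass(
          "replication.replicationsource.implementation",
          ReplicationSource.class);
      return (ReplicationSourceInterface) clazz.newInstance();
    }
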
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Wed Sep 19 12:50:17 2012
@@ -1,472 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestReplication {
-
-  private static final Log LOG = LogFactory.getLog(TestReplication.class);
-
-  private static Configuration conf1;
-  private static Configuration conf2;
-
-  private static ZooKeeperWrapper zkw1;
-  private static ZooKeeperWrapper zkw2;
-
-  private static HTable htable1;
-  private static HTable htable2;
-
-  private static HBaseTestingUtility utility1;
-  private static HBaseTestingUtility utility2;
-  private static final int NB_ROWS_IN_BATCH = 100;
-  private static final long SLEEP_TIME = 500;
-  private static final int NB_RETRIES = 10;
-
-  private static final byte[] tableName = Bytes.toBytes("test");
-  private static final byte[] famName = Bytes.toBytes("f");
-  private static final byte[] row = Bytes.toBytes("row");
-  private static final byte[] noRepfamName = Bytes.toBytes("norep");
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf1 = HBaseConfiguration.create();
-    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
-    // use a smaller block size and capacity to trigger more operations
-    // and exercise those code paths
-    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
-    conf1.setInt("replication.source.nb.capacity", 5);
-    conf1.setLong("replication.source.sleepforretries", 100);
-    conf1.setInt("hbase.regionserver.maxlogs", 10);
-    conf1.setLong("hbase.master.logcleaner.ttl", 10);
-    conf1.setLong("hbase.client.retries.number", 4);
-    conf1.setInt(HConstants.ZOOKEEPER_CONNECTION_RETRY_NUM, 2);
-    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
-    conf1.setBoolean("dfs.support.append", true);
-    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-
-    utility1 = new HBaseTestingUtility(conf1);
-    utility1.startMiniZKCluster();
-    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
-    zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
-    zkw1.writeZNode("/1", "replication", "");
-    zkw1.writeZNode("/1/replication", "master",
-        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get(HConstants.ZOOKEEPER_CLIENT_PORT)+":/1");
-    setIsReplication(true);
-
-    LOG.info("Setup first Zk");
-
-    conf2 = HBaseConfiguration.create();
-    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-    conf2.setInt("hbase.client.retries.number", 6);
-    conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
-    conf2.setBoolean("dfs.support.append", true);
-    conf2.setInt(HConstants.ZOOKEEPER_CONNECTION_RETRY_NUM, 2);
-    utility2 = new HBaseTestingUtility(conf2);
-    utility2.setZkCluster(miniZK);
-    zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
-    zkw2.writeZNode("/2", "replication", "");
-    zkw2.writeZNode("/2/replication", "master",
-        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get(HConstants.ZOOKEEPER_CLIENT_PORT)+":/1");
-
-    zkw1.writeZNode("/1/replication/peers", "1",
-        conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf2.get(HConstants.ZOOKEEPER_CLIENT_PORT)+":/2");
-
-    LOG.info("Setup second Zk");
-
-    utility1.startMiniCluster(2);
-    utility2.startMiniCluster(2);
-
-    HTableDescriptor table = new HTableDescriptor(tableName);
-    table.setDeferredLogFlush(false);
-    HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-    table.addFamily(fam);
-    fam = new HColumnDescriptor(noRepfamName);
-    table.addFamily(fam);
-    HBaseAdmin admin1 = new HBaseAdmin(conf1);
-    HBaseAdmin admin2 = new HBaseAdmin(conf2);
-    admin1.createTable(table);
-    admin2.createTable(table);
-
-    htable1 = new HTable(conf1, tableName);
-    htable1.setWriteBufferSize(1024*5);
-    htable2 = new HTable(conf2, tableName);
-  }
-
-  private static void setIsReplication(boolean rep) throws Exception {
-    LOG.info("Set rep " + rep);
-    zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
-    // Takes some ms for ZK to fire the watcher
-    Thread.sleep(SLEEP_TIME);
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    setIsReplication(false);
-    utility1.truncateTable(tableName);
-    utility2.truncateTable(tableName);
-    // If the test is flaky, increase this sleep
-    Thread.sleep(SLEEP_TIME*8);
-    setIsReplication(true);
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
-  }
-
-  /**
-   * Add a row, check it's replicated, delete it, check it's gone
-   * @throws Exception
-   */
-  @Test
-  public void testSimplePutDelete() throws Exception {
-    LOG.info("testSimplePutDelete");
-    Put put = new Put(row);
-    put.add(famName, row, row);
-
-    htable1 = new HTable(conf1, tableName);
-    htable1.put(put);
-
-    HTable table2 = new HTable(conf2, tableName);
-    Get get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = table2.get(get);
-      if (res.size() == 0) {
-        LOG.info("Row not available");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(res.value(), row);
-        break;
-      }
-    }
-
-    Delete del = new Delete(row);
-    htable1.delete(del);
-
-    table2 = new HTable(conf2, tableName);
-    get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for del replication");
-      }
-      Result res = table2.get(get);
-      if (res.size() >= 1) {
-        LOG.info("Row not deleted");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Try a small batch upload using the write buffer and check it's replicated
-   * @throws Exception
-   */
-  @Test
-  public void testSmallBatch() throws Exception {
-    LOG.info("testSmallBatch");
-    Put put;
-    // normal Batch tests
-    htable1.setAutoFlush(false);
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      put = new Put(Bytes.toBytes(i));
-      put.add(famName, row, row);
-      htable1.put(put);
-    }
-    htable1.flushCommits();
-
-    Scan scan = new Scan();
-
-    ResultScanner scanner1 = htable1.getScanner(scan);
-    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
-    scanner1.close();
-    assertEquals(NB_ROWS_IN_BATCH, res1.length);
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for normal batch replication");
-      }
-      ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
-      scanner.close();
-      if (res.length != NB_ROWS_IN_BATCH) {
-        LOG.info("Only got " + res.length + " rows");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-
-    htable1.setAutoFlush(true);
-
-  }
-
-  /**
-   * Stop replication, insert rows and verify that nothing is replicated;
-   * then re-enable replication and verify that it works again
-   * @throws Exception
-   */
-  @Test
-  public void testStartStop() throws Exception {
-
-    // Test stopping replication
-    setIsReplication(false);
-
-    Put put = new Put(Bytes.toBytes("stop start"));
-    put.add(famName, row, row);
-    htable1.put(put);
-
-    Get get = new Get(Bytes.toBytes("stop start"));
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        break;
-      }
-      Result res = htable2.get(get);
-      if(res.size() >= 1) {
-        fail("Replication wasn't stopped");
-
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-
-    // Test restart replication
-    setIsReplication(true);
-
-    htable1.put(put);
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = htable2.get(get);
-      if(res.size() == 0) {
-        LOG.info("Row not available");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(res.value(), row);
-        break;
-      }
-    }
-
-    put = new Put(Bytes.toBytes("do not rep"));
-    put.add(noRepfamName, row, row);
-    htable1.put(put);
-
-    get = new Get(Bytes.toBytes("do not rep"));
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i == NB_RETRIES-1) {
-        break;
-      }
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        fail("Not supposed to be replicated");
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-
-  }
-
-  /**
-   * Do a more intense version of testSmallBatch, one that will trigger
-   * hlog rolling and other non-trivial code paths
-   * @throws Exception
-   */
-  @Test
-  public void loadTesting() throws Exception {
-    htable1.setWriteBufferSize(1024);
-    htable1.setAutoFlush(false);
-    for (int i = 0; i < NB_ROWS_IN_BATCH *10; i++) {
-      Put put = new Put(Bytes.toBytes(i));
-      put.add(famName, row, row);
-      htable1.put(put);
-    }
-    htable1.flushCommits();
-
-    Scan scan = new Scan();
-
-    ResultScanner scanner = htable1.getScanner(scan);
-    Result[] res = scanner.next(NB_ROWS_IN_BATCH * 100);
-    scanner.close();
-
-    assertEquals(NB_ROWS_IN_BATCH *10, res.length);
-
-    scan = new Scan();
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-
-      scanner = htable2.getScanner(scan);
-      res = scanner.next(NB_ROWS_IN_BATCH * 100);
-      scanner.close();
-      if (res.length != NB_ROWS_IN_BATCH *10) {
-        if (i == NB_RETRIES-1) {
-          int lastRow = -1;
-          for (Result result : res) {
-            int currentRow = Bytes.toInt(result.getRow());
-            for (int row = lastRow+1; row < currentRow; row++) {
-              LOG.error("Row missing: " + row);
-            }
-            lastRow = currentRow;
-          }
-          LOG.error("Last row: " + lastRow);
-          fail("Waited too much time for normal batch replication, "
-              + res.length + " instead of " + NB_ROWS_IN_BATCH *10);
-        } else {
-          LOG.info("Only got " + res.length + " rows");
-          Thread.sleep(SLEEP_TIME);
-        }
-      } else {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Load a table spread over 2 region servers and kill a source region
-   * server during the upload. The failover happens internally.
-   * @throws Exception
-   */
-  @Test (timeout=300000)
-  public void queueFailover() throws Exception {
-    utility1.createMultiRegions(htable1, famName);
-
-    // killing the RS with .META. can result in failed puts until we solve
-    // IO fencing
-    int rsToKill1 =
-        utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
-    int rsToKill2 =
-        utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
-
-    // The full load takes about 20 seconds; kill around the middle
-    Thread killer1 = killARegionServer(utility1, 5000, rsToKill1);
-    Thread killer2 = killARegionServer(utility2, 5000, rsToKill2);
-
-    LOG.info("Start loading table");
-    int initialCount = utility1.loadTable(htable1, famName);
-    LOG.info("Done loading table");
-    killer1.join(5000);
-    killer2.join(5000);
-    LOG.info("Done waiting for threads");
-
-    Result[] res = null;
-    while (true) {
-      try {
-        Scan scan = new Scan();
-        ResultScanner scanner = htable1.getScanner(scan);
-        res = scanner.next(initialCount);
-        scanner.close();
-        break;
-      } catch (UnknownScannerException ex) {
-        LOG.info("Cluster wasn't ready yet, restarting scanner");
-      }
-    }
-
-    // Check that we actually have all the rows; we may miss some because
-    // we don't have IO fencing.
-    if (res.length != initialCount) {
-      LOG.warn("We lost some rows on the master cluster!");
-      // We don't really expect the other cluster to have more rows
-      initialCount = res.length;
-    }
-
-    Scan scan2 = new Scan();
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for queueFailover replication");
-      }
-      ResultScanner scanner2 = htable2.getScanner(scan2);
-      Result[] res2 = scanner2.next(initialCount * 2);
-      scanner2.close();
-      if (res2.length < initialCount) {
-        LOG.info("Only got " + res2.length + " rows instead of " +
-            initialCount + " current i=" + i);
-        Thread.sleep(SLEEP_TIME * 2);
-      } else {
-        break;
-      }
-    }
-  }
-
-  private static Thread killARegionServer(final HBaseTestingUtility utility,
-                                   final long timeout, final int rs) {
-    Thread killer = new Thread() {
-      public void run() {
-        try {
-          Thread.sleep(timeout);
-          utility.expireRegionServerSession(rs);
-        } catch (Exception e) {
-          LOG.error(e);
-        }
-      }
-    };
-    killer.start();
-    return killer;
-  }
-}

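Every test deleted above repeats the same poll-until-replicated idiom: retry up to NB_RETRIES times, sleep SLEEP_TIME between attempts, and fail once the retries are exhausted. Below is a hedged sketch of that idiom factored into a single helper; the helper does not exist in the deleted code, and everything beyond NB_RETRIES, SLEEP_TIME and JUnit's fail() is illustrative.

    // Illustrative helper only; assumes java.util.concurrent.Callable and a
    // static import of org.junit.Assert.fail.
    private static void waitForReplication(Callable<Boolean> check,
        String failMsg) throws Exception {
      for (int i = 0; i < NB_RETRIES; i++) {
        if (check.call()) {
          return;                   // condition met, replication caught up
        }
        if (i == NB_RETRIES - 1) {
          fail(failMsg);            // out of retries
        }
        Thread.sleep(SLEEP_TIME);   // give the slave cluster time to catch up
      }
    }

With such a helper, testSimplePutDelete's first loop would collapse to a single call that polls htable2 for the row, wrapped in an anonymous Callable (this codebase predates Java 8 lambdas).
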
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java Wed Sep 19 12:50:17 2012
@@ -1,111 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public class TestReplicationSource {
-
-  private static final Log LOG =
-      LogFactory.getLog(TestReplicationSource.class);
-  private final static HBaseTestingUtility TEST_UTIL =
-      new HBaseTestingUtility();
-  private static FileSystem fs;
-  private static Path oldLogDir;
-  private static Path logDir;
-  private static Configuration conf = HBaseConfiguration.create();
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniDFSCluster(1);
-    fs = TEST_UTIL.getDFSCluster().getFileSystem();
-
-    oldLogDir = TEST_UTIL.getTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
-    fs.mkdirs(oldLogDir);
-    logDir = TEST_UTIL.getTestDir(HConstants.HREGION_LOGDIR_NAME);
-    fs.mkdirs(logDir);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    fs.delete(oldLogDir, true);
-    fs.delete(logDir, true);
-  }
-
-  /**
-   * Sanity check that we can move logs around while we are reading
-   * from them. Should this test fail, ReplicationSource would have a hard
-   * time reading logs that are being archived.
-   * @throws Exception
-   */
-  @Test
-  public void testLogMoving() throws Exception{
-    Path logPath = new Path(logDir, "log");
-    HLog.Writer writer = HLog.createWriter(fs, logPath, conf);
-    for(int i = 0; i < 3; i++) {
-      byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b,b,b);
-      WALEdit edit = new WALEdit();
-      edit.add(kv);
-      HLogKey key = new HLogKey(b, b, 0, 0);
-      writer.append(new HLog.Entry(key, edit));
-      writer.sync();
-    }
-    writer.close();
-
-    HLog.Reader reader = HLog.getReader(fs, logPath, conf);
-    HLog.Entry entry = reader.next();
-    assertNotNull(entry);
-
-    Path oldLogPath = new Path(oldLogDir, "log");
-    fs.rename(logPath, oldLogPath);
-
-    entry = reader.next();
-    assertNotNull(entry);
-
-    entry = reader.next();
-    entry = reader.next();
-
-    assertNull(entry);
-
-  }
-}

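testLogMoving above verifies that a reader already open on a log keeps working after the file is renamed into the old-logs directory. The complementary case is a source that opens a log only after it was archived; below is a hedged sketch of that fallback, grounded only in the HLog.getReader(fs, path, conf) signature used above, with the method itself an illustrative assumption.

    // Hedged sketch of an archive-directory fallback; illustrative only.
    static HLog.Reader openLog(FileSystem fs, Path logPath, Path oldLogDir,
        Configuration conf) throws IOException {
      try {
        return HLog.getReader(fs, logPath, conf);
      } catch (java.io.FileNotFoundException e) {
        // The log was archived before we opened it; retry under oldLogDir.
        return HLog.getReader(fs, new Path(oldLogDir, logPath.getName()), conf);
      }
    }
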
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1387563&r1=1387562&r2=1387563&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Wed Sep 19 12:50:17 2012
@@ -1,209 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.URLEncoder;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestReplicationSourceManager {
-
-  private static final Log LOG =
-      LogFactory.getLog(TestReplicationSourceManager.class);
-
-  private static Configuration conf;
-
-  private static HBaseTestingUtility utility;
-
-  private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
-
-  private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
-
-  private static ReplicationSourceManager manager;
-
-  private static ZooKeeperWrapper zkw;
-
-  private static HTableDescriptor htd;
-
-  private static HRegionInfo hri;
-
-  private static final byte[] r1 = Bytes.toBytes("r1");
-
-  private static final byte[] r2 = Bytes.toBytes("r2");
-
-  private static final byte[] f1 = Bytes.toBytes("f1");
-
-  private static final byte[] test = Bytes.toBytes("test");
-
-  private static FileSystem fs;
-
-  private static Path oldLogDir;
-
-  private static Path logDir;
-
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-
-    conf = HBaseConfiguration.create();
-    conf.set("replication.replicationsource.implementation",
-        ReplicationSourceDummy.class.getCanonicalName());
-    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
-    utility = new HBaseTestingUtility(conf);
-    utility.startMiniZKCluster();
-
-    zkw = ZooKeeperWrapper.createInstance(conf, "test");
-    zkw.writeZNode("/hbase", "replication", "");
-    final String clusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
-        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1";
-    zkw.writeZNode("/hbase/replication", "master", clusterKey);
-    zkw.writeZNode("/hbase/replication/peers", "1", clusterKey);
-
-    // set port to 0 so that RS picks a port dynamically to
-    // avoid clash over the default port in unit test runs.
-    conf.set(HConstants.REGIONSERVER_PORT, "0");
-
-    HRegionServer server = new HRegionServer(conf);
-    ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
-        server.getZooKeeperWrapper(), conf,
-        REPLICATING, "123456789");
-    fs = FileSystem.get(conf);
-    oldLogDir = new Path(utility.getTestDir(),
-        HConstants.HREGION_OLDLOGDIR_NAME);
-    logDir = new Path(utility.getTestDir(),
-        HConstants.HREGION_LOGDIR_NAME);
-
-    manager = new ReplicationSourceManager(helper,
-        conf, STOPPER, fs, REPLICATING, oldLogDir);
-    manager.addSource("1");
-
-    htd = new HTableDescriptor(test);
-    HColumnDescriptor col = new HColumnDescriptor("f1");
-    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-    htd.addFamily(col);
-    col = new HColumnDescriptor("f2");
-    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
-    htd.addFamily(col);
-
-    hri = new HRegionInfo(htd, r1, r2);
-
-
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    manager.join();
-    utility.shutdownMiniCluster();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    fs.delete(logDir, true);
-    fs.delete(oldLogDir, true);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    setUp();
-  }
-
-  @Test
-  public void testLogRoll() throws Exception {
-    long seq = 0;
-    long baseline = 1000;
-    long time = baseline;
-    KeyValue kv = new KeyValue(r1, f1, r1);
-    WALEdit edit = new WALEdit();
-    edit.add(kv);
-
-    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
-      URLEncoder.encode("regionserver:60020", "UTF8"));
-
-    manager.init();
-
-    // Test normal log rolling, rolling every 20 entries
-    for(long i = 1; i < 101; i++) {
-      if(i > 1 && i % 20 == 0) {
-        hlog.rollWriter();
-      }
-      LOG.info(i);
-      HLogKey key = new HLogKey(hri.getRegionName(),
-        test, seq++, System.currentTimeMillis());
-      hlog.append(hri, key, edit);
-    }
-
-    // Simulate a rapid insert followed by a position report that's
-    // still not totally complete (missing the last entry)
-    LOG.info(baseline + " and " + time);
-    baseline += 101;
-    time = baseline;
-    LOG.info(baseline + " and " + time);
-
-    for (int i = 0; i < 3; i++) {
-      HLogKey key = new HLogKey(hri.getRegionName(),
-        test, seq++, System.currentTimeMillis());
-      hlog.append(hri, key, edit);
-    }
-
-    assertEquals(6, manager.getHLogs().size());
-
-    hlog.rollWriter();
-
-    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false);
-
-    HLogKey key = new HLogKey(hri.getRegionName(),
-          test, seq++, System.currentTimeMillis());
-    hlog.append(hri, key, edit);
-
-    assertEquals(1, manager.getHLogs().size());
-
-
-    // TODO: add a case with only 2 HLogs where we want to delete only the first one
-  }
-
-}