Posted to commits@hbase.apache.org by jd...@apache.org on 2010/07/01 02:25:51 UTC

svn commit: r959479 [2/2] - in /hbase/trunk: ./ bin/replication/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/mai...

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,647 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave cluster.
+ * For each slave cluster it selects a random number of peers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ * <p/>
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ * <p/>
+ *
+ */
+public class ReplicationSource extends Thread
+    implements ReplicationSourceInterface {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+  // Lock to manage when a HLog is changing path (archiving)
+  private final ReentrantLock pathLock = new ReentrantLock();
+  // Queue of logs to process
+  private PriorityBlockingQueue<Path> queue;
+  // container of entries to replicate
+  private HLog.Entry[] entriesArray;
+  private HConnection conn;
+  // Helper class for zookeeper
+  private ReplicationZookeeperWrapper zkHelper;
+  private Configuration conf;
+  // ratio of region servers to choose from a slave cluster
+  private float ratio;
+  private Random random;
+  // should we replicate or not?
+  private AtomicBoolean replicating;
+  // id of the peer cluster this source replicates to
+  private String peerClusterId;
+  // The manager of all sources to which we ping back our progress
+  private ReplicationSourceManager manager;
+  // Should we stop everything?
+  private AtomicBoolean stop;
+  // List of chosen sinks (region servers)
+  private List<HServerAddress> currentPeers;
+  // How long should we sleep for each retry
+  private long sleepForRetries;
+  // Max size in bytes of entriesArray
+  private long replicationQueueSizeCapacity;
+  // Max number of entries in entriesArray
+  private int replicationQueueNbCapacity;
+  // Our reader for the current log
+  private HLog.Reader reader;
+  // Current position in the log
+  private long position = 0;
+  // Path of the current log
+  private volatile Path currentPath;
+  private FileSystem fs;
+  // id of this cluster
+  private byte clusterId;
+  // total number of edits we replicated
+  private long totalReplicatedEdits = 0;
+  // The znode we currently play with
+  private String peerClusterZnode;
+  // Indicates if this queue is recovered (and will be deleted when depleted)
+  private boolean queueRecovered;
+  // Maximum number of retries before taking bold actions
+  private long maxRetriesMultiplier;
+  // Current number of entries that we need to replicate
+  private int currentNbEntries = 0;
+  // Indicates if this particular source is running
+  private volatile boolean running = true;
+
+  /**
+   * Instantiation method used by region servers
+   *
+   * @param conf configuration to use
+   * @param fs file system to use
+   * @param manager replication manager to ping to
+   * @param stopper     the atomic boolean to use to stop the regionserver
+   * @param replicating the atomic boolean that starts/stops replication
+   * @param peerClusterZnode the name of our znode
+   * @throws IOException
+   */
+  public void init(final Configuration conf,
+                   final FileSystem fs,
+                   final ReplicationSourceManager manager,
+                   final AtomicBoolean stopper,
+                   final AtomicBoolean replicating,
+                   final String peerClusterZnode)
+      throws IOException {
+    this.stop = stopper;
+    this.conf = conf;
+    this.replicationQueueSizeCapacity =
+        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
+    this.replicationQueueNbCapacity =
+        this.conf.getInt("replication.source.nb.capacity", 25000);
+    this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
+    for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
+      this.entriesArray[i] = new HLog.Entry();
+    }
+    this.maxRetriesMultiplier =
+        this.conf.getLong("replication.source.maxretriesmultiplier", 10);
+    this.queue =
+        new PriorityBlockingQueue<Path>(
+            conf.getInt("hbase.regionserver.maxlogs", 32),
+            new LogsComparator());
+    this.conn = HConnectionManager.getConnection(conf);
+    this.zkHelper = manager.getRepZkWrapper();
+    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
+    this.currentPeers = new ArrayList<HServerAddress>();
+    this.random = new Random();
+    this.replicating = replicating;
+    this.manager = manager;
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);
+    this.fs = fs;
+    this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+
+    // Finally look if this is a recovered queue
+    this.checkIfQueueRecovered(peerClusterZnode);
+  }
+
+  // The passed znode will be either the id of the peer cluster or,
+  // for a recovered queue, its history in the form of id-startcode-*
+  private void checkIfQueueRecovered(String peerClusterZnode) {
+    String[] parts = peerClusterZnode.split("-");
+    this.queueRecovered = parts.length != 1;
+    this.peerClusterId = this.queueRecovered ?
+        parts[0] : peerClusterZnode;
+    this.peerClusterZnode = peerClusterZnode;
+  }
+
+  /**
+   * Select a number of peers at random using the ratio. Minimum 1.
+   */
+  private void chooseSinks() {
+    this.currentPeers.clear();
+    List<HServerAddress> addresses =
+        this.zkHelper.getPeersAddresses(peerClusterId);
+    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
+    LOG.info("Getting " + nbPeers +
+        " rs from peer cluster # " + peerClusterId);
+    for (int i = 0; i < nbPeers; i++) {
+      HServerAddress address;
+      // Make sure we get one address that we don't already have
+      do {
+        address = addresses.get(this.random.nextInt(addresses.size()));
+      } while (setOfAddr.contains(address));
+      LOG.info("Choosing peer " + address);
+      setOfAddr.add(address);
+    }
+    this.currentPeers.addAll(setOfAddr);
+  }
+
+  @Override
+  public void enqueueLog(Path log) {
+    this.queue.put(log);
+  }
+
+  @Override
+  public void logArchived(Path oldPath, Path newPath) {
+    // in sync with queue polling
+    this.pathLock.lock();
+    try {
+      if (oldPath.equals(this.currentPath)) {
+        this.currentPath = newPath;
+        LOG.debug("Current log moved, changing currentPath to " +newPath);
+        return;
+      }
+
+      boolean present = this.queue.remove(oldPath);
+      LOG.debug("old log was " + (present ?
+          "present, changing the queue" : "already processed"));
+      if (present) {
+        this.queue.add(newPath);
+      }
+    } finally {
+      this.pathLock.unlock();
+    }
+  }
+
+  @Override
+  public void run() {
+    connectToPeers();
+    // We were stopped while looping to connect to sinks, just abort
+    if (this.stop.get()) {
+      return;
+    }
+    // If this is recovered, the queue is already full and the first log
+    // normally has a position (unless the RS failed between 2 logs)
+    if (this.queueRecovered) {
+      this.position = this.zkHelper.getHLogRepPosition(
+          this.peerClusterZnode, this.queue.peek().getName());
+    }
+    int sleepMultiplier = 1;
+    // Loop until we close down
+    while (!stop.get() && this.running) {
+
+      // In sync with logArchived
+      this.pathLock.lock();
+      try {
+        // Get a new path
+        if (!getNextPath()) {
+          if (sleepForRetries("No log to process", sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
+        // Open a reader on it
+        if (!openReader(sleepMultiplier)) {
+          continue;
+        }
+      } finally {
+        this.pathLock.unlock();
+      }
+
+      // If we got a null reader but didn't continue, then sleep and continue
+      if (this.reader == null) {
+        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      boolean gotIOE = false; // TODO this is a hack for HDFS-1057
+      currentNbEntries = 0;
+      try {
+        if(readAllEntriesToReplicateOrNextFile()) {
+          continue;
+        }
+      } catch (IOException ioe) {
+        LOG.warn(peerClusterZnode + " Got: ", ioe);
+        gotIOE = true;
+        if (ioe.getCause() instanceof EOFException) {
+
+          boolean considerDumping = false;
+          if (this.queueRecovered) {
+            try {
+              FileStatus stat = this.fs.getFileStatus(this.currentPath);
+              if (stat.getLen() == 0) {
+                LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+              }
+              considerDumping = true;
+            } catch (IOException e) {
+              LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+            }
+          } else if (currentNbEntries != 0) {
+            LOG.warn(peerClusterZnode + " Got EOF while reading, " +
+                "looks like this file is broken? " + currentPath);
+            considerDumping = true;
+            currentNbEntries = 0;
+          }
+
+          if (considerDumping &&
+              sleepMultiplier == this.maxRetriesMultiplier &&
+              processEndOfFile()) {
+            continue;
+          }
+        }
+      } finally {
+        try {
+          // if currentPath is null, processEndOfFile was called, hence there is no position to record
+          if (this.currentPath != null && !gotIOE) {
+            this.position = this.reader.getPosition();
+          }
+          if (this.reader != null) {
+            this.reader.close();
+          }
+        } catch (IOException e) {
+          gotIOE = true;
+          LOG.warn("Unable to finalize the tailing of a file", e);
+        }
+      }
+
+      // If we didn't get anything to replicate, or if we hit an IOE,
+      // wait a bit and retry.
+      // But if we need to stop, don't bother sleeping
+      if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+      sleepMultiplier = 1;
+      shipEdits();
+
+    }
+    LOG.debug("Source exiting " + peerClusterId);
+  }
+
+  /**
+   * Read all the entries from the current log file and retain those
+   * that need to be replicated. If nothing was read, process the end of the file.
+   * @return true if we got nothing and went to the next file, false if we got
+   * entries
+   * @throws IOException
+   */
+  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+    long seenEntries = 0;
+    if (this.position != 0) {
+      this.reader.seek(this.position);
+    }
+    HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+    while (entry != null) {
+      WALEdit edit = entry.getEdit();
+      seenEntries++;
+      // Remove all KVs that should not be replicated
+      removeNonReplicableEdits(edit);
+      HLogKey logKey = entry.getKey();
+      // Only ship the entry if it's not a catalog table edit, the WALEdit
+      // contains something to replicate and replication is currently enabled
+      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
+          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
+          edit.size() != 0 && replicating.get()) {
+        logKey.setClusterId(this.clusterId);
+        currentNbEntries++;
+      }
+      // Stop if too many entries or too big
+      if ((this.reader.getPosition() - this.position)
+          >= this.replicationQueueSizeCapacity ||
+          currentNbEntries >= this.replicationQueueNbCapacity) {
+        break;
+      }
+      entry = this.reader.next(entriesArray[currentNbEntries]);
+    }
+    LOG.debug("currentNbEntries:" + currentNbEntries +
+        " and seenEntries:" + seenEntries +
+        " and size: " + (this.reader.getPosition() - this.position));
+    // If we didn't get anything and the queue has an object, it means we
+    // hit the end of the file for sure
+    return seenEntries == 0 && processEndOfFile();
+  }
+
+  private void connectToPeers() {
+    // Connect to peer cluster first, unless we have to stop
+    while (!this.stop.get() && this.currentPeers.size() == 0) {
+      try {
+        chooseSinks();
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while trying to connect to sinks", e);
+      }
+    }
+  }
+
+  /**
+   * Poll for the next path
+   * @return true if a path was obtained, false if not
+   */
+  protected boolean getNextPath() {
+    try {
+      if (this.currentPath == null) {
+        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while reading edits", e);
+    }
+    return this.currentPath != null;
+  }
+
+  /**
+   * Open a reader on the current path
+   *
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return true if we should continue with that file, false if we are done with it
+   */
+  protected boolean openReader(int sleepMultiplier) {
+    try {
+      LOG.info("Opening log for replication " + this.currentPath.getName() + " at " + this.position);
+      this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+    } catch (IOException ioe) {
+      this.reader = null;
+      LOG.warn(peerClusterZnode + " Got: ", ioe);
+      // TODO Need a better way to determine if a file is really gone but
+      // TODO without scanning all logs dir
+      if (sleepMultiplier == this.maxRetriesMultiplier) {
+        LOG.warn("Waited too long for this file, considering dumping");
+        return !processEndOfFile();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return true if sleepMultiplier is lower than maxRetriesMultiplier
+   */
+  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+
+  /**
+   * We only want KVs that are scoped other than local
+   * @param edit The WALEdit from which to remove non-replicable KVs
+   */
+  protected void removeNonReplicableEdits(WALEdit edit) {
+    NavigableMap<byte[], Integer> scopes = edit.getScopes();
+    List<KeyValue> kvs = edit.getKeyValues();
+    for (int i = 0; i < edit.size(); i++) {
+      KeyValue kv = kvs.get(i);
+      // The scope will be null or empty if
+      // there's nothing to replicate in that WALEdit
+      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+        kvs.remove(i);
+        i--;
+      }
+    }
+  }
+
+  /**
+   * Do the shipping logic
+   */
+  protected void shipEdits() {
+    int sleepMultiplier = 1;
+    while (!stop.get()) {
+      try {
+        HRegionInterface rrs = getRS();
+        LOG.debug("Replicating " + currentNbEntries);
+        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+        this.manager.logPositionAndCleanOldLogs(this.currentPath,
+            this.peerClusterZnode, this.position, queueRecovered);
+        this.totalReplicatedEdits += currentNbEntries;
+        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
+        break;
+
+      } catch (IOException ioe) {
+        LOG.warn("Unable to replicate because ", ioe);
+        try {
+          boolean down;
+          do {
+            down = isSlaveDown();
+            if (down) {
+              LOG.debug("The region server we tried to ping didn't answer, " +
+                  "sleeping " + sleepForRetries + " times " + sleepMultiplier);
+              Thread.sleep(this.sleepForRetries * sleepMultiplier);
+              if (sleepMultiplier < maxRetriesMultiplier) {
+                sleepMultiplier++;
+              } else {
+                chooseSinks();
+              }
+            }
+          } while (!stop.get() && down);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while trying to contact the peer cluster");
+        }
+
+      }
+    }
+  }
+
+  /**
+   * If the queue isn't empty, switch to the next one
+   * Else if this is a recovered queue, it means we're done!
+   * Else we'll just continue to try reading the log file
+   * @return true if we're done with the current file, false if we should
+   * continue trying to read from it
+   */
+  protected boolean processEndOfFile() {
+    this.pathLock.lock();
+    try {
+      if (this.queue.size() != 0) {
+        this.currentPath = null;
+        this.position = 0;
+        return true;
+      } else if (this.queueRecovered) {
+        this.manager.closeRecoveredQueue(this);
+        this.abort();
+        return true;
+      }
+    } finally {
+      this.pathLock.unlock();
+    }
+    return false;
+  }
+
+  public void startup() {
+    String n = Thread.currentThread().getName();
+    Thread.UncaughtExceptionHandler handler =
+        new Thread.UncaughtExceptionHandler() {
+          public void uncaughtException(final Thread t, final Throwable e) {
+            LOG.fatal("Set stop flag in " + t.getName(), e);
+            abort();
+          }
+        };
+    Threads.setDaemonThreadRunning(
+        this, n + ".replicationSource," + clusterId, handler);
+  }
+
+  /**
+   * Hastily stop the replication, then wait for shutdown
+   */
+  private void abort() {
+    LOG.info("abort");
+    this.running = false;
+    terminate();
+  }
+
+  public void terminate() {
+    LOG.info("terminate");
+    Threads.shutdown(this, this.sleepForRetries);
+  }
+
+  /**
+   * Get a new region server at random from this peer
+   * @return the connection to a random region server of the peer cluster
+   * @throws IOException
+   */
+  private HRegionInterface getRS() throws IOException {
+    if (this.currentPeers.size() == 0) {
+      throw new IOException(this.peerClusterZnode + " has 0 region servers");
+    }
+    HServerAddress address =
+        currentPeers.get(random.nextInt(this.currentPeers.size()));
+    return this.conn.getHRegionConnection(address);
+  }
+
+  /**
+   * Check if the slave is down by trying to establish a connection
+   * @return true if down, false if up
+   * @throws InterruptedException
+   */
+  public boolean isSlaveDown() throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    Thread pingThread = new Thread() {
+      public void run() {
+        try {
+          HRegionInterface rrs = getRS();
+          // Dummy call that will throw if the slave is unreachable
+          rrs.getHServerInfo();
+          latch.countDown();
+        } catch (IOException ex) {
+          LOG.info("Slave cluster looks down: " + ex.getMessage());
+        }
+      }
+    };
+    pingThread.start();
+    // await returns true if countDown happened
+    boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
+    pingThread.interrupt();
+    return down;
+  }
+
+  /**
+   * Get the znode of the peer cluster this source is replicating to
+   *
+   * @return peer cluster znode
+   */
+  public String getPeerClusterZnode() {
+    return this.peerClusterZnode;
+  }
+
+  /**
+   * Get the path of the current HLog
+   * @return current hlog's path
+   */
+  public Path getCurrentPath() {
+    return this.currentPath;
+  }
+
+  /**
+   * Comparator used to compare logs together based on their start time
+   */
+  public static class LogsComparator implements Comparator<Path> {
+
+    @Override
+    public int compare(Path o1, Path o2) {
+      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return true;
+    }
+
+    /**
+     * Split a path to get the start time
+     * For example: 10.20.20.171%3A60020.1277499063250
+     * @param p path to split
+     * @return start time
+     */
+    private long getTS(Path p) {
+      String[] parts = p.getName().split("\\.");
+      return Long.parseLong(parts[parts.length-1]);
+    }
+  }
+
+}
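
For reference, the tunables read in ReplicationSource.init() above can be overridden through the configuration. A minimal sketch, not part of this commit (the class name is made up; the values shown are the defaults that init() falls back to):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ReplicationTuningExample {
      public static Configuration tunedConf() {
        Configuration conf = HBaseConfiguration.create();
        // Values shown are the defaults read by ReplicationSource.init()
        conf.setLong("replication.source.size.capacity", 1024 * 1024 * 64); // max bytes per shipped batch
        conf.setInt("replication.source.nb.capacity", 25000);               // max entries per shipped batch
        conf.setFloat("replication.source.ratio", 0.1f);                    // fraction of peer RSs used as sinks
        conf.setLong("replication.source.sleepforretries", 1000);           // ms slept between retries
        conf.setLong("replication.source.maxretriesmultiplier", 10);        // max backoff multiplier
        return conf;
      }
    }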

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Interface that defines a replication source
+ */
+public interface ReplicationSourceInterface {
+
+  /**
+   * Initializer for the source
+   * @param conf the configuration to use
+   * @param fs the file system to use
+   * @param manager the manager to use
+   * @param stopper the stopper object for this region server
+   * @param replicating the status of the replication on this cluster
+   * @param peerClusterId the id of the peer cluster
+   * @throws IOException
+   */
+  public void init(final Configuration conf,
+                   final FileSystem fs,
+                   final ReplicationSourceManager manager,
+                   final AtomicBoolean stopper,
+                   final AtomicBoolean replicating,
+                   final String peerClusterId) throws IOException;
+
+  /**
+   * Add a log to the list of logs to replicate
+   * @param log path to the log to replicate
+   */
+  public void enqueueLog(Path log);
+
+  /**
+   * Get the current log that's replicated
+   * @return the current log
+   */
+  public Path getCurrentPath();
+
+  /**
+   * Start the replication
+   */
+  public void startup();
+
+  /**
+   * End the replication
+   */
+  public void terminate();
+
+  /**
+   * Get the znode of the peer cluster this source is replicating to
+   *
+   * @return peer cluster znode
+   */
+  public String getPeerClusterZnode();
+
+  /**
+   * Notify this source that a log was archived
+   * @param oldPath old path of the log
+   * @param newPath new path of the log (archive)
+   */
+  public void logArchived(Path oldPath, Path newPath);
+}
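
A minimal sketch of the lifecycle a caller of this interface is expected to follow, using the ReplicationSource implementation above. This is not part of the commit; the wrapper class, the peer id "1" and the hlog path are made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.regionserver.*;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class SourceLifecycleSketch {
      public static void drive(Configuration conf, FileSystem fs,
          ReplicationSourceManager manager, AtomicBoolean stopper,
          AtomicBoolean replicating) throws IOException {
        ReplicationSourceInterface src = new ReplicationSource();
        src.init(conf, fs, manager, stopper, replicating, "1");             // "1": hypothetical peer id
        src.enqueueLog(new Path("/hbase/.logs/rs1%3A60020.1277499063250")); // hypothetical hlog path
        src.startup();    // starts the thread that reads and ships edits
        // later: enqueueLog() on every log roll, logArchived() when a log moves
        src.terminate();  // stop replication and wait for the thread to exit
      }
    }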

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for managing all the replication
+ * sources. There are two classes of sources:
+ * <li> Normal sources are persistent and one per peer cluster</li>
+ * <li> Old sources are recovered from a failed region server and their
+ * only goal is to finish replicating the HLog queue that server had up in ZK</li>
+ *
+ * When a region server dies, this class uses a watcher to get notified and it
+ * tries to grab a lock in order to transfer all the queues into local
+ * old sources.
+ */
+public class ReplicationSourceManager implements LogActionsListener {
+
+  private static final Log LOG =
+      LogFactory.getLog(ReplicationSourceManager.class);
+  // List of all the sources that read this RS's logs
+  private final List<ReplicationSourceInterface> sources;
+  // List of all the sources we got from died RSs
+  private final List<ReplicationSourceInterface> oldsources;
+  // Indicates if we are currently replicating
+  private final AtomicBoolean replicating;
+  // Helper for zookeeper
+  private final ReplicationZookeeperWrapper zkHelper;
+  // Indicates if the region server is closing
+  private final AtomicBoolean stopper;
+  // All logs we are currently tracking
+  private final SortedSet<String> hlogs;
+  private final Configuration conf;
+  private final FileSystem fs;
+  // The path to the latest log we saw, for new coming sources
+  private Path latestPath;
+  // List of all the other region servers in this cluster
+  private final List<String> otherRegionServers;
+  // Path to the hlog archive
+  private final Path oldLogDir;
+
+  /**
+   * Creates a replication manager and sets the watch on all the other
+   * registered region servers
+   * @param zkHelper the zk helper for replication
+   * @param conf the configuration to use
+   * @param stopper the stopper object for this region server
+   * @param fs the file system to use
+   * @param replicating the status of the replication on this cluster
+   * @param oldLogDir the directory where old logs are archived
+   */
+  public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+                                  final Configuration conf,
+                                  final AtomicBoolean stopper,
+                                  final FileSystem fs,
+                                  final AtomicBoolean replicating,
+                                  final Path oldLogDir) {
+    this.sources = new ArrayList<ReplicationSourceInterface>();
+    this.replicating = replicating;
+    this.zkHelper = zkHelper;
+    this.stopper = stopper;
+    this.hlogs = new TreeSet<String>();
+    this.oldsources = new ArrayList<ReplicationSourceInterface>();
+    this.conf = conf;
+    this.fs = fs;
+    this.oldLogDir = oldLogDir;
+    List<String> otherRSs =
+        this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+    this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+  }
+
+  /**
+   * Provide the id of the peer and a log key and this method will figure which
+   * hlog it belongs to and will log, for this region server, the current
+   * position. It will also clean old logs from the queue.
+   * @param log Path to the log currently being replicated
+   * @param id id of the peer cluster
+   * @param position current location in the log
+   * @param queueRecovered indicates if this queue comes from another region server
+   */
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+    String key = log.getName();
+    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
+    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
+    synchronized (this.hlogs) {
+      if (!queueRecovered && !this.hlogs.first().equals(key)) {
+        SortedSet<String> hlogSet = this.hlogs.headSet(key);
+        LOG.info("Removing " + hlogSet.size() +
+            " logs in the list: " + hlogSet);
+        for (String hlog : hlogSet) {
+          this.zkHelper.removeLogFromList(hlog.toString(), id);
+        }
+        hlogSet.clear();
+      }
+    }
+  }
+
+  /**
+   * Adds a normal source per registered peer cluster and tries to process all
+   * old region server hlog queues
+   */
+  public void init() throws IOException {
+    for (String id : this.zkHelper.getPeerClusters().keySet()) {
+      ReplicationSourceInterface src = addSource(id);
+      src.startup();
+    }
+    List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+    synchronized (otherRegionServers) {
+      LOG.info("Current list of replicators: " + currentReplicators
+          + " other RSs: " + otherRegionServers);
+    }
+    // Look if there's anything to process after a restart
+    for (String rs : currentReplicators) {
+      synchronized (otherRegionServers) {
+        if (!this.otherRegionServers.contains(rs)) {
+          transferQueues(rs);
+        }
+      }
+    }
+  }
+
+  /**
+   * Add a new normal source to this region server
+   * @param id the id of the peer cluster
+   * @return the created source
+   * @throws IOException
+   */
+  public ReplicationSourceInterface addSource(String id) throws IOException {
+    ReplicationSourceInterface src =
+        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+    this.sources.add(src);
+    synchronized (this.hlogs) {
+      if (this.hlogs.size() > 0) {
+        this.zkHelper.addLogToList(this.hlogs.first(),
+            this.sources.get(0).getPeerClusterZnode());
+        src.enqueueLog(this.latestPath);
+      }
+    }
+    return src;
+  }
+
+  /**
+   * Terminate the replication on this region server
+   */
+  public void join() {
+    if (this.sources.size() == 0) {
+      this.zkHelper.deleteOwnRSZNode();
+    }
+    for (ReplicationSourceInterface source : this.sources) {
+      source.terminate();
+    }
+  }
+
+  /**
+   * Get a copy of the hlogs of the first source on this rs
+   * @return a sorted set of hlog names
+   */
+  protected SortedSet<String> getHLogs() {
+    return new TreeSet<String>(this.hlogs);
+  }
+
+  /**
+   * Get a list of all the normal sources of this rs
+   * @return list of all normal sources
+   */
+  public List<ReplicationSourceInterface> getSources() {
+    return this.sources;
+  }
+
+  @Override
+  public void logRolled(Path newLog) {
+    if (this.sources.size() > 0) {
+      this.zkHelper.addLogToList(newLog.getName(),
+          this.sources.get(0).getPeerClusterZnode());
+    }
+    synchronized (this.hlogs) {
+      this.hlogs.add(newLog.getName());
+    }
+    this.latestPath = newLog;
+    for (ReplicationSourceInterface source : this.sources) {
+      source.enqueueLog(newLog);
+    }
+  }
+
+  @Override
+  public void logArchived(Path oldPath, Path newPath) {
+    for (ReplicationSourceInterface source : this.sources) {
+      source.logArchived(oldPath, newPath);
+    }
+  }
+
+  /**
+   * Get the ZK helper of this manager
+   * @return the helper
+   */
+  public ReplicationZookeeperWrapper getRepZkWrapper() {
+    return zkHelper;
+  }
+
+  /**
+   * Factory method to create a replication source
+   * @param conf the configuration to use
+   * @param fs the file system to use
+   * @param manager the manager to use
+   * @param stopper the stopper object for this region server
+   * @param replicating the status of the replication on this cluster
+   * @param peerClusterId the id of the peer cluster
+   * @return the created source
+   * @throws IOException
+   */
+  public ReplicationSourceInterface getReplicationSource(
+      final Configuration conf,
+      final FileSystem fs,
+      final ReplicationSourceManager manager,
+      final AtomicBoolean stopper,
+      final AtomicBoolean replicating,
+      final String peerClusterId) throws IOException {
+    ReplicationSourceInterface src;
+    try {
+      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
+          ReplicationSource.class.getCanonicalName()));
+      src = (ReplicationSourceInterface) c.newInstance();
+    } catch (Exception e) {
+      LOG.warn("Passed replication source implemention throws errors, " +
+          "defaulting to ReplicationSource", e);
+      src = new ReplicationSource();
+
+    }
+    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+    return src;
+  }
+
+  /**
+   * Transfer all the queues of the specified region server to this one.
+   * First it tries to grab a lock and if it works it will move the
+   * znodes and finally will delete the old znodes.
+   *
+   * It creates one old source for each peer queue of the old rs.
+   * @param rsZnode znode of the dead region server
+   */
+  public void transferQueues(String rsZnode) {
+    // We try to lock that rs' queue directory
+    if (this.stopper.get()) {
+      LOG.info("Not transferring queue since we are shutting down");
+      return;
+    }
+    if (!this.zkHelper.lockOtherRS(rsZnode)) {
+      return;
+    }
+    LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+    SortedMap<String, SortedSet<String>> newQueues =
+        this.zkHelper.copyQueuesFromRS(rsZnode);
+    if (newQueues == null || newQueues.size() == 0) {
+      return;
+    }
+    this.zkHelper.deleteRsQueues(rsZnode);
+
+    for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+      String peerId = entry.getKey();
+      try {
+        ReplicationSourceInterface src = getReplicationSource(this.conf,
+            this.fs, this, this.stopper, this.replicating, peerId);
+        this.oldsources.add(src);
+        for (String hlog : entry.getValue()) {
+          src.enqueueLog(new Path(this.oldLogDir, hlog));
+        }
+        src.startup();
+      } catch (IOException e) {
+        // TODO manage it
+        LOG.error("Failed creating a source", e);
+      }
+    }
+  }
+
+  /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeRecoveredQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+    this.oldsources.remove(src);
+    this.zkHelper.deleteSource(src.getPeerClusterZnode());
+  }
+
+  /**
+   * Watcher used to be notified of the other region server's death
+   * in the local cluster. It initiates the process to transfer the queues
+   * if it is able to grab the lock.
+   */
+  public class OtherRegionServerWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      LOG.info(" event " + watchedEvent);
+      if (watchedEvent.getState().equals(Event.KeeperState.Expired) ||
+          watchedEvent.getState().equals(Event.KeeperState.Disconnected)) {
+        return;
+      }
+
+      List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
+      if (newRsList == null) {
+        return;
+      } else {
+        synchronized (otherRegionServers) {
+          otherRegionServers.clear();
+          otherRegionServers.addAll(newRsList);
+        }
+      }
+      if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+        LOG.info(watchedEvent.getPath() + " znode deleted, trying to lock it");
+        String[] rsZnodeParts = watchedEvent.getPath().split("/");
+        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+      }
+    }
+  }
+
+}
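
getReplicationSource() above picks its implementation class from "replication.replicationsource.implementation" and falls back to ReplicationSource if instantiation fails. A minimal sketch of plugging in an alternate source (here the test-only ReplicationSourceDummy added later in this commit); the wrapper class name is made up and this snippet is not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;

    public class CustomSourcePluginSketch {
      public static Configuration withDummySource() {
        Configuration conf = HBaseConfiguration.create();
        // getReplicationSource() reflectively instantiates this class and
        // defaults to ReplicationSource if the class cannot be created.
        conf.set("replication.replicationsource.implementation",
            ReplicationSourceDummy.class.getCanonicalName());
        return conf;
      }
    }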

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Thu Jul  1 00:25:50 2010
@@ -467,6 +467,7 @@ public class ZooKeeperWrapper implements
 
   private HServerAddress readAddress(String znode, Watcher watcher) {
     try {
+      LOG.debug("<" + instanceName + ">" + "Trying to read " + znode);
       return readAddressOrThrow(znode, watcher);
     } catch (IOException e) {
       LOG.debug("<" + instanceName + ">" + "Failed to read " + e.getMessage());
@@ -572,7 +573,7 @@ public class ZooKeeperWrapper implements
     if (recursive) {
       LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode);
       List<String> znodes = this.zooKeeper.getChildren(znode, false);
-      if (znodes.size() > 0) {
+      if (znodes != null && znodes.size() > 0) {
         for (String child : znodes) {
           String childFullPath = getZNode(znode, child);
           LOG.info("<" + instanceName + ">" + "deleteZNode recursive call " + childFullPath);
@@ -914,10 +915,10 @@ public class ZooKeeperWrapper implements
     if (failOnWrite || stat == null) {
       this.zooKeeper.create(path, data,
           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      LOG.debug("<" + instanceName + ">" + "Created " + path);
+      LOG.debug("<" + instanceName + ">" + "Created " + path + " with data " + strData);
     } else {
       this.zooKeeper.setData(path, data, -1);
-      LOG.debug("<" + instanceName + ">" + "Updated " + path);
+      LOG.debug("<" + instanceName + ">" + "Updated " + path + " with data " + strData);
     }
   }
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Jul  1 00:25:50 2010
@@ -466,6 +466,7 @@ public class HBaseTestingUtility {
    * @throws IOException
    */
   public int loadTable(final HTable t, final byte[] f) throws IOException {
+    t.setAutoFlush(false);
     byte[] k = new byte[3];
     int rowCount = 0;
     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
@@ -481,6 +482,7 @@ public class HBaseTestingUtility {
         }
       }
     }
+    t.flushCommits();
     return rowCount;
   }
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu Jul  1 00:25:50 2010
@@ -525,4 +525,47 @@ public class TestHLog extends HBaseTestC
       }
     }
   }
+
+  /**
+   * Test that we can visit entries before they are appended
+   * @throws Exception
+   */
+  public void testVisitors() throws Exception {
+    final int COL_COUNT = 10;
+    final byte [] tableName = Bytes.toBytes("tablename");
+    final byte [] row = Bytes.toBytes("row");
+    this.conf.setBoolean("dfs.support.append", true);
+    HLog log = new HLog(this.fs, dir, this.oldLogDir, this.conf, null);
+    DumbLogEntriesVisitor visitor = new DumbLogEntriesVisitor();
+    log.addLogEntryVisitor(visitor);
+    long timestamp = System.currentTimeMillis();
+    HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    for (int i = 0; i < COL_COUNT; i++) {
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, Bytes.toBytes("column"),
+          Bytes.toBytes(Integer.toString(i)),
+          timestamp, new byte[]{(byte) (i + '0')}));
+      log.append(hri, tableName, cols, System.currentTimeMillis());
+    }
+    assertEquals(COL_COUNT, visitor.increments);
+    log.removeLogEntryVisitor(visitor);
+    WALEdit cols = new WALEdit();
+    cols.add(new KeyValue(row, Bytes.toBytes("column"),
+        Bytes.toBytes(Integer.toString(11)),
+        timestamp, new byte[]{(byte) (11 + '0')}));
+    log.append(hri, tableName, cols, System.currentTimeMillis());
+    assertEquals(COL_COUNT, visitor.increments);
+  }
+
+  static class DumbLogEntriesVisitor implements LogEntryVisitor {
+
+    int increments = 0;
+
+    @Override
+    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+                                         WALEdit logEdit) {
+      increments++;
+    }
+  }
 }

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Source that does nothing at all, helpful to test ReplicationSourceManager
+ */
+public class ReplicationSourceDummy implements ReplicationSourceInterface {
+
+  ReplicationSourceManager manager;
+  String peerClusterId;
+  Path currentPath;
+
+  @Override
+  public void init(Configuration conf, FileSystem fs,
+                   ReplicationSourceManager manager, AtomicBoolean stopper,
+                   AtomicBoolean replicating, String peerClusterId)
+      throws IOException {
+    this.manager = manager;
+    this.peerClusterId = peerClusterId;
+  }
+
+  @Override
+  public void enqueueLog(Path log) {
+    this.currentPath = log;
+  }
+
+  @Override
+  public Path getCurrentPath() {
+    return this.currentPath;
+  }
+
+  @Override
+  public void startup() {
+
+  }
+
+  @Override
+  public void terminate() {
+
+  }
+
+  @Override
+  public String getPeerClusterZnode() {
+    return peerClusterId;
+  }
+
+  @Override
+  public void logArchived(Path oldPath, Path newPath) {
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,478 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.EmptyWatcher;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestReplication {
+
+  private static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+
+  private static ZooKeeperWrapper zkw1;
+  private static ZooKeeperWrapper zkw2;
+
+  private static HTable htable1;
+  private static HTable htable2;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static final int NB_ROWS_IN_BATCH = 100;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
+    conf1.setInt("replication.source.nb.capacity", 5);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setLong("hbase.client.retries.number", 4);
+    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf1.setBoolean("dfs.support.append", true);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
+    zkw1.writeZNode("/1", "replication", "");
+    zkw1.writeZNode("/1/replication", "master",
+        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+    setIsReplication(true);
+
+    LOG.info("Setup first Zk");
+
+    conf2 = HBaseConfiguration.create();
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    conf2.setInt("hbase.client.retries.number", 6);
+    conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf2.setBoolean("dfs.support.append", true);
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+    zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
+    zkw2.writeZNode("/2", "replication", "");
+    zkw2.writeZNode("/2/replication", "master",
+        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
+    zkw1.writeZNode("/1/replication/peers", "1",
+        conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+    LOG.info("Setup second Zk");
+
+    utility1.startMiniCluster(2);
+    utility2.startMiniCluster(2);
+
+    HTableDescriptor table = new HTableDescriptor(tableName);
+    table.setDeferredLogFlush(false);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    table.addFamily(fam);
+    HBaseAdmin admin1 = new HBaseAdmin(conf1);
+    HBaseAdmin admin2 = new HBaseAdmin(conf2);
+    admin1.createTable(table);
+    admin2.createTable(table);
+
+    htable1 = new HTable(conf1, tableName);
+    htable1.setWriteBufferSize(1024*5);
+    htable2 = new HTable(conf2, tableName);
+  }
+
+  private static void setIsReplication(boolean rep) throws Exception {
+    LOG.info("Set rep " + rep);
+    zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
+    // Takes some ms for ZK to fire the watcher
+    Thread.sleep(SLEEP_TIME);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    setIsReplication(false);
+    utility1.truncateTable(tableName);
+    utility2.truncateTable(tableName);
+    // If test is flaky, set that sleep higher
+    Thread.sleep(SLEEP_TIME*8);
+    setIsReplication(true);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  /**
+   * Add a row, check it's replicated, delete it, check it's gone
+   * @throws Exception
+   */
+  @Test
+  public void testSimplePutDelete() throws Exception {
+    LOG.info("testSimplePutDelete");
+    Put put = new Put(row);
+    put.add(famName, row, row);
+
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+
+    HTable table2 = new HTable(conf2, tableName);
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = table2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+
+    Delete del = new Delete(row);
+    htable1.delete(del);
+
+    table2 = new HTable(conf2, tableName);
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = table2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Row not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Try a small batch upload using the write buffer, check it's replicated
+   * @throws Exception
+   */
+  @Test
+  public void testSmallBatch() throws Exception {
+    LOG.info("testSmallBatch");
+    Put put;
+    // normal Batch tests
+    htable1.setAutoFlush(false);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      put = new Put(Bytes.toBytes(i));
+      put.add(famName, row, row);
+      htable1.put(put);
+    }
+    htable1.flushCommits();
+
+    Scan scan = new Scan();
+
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    scanner1.close();
+    assertEquals(NB_ROWS_IN_BATCH, res1.length);
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      scanner.close();
+      if (res.length != NB_ROWS_IN_BATCH) {
+        LOG.info("Only got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+
+    htable1.setAutoFlush(true);
+
+  }
+
+  /**
+   * Test stopping replication: insert a row while replication is disabled
+   * and make sure nothing is replicated, then re-enable it and check that
+   * the row replicates as expected
+   * @throws Exception
+   */
+  @Test
+  public void testStartStop() throws Exception {
+
+    // Test stopping replication
+    setIsReplication(false);
+
+    Put put = new Put(Bytes.toBytes("stop start"));
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(Bytes.toBytes("stop start"));
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if(res.size() >= 1) {
+        fail("Replication wasn't stopped");
+
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test restart replication
+    setIsReplication(true);
+
+    htable1.put(put);
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if(res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+
+    put = new Put(Bytes.toBytes("do not rep"));
+    put.add(noRepfamName, row, row);
+    htable1.put(put);
+
+    get = new Get(Bytes.toBytes("do not rep"));
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES-1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+  }
+
+  /**
+   * Do a more intense version of testSmallBatch, one that will trigger
+   * HLog rolling and other non-trivial code paths
+   * @throws Exception
+   */
+  @Test
+  public void loadTesting() throws Exception {
+    htable1.setWriteBufferSize(1024);
+    htable1.setAutoFlush(false);
+    for (int i = 0; i < NB_ROWS_IN_BATCH *10; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(famName, row, row);
+      htable1.put(put);
+    }
+    htable1.flushCommits();
+
+    Scan scan = new Scan();
+
+    ResultScanner scanner = htable1.getScanner(scan);
+    Result[] res = scanner.next(NB_ROWS_IN_BATCH * 100);
+    scanner.close();
+
+    assertEquals(NB_ROWS_IN_BATCH *10, res.length);
+
+    scan = new Scan();
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+
+      scanner = htable2.getScanner(scan);
+      res = scanner.next(NB_ROWS_IN_BATCH * 100);
+      scanner.close();
+      if (res.length != NB_ROWS_IN_BATCH *10) {
+        if (i == NB_RETRIES-1) {
+          int lastRow = -1;
+          for (Result result : res) {
+            int currentRow = Bytes.toInt(result.getRow());
+            for (int row = lastRow+1; row < currentRow; row++) {
+              LOG.error("Row missing: " + row);
+            }
+            lastRow = currentRow;
+          }
+          LOG.error("Last row: " + lastRow);
+          fail("Waited too much time for normal batch replication, "
+              + res.length + " instead of " + NB_ROWS_IN_BATCH *10);
+        } else {
+          LOG.info("Only got " + res.length + " rows");
+          Thread.sleep(SLEEP_TIME);
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Load up a multi-region table over 2 region servers and kill a region
+   * server on each cluster during the upload. The failover happens internally.
+   * @throws Exception
+   */
+  @Test
+  public void queueFailover() throws Exception {
+    utility1.createMultiRegions(htable1, famName);
+
+    // killing the RS with .META. can result in failed puts until we solve
+    // IO fencing
+    int rsToKill1 =
+        utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
+    int rsToKill2 =
+        utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
+
+    // Takes about 20 secs to run the full loading, kill around the middle
+    Thread killer1 = killARegionServer(utility1, 7500, rsToKill1);
+    Thread killer2 = killARegionServer(utility2, 10000, rsToKill2);
+
+    LOG.info("Start loading table");
+    int initialCount = utility1.loadTable(htable1, famName);
+    LOG.info("Done loading table");
+    killer1.join(5000);
+    killer2.join(5000);
+    LOG.info("Done waiting for threads");
+
+    Result[] res;
+    while (true) {
+      try {
+        Scan scan = new Scan();
+        ResultScanner scanner = htable1.getScanner(scan);
+        res = scanner.next(initialCount);
+        scanner.close();
+        break;
+      } catch (UnknownScannerException ex) {
+        LOG.info("Cluster wasn't ready yet, restarting scanner");
+      }
+    }
+    // Check that we actually have all the rows; we may be missing some
+    // because we don't have IO fencing.
+    if (res.length != initialCount) {
+      LOG.warn("We lost some rows on the master cluster!");
+      // We don't really expect the other cluster to have more rows
+      initialCount = res.length;
+    }
+
+    Scan scan2 = new Scan();
+
+    int lastCount = 0;
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for queueFailover replication");
+      }
+      ResultScanner scanner2 = htable2.getScanner(scan2);
+      Result[] res2 = scanner2.next(initialCount * 2);
+      scanner2.close();
+      if (res2.length < initialCount) {
+        if (lastCount < res2.length) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res2.length;
+        LOG.info("Only got " + lastCount + " rows instead of " +
+            initialCount + " current i=" + i);
+        Thread.sleep(SLEEP_TIME*2);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private static Thread killARegionServer(final HBaseTestingUtility utility,
+                                   final long timeout, final int rs) {
+    Thread killer = new Thread() {
+      public void run() {
+        try {
+          Thread.sleep(timeout);
+          utility.expireRegionServerSession(rs);
+        } catch (Exception e) {
+          LOG.error(e);
+        }
+      }
+    };
+    killer.start();
+    return killer;
+  }
+}
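
Editor's note: the retry loops above all follow the same poll-until-replicated
pattern (read from the slave table, sleep SLEEP_TIME, fail after NB_RETRIES
attempts). Below is a minimal sketch of how that pattern could be factored into
a helper inside this test class; the helper name and its use of a Get are
illustrative only and are not part of this patch.

    // Hypothetical helper: poll the slave table until the row shows up,
    // failing the test if it never appears within NB_RETRIES attempts.
    private static void waitForRow(HTable slaveTable, byte[] rowKey,
        byte[] expectedValue) throws Exception {
      Get get = new Get(rowKey);
      for (int i = 0; i < NB_RETRIES; i++) {
        Result res = slaveTable.get(get);
        if (res.size() > 0) {
          assertArrayEquals(expectedValue, res.value());
          return;                      // replicated, we are done
        }
        Thread.sleep(SLEEP_TIME);      // not there yet, wait and retry
      }
      fail("Waited too much time for put replication");
    }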

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestReplicationSource {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationSource.class);
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private static FileSystem fs;
+  private static Path oldLogDir;
+  private static Path logDir;
+  private static Configuration conf = HBaseConfiguration.create();
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniDFSCluster(1);
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    oldLogDir = new Path(fs.getHomeDirectory(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(fs.getHomeDirectory(),
+        HConstants.HREGION_LOGDIR_NAME);
+  }
+
+  /**
+   * Sanity check that we can move logs around while we are reading
+   * from them. Should this test fail, ReplicationSource would have a hard
+   * time reading logs that are being archived.
+   * @throws Exception
+   */
+  @Test
+  public void testLogMoving() throws Exception{
+    Path logPath = new Path(logDir, "log");
+    HLog.Writer writer = HLog.createWriter(fs, logPath, conf);
+    for(int i = 0; i < 3; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b,b,b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      HLogKey key = new HLogKey(b, b, 0, 0);
+      writer.append(new HLog.Entry(key, edit));
+      writer.sync();
+    }
+    writer.close();
+
+    HLog.Reader reader = HLog.getReader(fs, logPath, conf);
+    HLog.Entry entry = reader.next();
+    assertNotNull(entry);
+
+    Path oldLogPath = new Path(oldLogDir, "log");
+    fs.rename(logPath, oldLogPath);
+
+    entry = reader.next();
+    assertNotNull(entry);
+
+    entry = reader.next();
+    entry = reader.next();
+
+    assertNull(entry);
+
+  }
+}
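
Editor's note: testLogMoving checks that an already-open reader keeps working
after the log is renamed into the archive directory. The sketch below shows
the complementary open-time fallback a source could use when the log has been
moved before the reader is created; it is an illustration only (including the
method name), not the actual ReplicationSource logic, and it assumes
java.io.FileNotFoundException/IOException are imported.

    // If the log is no longer in the active log directory, assume it was
    // archived and retry against the old-logs directory instead.
    private static HLog.Reader openLog(FileSystem fs, Configuration conf,
        Path logDir, Path oldLogDir, String logName) throws IOException {
      try {
        return HLog.getReader(fs, new Path(logDir, logName), conf);
      } catch (FileNotFoundException fnfe) {
        return HLog.getReader(fs, new Path(oldLogDir, logName), conf);
      }
    }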

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestReplicationSink {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationSink.class);
+
+  private static final int BATCH_SIZE = 10;
+
+  private static final long SLEEP_TIME = 500;
+
+  private final static Configuration conf = HBaseConfiguration.create();
+
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private static ReplicationSink SINK;
+
+  private static final byte[] TABLE_NAME1 =
+      Bytes.toBytes("table1");
+  private static final byte[] TABLE_NAME2 =
+      Bytes.toBytes("table2");
+
+  private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+  private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+
+  private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+  private static HTable table1;
+
+  private static HTable table2;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+    TEST_UTIL.getConfiguration().setBoolean(
+        HConstants.REPLICATION_ENABLE_KEY, true);
+    TEST_UTIL.startMiniCluster(3);
+    conf.setBoolean("dfs.support.append", true);
+    SINK = new ReplicationSink(conf,STOPPER);
+    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
+    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    STOPPER.set(true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
+    table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
+    Thread.sleep(SLEEP_TIME);
+  }
+
+  /**
+   * Insert a whole batch of entries
+   * @throws Exception
+   */
+  @Test
+  public void testBatchSink() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
+  }
+
+  /**
+   * Insert a mix of puts and deletes
+   * @throws Exception
+   */
+  @Test
+  public void testMixedPutDelete() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
+    for(int i = 0; i < BATCH_SIZE/2; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+
+    entries = new HLog.Entry[BATCH_SIZE];
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i,
+          i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
+    }
+
+    SINK.replicateEntries(entries);
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
+  }
+
+  /**
+   * Insert to 2 different tables
+   * @throws Exception
+   */
+  @Test
+  public void testMixedPutTables() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] =
+          createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
+              i, KeyValue.Type.Put);
+    }
+
+    SINK.replicateEntries(entries);
+    Scan scan = new Scan();
+    ResultScanner scanRes = table2.getScanner(scan);
+    for(Result res : scanRes) {
+      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+    }
+  }
+
+  /**
+   * Insert then do different types of deletes
+   * @throws Exception
+   */
+  @Test
+  public void testMixedDeletes() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[3];
+    for(int i = 0; i < 3; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+    entries = new HLog.Entry[3];
+
+    entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
+    entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+    entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
+
+    SINK.replicateEntries(entries);
+
+    Scan scan = new Scan();
+    ResultScanner scanRes = table1.getScanner(scan);
+    assertEquals(0, scanRes.next(3).length);
+  }
+
+  /**
+   * Puts are buffered in the sink, but deletes are not, so this tests the
+   * case where a delete is applied to the table before the Put that creates
+   * the cell it deletes.
+   * @throws Exception
+   */
+  @Test
+  public void testApplyDeleteBeforePut() throws Exception {
+    HLog.Entry[] entries = new HLog.Entry[5];
+    for(int i = 0; i < 2; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    entries[2] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+    for(int i = 3; i < 5; i++) {
+      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+    }
+    SINK.replicateEntries(entries);
+    Get get = new Get(Bytes.toBytes(1));
+    Result res = table1.get(get);
+    assertEquals(0, res.size());
+  }
+
+  private HLog.Entry createEntry(byte [] table, int row,  KeyValue.Type type) {
+    byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
+    byte[] rowBytes = Bytes.toBytes(row);
+    // Just make sure we don't get the same ts for two consecutive entries
+    // with the same key
+    try {
+      Thread.sleep(1);
+    } catch (InterruptedException e) {
+      LOG.info("Was interrupted while sleep, meh", e);
+    }
+    final long now = System.currentTimeMillis();
+    KeyValue kv = null;
+    if(type.getCode() == KeyValue.Type.Put.getCode()) {
+      kv = new KeyValue(rowBytes, fam, fam, now,
+          KeyValue.Type.Put, Bytes.toBytes(row));
+    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
+        kv = new KeyValue(rowBytes, fam, fam,
+            now, KeyValue.Type.DeleteColumn);
+    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
+        kv = new KeyValue(rowBytes, fam, null,
+            now, KeyValue.Type.DeleteFamily);
+    }
+
+    HLogKey key = new HLogKey(table, table, now, now);
+
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    return new HLog.Entry(key, edit);
+  }
+}
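
Editor's note: the tests above feed ReplicationSink.replicateEntries() arrays
of HLog.Entry objects built by createEntry(). As a rough illustration of what
the sink has to do with them, the sketch below replays one entry as client
operations. It is a simplification, not the actual ReplicationSink
implementation (in particular it ignores batching and buffering), and it
assumes the WALEdit/KeyValue accessors of this HBase version.

    // Replay a single WAL entry against the target table: puts recreate the
    // cell, delete markers are translated into the matching Delete call.
    private static void applyEntry(HTable table, HLog.Entry entry)
        throws IOException {
      for (KeyValue kv : entry.getEdit().getKeyValues()) {
        byte[] row = kv.getRow();
        if (kv.getType() == KeyValue.Type.Put.getCode()) {
          Put put = new Put(row);
          put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(),
              kv.getValue());
          table.put(put);
        } else if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
          Delete del = new Delete(row);
          del.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
          table.delete(del);
        } else if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
          Delete del = new Delete(row);
          del.deleteFamily(kv.getFamily(), kv.getTimestamp());
          table.delete(del);
        }
      }
    }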

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URLEncoder;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestReplicationSourceManager {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+  private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWrapper zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final byte[] test = Bytes.toBytes("test");
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+        ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = ZooKeeperWrapper.createInstance(conf, "test");
+    zkw.writeZNode("/hbase", "replication", "");
+    zkw.writeZNode("/hbase/replication", "master",
+        conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+    conf.get("hbase.zookeeper.property.clientPort")+":/1");
+    zkw.writeZNode("/hbase/replication/peers", "1",
+          conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+          conf.get("hbase.zookeeper.property.clientPort")+":/1");
+
+    HRegionServer server = new HRegionServer(conf);
+    ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
+        server.getZooKeeperWrapper(), conf,
+        REPLICATING, "123456789");
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getTestDir(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getTestDir(),
+        HConstants.HREGION_LOGDIR_NAME);
+
+    manager = new ReplicationSourceManager(helper,
+        conf, STOPPER, fs, REPLICATING, oldLogDir);
+    manager.addSource("1");
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor("f1");
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor("f2");
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd, r1, r2);
+
+
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    manager.join();
+    utility.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    setUp();
+  }
+
+  @Test
+  public void testLogRoll() throws Exception {
+    long seq = 0;
+    long baseline = 1000;
+    long time = baseline;
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+
+    manager.init();
+
+    // Testing normal log rolling every 20 edits
+    for(long i = 1; i < 101; i++) {
+      if(i > 1 && i % 20 == 0) {
+        hlog.rollWriter();
+      }
+      LOG.info(i);
+      HLogKey key = new HLogKey(hri.getRegionName(),
+        test, seq++, System.currentTimeMillis());
+      hlog.append(hri, key, edit);
+    }
+
+    // Simulate a rapid insert that's followed
+    // by a report that's still not totally complete (missing last one)
+    LOG.info(baseline + " and " + time);
+    baseline += 101;
+    time = baseline;
+    LOG.info(baseline + " and " + time);
+
+    for (int i = 0; i < 3; i++) {
+      HLogKey key = new HLogKey(hri.getRegionName(),
+        test, seq++, System.currentTimeMillis());
+      hlog.append(hri, key, edit);
+    }
+
+    assertEquals(6, manager.getHLogs().size());
+
+    hlog.rollWriter();
+
+    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
+        "1", 0, false);
+
+    HLogKey key = new HLogKey(hri.getRegionName(),
+          test, seq++, System.currentTimeMillis());
+    hlog.append(hri, key, edit);
+
+    assertEquals(1, manager.getHLogs().size());
+
+
+    // TODO Need a case with only 2 HLogs and we only want to delete the first one
+  }
+
+}
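
Editor's note: setUpBeforeClass() above builds the replication znodes by hand
with ZooKeeperWrapper.writeZNode(), using the "quorum:clientPort:zkParent"
address format for the master and peer entries. For reference, registering one
more slave cluster would simply mean writing another child under peers in the
same format; the peer id "2", the otherConf configuration, and the "/2" parent
below are placeholders, not part of this patch.

    // Hypothetical second peer, addressed the same way as peer "1" above.
    zkw.writeZNode("/hbase/replication/peers", "2",
        otherConf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
        otherConf.get("hbase.zookeeper.property.clientPort") + ":/2");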