Posted to commits@hbase.apache.org by jd...@apache.org on 2010/07/01 02:25:51 UTC

svn commit: r959479 [1/2] - in /hbase/trunk: ./ bin/replication/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/mai...

Author: jdcryans
Date: Thu Jul  1 00:25:50 2010
New Revision: 959479

URL: http://svn.apache.org/viewvc?rev=959479&view=rev
Log:
HBASE-2223  Handle 10min+ network partitions between clusters

Added:
    hbase/trunk/bin/replication/
    hbase/trunk/bin/replication/add_peer.rb
    hbase/trunk/bin/replication/copy_tables_desc.rb
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/pom.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul  1 00:25:50 2010
@@ -793,6 +793,7 @@ Release 0.21.0 - Unreleased
                (Jeff Hammerbacher via Ryan Rawson)
    HBASE-7     Provide a HBase checker and repair tool similar to fsck
                (dhruba borthakur via Stack)
+   HBASE-2223  Handle 10min+ network partitions between clusters
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Added: hbase/trunk/bin/replication/add_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/bin/replication/add_peer.rb?rev=959479&view=auto
==============================================================================
--- hbase/trunk/bin/replication/add_peer.rb (added)
+++ hbase/trunk/bin/replication/add_peer.rb Thu Jul  1 00:25:50 2010
@@ -0,0 +1,75 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Script to add a peer to a cluster
+# To see usage for this script, run:
+#
+#  ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "add_peer"
+
+# Print usage for this script
+def usage
+  puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+  exit!
+end
+
+if ARGV.size != 2
+  usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+
+c2 = HBaseConfiguration.create()
+parts2 = ARGV[1].split(":")
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+zkw1 = ZooKeeperWrapper.createInstance(c1, "ZK1")
+zkw1.writeZNode(parts1[2], "replication", "a")
+zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
+zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
+zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
+
+
+c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+zkw2 = ZooKeeperWrapper.createInstance(c2, "ZK2")
+zkw2.writeZNode(parts2[2], "replication", "a")
+zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);
+
+puts "Peer successfully added"

Added: hbase/trunk/bin/replication/copy_tables_desc.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/bin/replication/copy_tables_desc.rb?rev=959479&view=auto
==============================================================================
--- hbase/trunk/bin/replication/copy_tables_desc.rb (added)
+++ hbase/trunk/bin/replication/copy_tables_desc.rb Thu Jul  1 00:25:50 2010
@@ -0,0 +1,75 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Script to recreate all tables of one cluster on another
+# To see usage for this script, run:
+#
+#  ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "copy_tables_desc"
+
+# Print usage for this script
+def usage
+  puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+  exit!
+end
+
+if ARGV.size != 2
+  usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+
+parts2 = ARGV[1].split(":")
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+admin1 = HBaseAdmin.new(c1)
+
+c2 = HBaseConfiguration.create()
+c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+admin2 = HBaseAdmin.new(c2)
+
+for t in admin1.listTables()
+  admin2.createTable(t)
+end
+
+
+puts "All descriptions were copied"

Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Thu Jul  1 00:25:50 2010
@@ -444,7 +444,7 @@
     <compileSource>1.6</compileSource>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <hbase.version>0.21.0-SNAPSHOT</hbase.version>
-    <hadoop.version>0.20.3-append-r956776+1240</hadoop.version>
+    <hadoop.version>0.20.3-append-r956776+1240+tail</hadoop.version>
 
     <commons-cli.version>1.2</commons-cli.version>
     <commons-logging.version>1.1.1</commons-logging.version>

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Jul  1 00:25:50 2010
@@ -345,6 +345,9 @@ public final class HConstants {
    */
   public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
 
+  public static final String
+      REPLICATION_ENABLE_KEY = "hbase.replication";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
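
As a minimal sketch of how the new flag is meant to be consumed (this mirrors
the conf.getBoolean calls elsewhere in this commit; it is not itself part of
the patch):

  Configuration conf = HBaseConfiguration.create();
  // Defaults to false: replication stays off unless explicitly enabled
  boolean replicationEnabled =
      conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);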

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Jul  1 00:25:50 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
 import java.io.IOException;
 import java.util.List;
@@ -279,10 +280,22 @@ public interface HRegionInterface extend
    * @throws IOException e
    */
   public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-  
+
   /**
    * Bulk load an HFile into an open region
    */
   public void bulkLoadHFile(String hfilePath,
       byte[] regionName, byte[] familyName) throws IOException;
+
+  /**
+   * Replicates the given entries. The guarantee is that the given entries
+   * will be durable on the slave cluster if this method returns without
+   * any exception.
+   * hbase.replication has to be set to true for this to work.
+   *
+   * @param entries entries to replicate
+   * @throws IOException
+   */
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+
 }
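
A hedged sketch of the caller side of this contract; `rs` (an HRegionInterface
proxy to a slave region server), `logKey` and `logEdit` are assumed to be in
hand and are not part of this patch:

  HLog.Entry[] entries = new HLog.Entry[] {
      new HLog.Entry(logKey, logEdit) };
  // Blocks until the edits are durable on the slave or throws IOException
  rs.replicateLogEntries(entries);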

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Jul  1 00:25:50 2010
@@ -1023,6 +1023,7 @@ public class HMaster extends Thread impl
           byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
           pair = getTableRegionForRow(tableName, rowKey);
         }
+        LOG.info("About to " + op.toString() + " on " + Bytes.toString(tableName) + " and pair is " + pair);
         if (pair != null && pair.getSecond() != null) {
           this.regionManager.startAction(pair.getFirst().getRegionName(),
             pair.getFirst(), pair.getSecond(), op);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java Thu Jul  1 00:25:50 2010
@@ -27,7 +27,9 @@ import org.apache.hadoop.fs.Path;
  * Interface for the log cleaning function inside the master. Only 1 is called
  * so if the desired effect is the mix of many cleaners, do call them yourself
  * in order to control the flow.
- * HBase ships with OldLogsCleaner as the default implementation
+ * HBase ships with OldLogsCleaner as the default implementation.
+ * This interface extends Configurable, so setConf needs to be called once
+ * before using the cleaner.
  */
 public interface LogCleanerDelegate extends Configurable {
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java Thu Jul  1 00:25:50 2010
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
@@ -62,6 +63,13 @@ public class OldLogsCleaner extends Chor
                         Configuration conf, FileSystem fs,
                         Path oldLogDir) {
     super("OldLogsCleaner", p, s);
+    // Use the log cleaner provided by replication if enabled, unless something
+    // was already provided
+    if (conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false) &&
+        conf.get("hbase.master.logcleanerplugin.impl") == null) {
+      conf.set("hbase.master.logcleanerplugin.impl",
+          "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");
+    }
     this.maxDeletedLogs =
         conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
     this.fs = fs;
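
Because the replication cleaner is only installed when no plugin was already
configured, an operator can still supply their own. A hypothetical explicit
setting, shown in Java (the same key can go in hbase-site.xml):

  conf.set("hbase.master.logcleanerplugin.impl",
      "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");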

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Jul  1 00:25:50 2010
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
@@ -236,6 +237,10 @@ public class HRegionServer implements HR
 
   private final String machineName;
 
+  // Replication-related attributes
+  private Replication replicationHandler;
+  // End of replication
+
   /**
    * Starts a HRegionServer at the default location
    * @param conf
@@ -913,15 +918,18 @@ public class HRegionServer implements HR
         "running at " + this.serverInfo.getServerName() +
         " because logdir " + logdir.toString() + " exists");
     }
-    HLog newlog = instantiateHLog(logdir, oldLogDir);
-    return newlog;
+    this.replicationHandler = new Replication(this.conf, this.serverInfo,
+        this.fs, oldLogDir, stopRequested);
+    HLog log = instantiateHLog(logdir, oldLogDir);
+    this.replicationHandler.addLogEntryVisitor(log);
+    return log;
   }
 
   // instantiate
   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
-    HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
-        serverInfo.getServerAddress().toString());
-    return newlog;
+    return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
+      this.replicationHandler.getReplicationManager(),
+        this.serverInfo.getServerAddress().toString());
   }
 
 
@@ -1046,12 +1054,14 @@ public class HRegionServer implements HR
           port++;
           // update HRS server info port.
           this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
-            this.serverInfo.getStartCode(),  port,
+            this.serverInfo.getStartCode(), port,
             this.serverInfo.getHostname());
         }
       }
     }
 
+    this.replicationHandler.startReplicationServices();
+
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
     this.server.start();
@@ -1140,7 +1150,7 @@ public class HRegionServer implements HR
     this.abortRequested = true;
     this.reservedSpace.clear();
     if (this.metrics != null) {
-      LOG.info("Dump of metrics: " + this.metrics.toString());
+      LOG.info("Dump of metrics: " + this.metrics);
     }
     stop();
   }
@@ -1172,6 +1182,7 @@ public class HRegionServer implements HR
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
+    this.replicationHandler.join();
   }
 
   private boolean getMaster() {
@@ -2444,6 +2455,11 @@ public class HRegionServer implements HR
     }
   }
 
+  @Override
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+    this.replicationHandler.replicateLogEntries(entries);
+  }
+
   /**
    * Do class main.
    * @param args

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Jul  1 00:25:50 2010
@@ -36,11 +36,13 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -117,6 +119,7 @@ import com.google.common.util.concurrent
  */
 public class HLog implements Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
+  private static final String HLOG_DATFILE = "hlog.dat.";
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
   private final FileSystem fs;
@@ -219,6 +222,9 @@ public class HLog implements Syncable {
    */
   private final LogSyncer logSyncerThread;
 
+  private final List<LogEntryVisitor> logEntryVisitors =
+      new CopyOnWriteArrayList<LogEntryVisitor>();
+
   /**
    * Pattern used to validate a HLog file name
    */
@@ -1028,6 +1034,11 @@ public class HLog implements Syncable {
     if (!this.enabled) {
       return;
     }
+    if (!this.logEntryVisitors.isEmpty()) {
+      for (LogEntryVisitor visitor : this.logEntryVisitors) {
+        visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
+      }
+    }
     try {
       long now = System.currentTimeMillis();
       this.writer.append(new HLog.Entry(logKey, logEdit));
@@ -1179,8 +1190,16 @@ public class HLog implements Syncable {
       srcDir.toString());
     splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
     try {
-      LOG.info("Spliting is done. Removing old log dir "+srcDir);
-      fs.delete(srcDir, false);
+      FileStatus[] files = fs.listStatus(srcDir);
+      for(FileStatus file : files) {
+        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+        LOG.info("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
+                   FSUtils.getPath(newPath));
+        fs.rename(file.getPath(), newPath);
+      }
+      LOG.debug("Moved " + files.length + " log files to " +
+        FSUtils.getPath(oldLogDir));
+      fs.delete(srcDir, true);
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
       IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1632,11 +1651,21 @@ public class HLog implements Syncable {
     return new Path(regionDir, RECOVERED_EDITS);
    }
 
+  /**
+   * Add a visitor that will be called on each log entry before it is written.
+   * @param visitor visitor to register
+   */
+  public void addLogEntryVisitor(LogEntryVisitor visitor) {
+    this.logEntryVisitors.add(visitor);
+  }
 
-
-
-
-
+  /**
+   * Remove a previously registered visitor.
+   * @param visitor visitor to unregister
+   */
+  public void removeLogEntryVisitor(LogEntryVisitor visitor) {
+    this.logEntryVisitors.remove(visitor);
+  }
 
 
   public void addLogActionsListerner(LogActionsListener list) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Thu Jul  1 00:25:50 2010
@@ -46,7 +46,6 @@ public class HLogKey implements Writable
   private long writeTime;
 
   private byte clusterId;
-  private int scope;
 
   /** Writable Consructor -- Do not use. */
   public HLogKey() {
@@ -70,7 +69,6 @@ public class HLogKey implements Writable
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
-    this.scope = HConstants.REPLICATION_SCOPE_LOCAL;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -119,22 +117,6 @@ public class HLogKey implements Writable
     this.clusterId = clusterId;
   }
 
-  /**
-   * Get the replication scope of this key
-   * @return replication scope
-   */
-  public int getScope() {
-    return this.scope;
-  }
-
-  /**
-   * Set the replication scope of this key
-   * @param scope The new scope
-   */
-  public void setScope(int scope) {
-    this.scope = scope;
-  }
-
   @Override
   public String toString() {
     return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -158,7 +140,6 @@ public class HLogKey implements Writable
     result ^= this.logSeqNum;
     result ^= this.writeTime;
     result ^= this.clusterId;
-    result ^= this.scope;
     return result;
   }
 
@@ -187,7 +168,6 @@ public class HLogKey implements Writable
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
     out.writeByte(this.clusterId);
-    out.writeInt(this.scope);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -197,7 +177,6 @@ public class HLogKey implements Writable
     this.writeTime = in.readLong();
     try {
       this.clusterId = in.readByte();
-      this.scope = in.readInt();
     } catch(EOFException e) {
       // Means it's an old key, just continue
     }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,15 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+public interface LogEntryVisitor {
+
+  /**
+   * Called on every log entry just before it is appended to the log.
+   * @param info region the edit belongs to
+   * @param logKey key of the entry
+   * @param logEdit the edit itself
+   */
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+                                       WALEdit logEdit);
+}
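
For illustration, a minimal visitor (hypothetical, not part of this patch)
that counts entries before they are written; it would be registered with
HLog.addLogEntryVisitor and unregistered with removeLogEntryVisitor:

  import java.util.concurrent.atomic.AtomicLong;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
  import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

  public class EditCountingVisitor implements LogEntryVisitor {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
                                         WALEdit logEdit) {
      // Called once per entry, before the append, while the log is enabled
      count.incrementAndGet();
    }

    public long getCount() {
      return count.get();
    }
  }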

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,493 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ *  master     {contains a full cluster address}
+ *  state      {contains true or false}
+ *  clusterId  {contains a byte}
+ *  peers/
+ *    1/   {contains a full cluster address}
+ *    2/
+ *    ...
+ *  rs/ {lists all RS that replicate}
+ *    startcode1/ {lists all peer clusters}
+ *      1/ {lists hlogs to process}
+ *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ *        10.10.1.76%3A53488.123456790
+ *        ...
+ *      2/
+ *      ...
+ *    startcode2/
+ *    ...
+ * </pre>
+ */
+public class ReplicationZookeeperWrapper {
+
+  private static final Log LOG =
+      LogFactory.getLog(ReplicationZookeeperWrapper.class);
+  // Name of the znode we use to lock during failover
+  private final static String RS_LOCK_ZNODE = "lock";
+  // Our handle on zookeeper
+  private final ZooKeeperWrapper zookeeperWrapper;
+  // Map of addresses of peer clusters with their ZKW
+  private final Map<String, ZooKeeperWrapper> peerClusters;
+  // Path to the root replication znode
+  private final String replicationZNode;
+  // Path to the peer clusters znode
+  private final String peersZNode;
+  // Path to the znode that contains all RS that replicate
+  private final String rsZNode;
+  // Path to this region server's name under rsZNode
+  private final String rsServerNameZnode;
+  // Name of the replicationState znode
+  private final String replicationStateNodeName;
+  // If this RS is part of a master cluster
+  private final boolean replicationMaster;
+  private final Configuration conf;
+  // Is this cluster replicating at the moment?
+  private final AtomicBoolean replicating;
+  // Byte (stored as string here) that identifies this cluster
+  private final String clusterId;
+
+  /**
+   * Constructor used by region servers, connects to the peer cluster right away.
+   *
+   * @param zookeeperWrapper zkw to wrap
+   * @param conf             conf to use
+   * @param replicating    atomic boolean to start/stop replication
+   * @param rsName      the name of this region server, null if this
+   *                         wrapper is only used for its helper methods
+   * @throws IOException
+   */
+  public ReplicationZookeeperWrapper(
+      ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+      final AtomicBoolean replicating, String rsName) throws IOException {
+    this.zookeeperWrapper = zookeeperWrapper;
+    this.conf = conf;
+    String replicationZNodeName =
+        conf.get("zookeeper.znode.replication", "replication");
+    String peersZNodeName =
+        conf.get("zookeeper.znode.replication.peers", "peers");
+    String repMasterZNodeName =
+        conf.get("zookeeper.znode.replication.master", "master");
+    this.replicationStateNodeName =
+        conf.get("zookeeper.znode.replication.state", "state");
+    String clusterIdZNodeName =
+        conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+    String rsZNodeName =
+        conf.get("zookeeper.znode.replication.rs", "rs");
+    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+          this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+          this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+    this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
+    this.replicationZNode = zookeeperWrapper.getZNode(
+        zookeeperWrapper.getParentZNode(), replicationZNodeName);
+    this.peersZNode =
+        zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+    this.rsZNode =
+        zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
+
+    this.replicating = replicating;
+    setReplicating();
+    String idResult = Bytes.toString(
+        this.zookeeperWrapper.getData(this.replicationZNode,
+        clusterIdZNodeName));
+    this.clusterId =
+        idResult == null ?
+            Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
+    String address = Bytes.toString(
+        this.zookeeperWrapper.getData(this.replicationZNode,
+          repMasterZNodeName));
+    this.replicationMaster = thisCluster.equals(address);
+    LOG.info("This cluster (" + thisCluster + ") is a "
+          + (this.replicationMaster ? "master" : "slave") + " for replication" +
+          ", compared with (" + address + ")");
+    if (rsName != null) {
+      this.rsServerNameZnode =
+          this.zookeeperWrapper.getZNode(rsZNode, rsName);
+      List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
+          new ReplicationStatusWatcher());
+      if (znodes != null) {
+        for (String znode : znodes) {
+          connectToPeer(znode);
+        }
+      }
+    } else {
+      this.rsServerNameZnode = null;
+    }
+
+  }
+
+  /**
+   * Returns all region servers from the given peer
+   *
+   * @param peerClusterId (byte) the cluster to interrogate
+   * @return addresses of all region servers
+   */
+  public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+    if (this.peerClusters.size() == 0) {
+      return new ArrayList<HServerAddress>(0);
+    }
+    ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
+    return zkw == null?
+        new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
+  }
+
+  /**
+   * This method connects this cluster to another one and registers it
+   * in this region server's replication znode
+   * @param peerId id of the peer cluster
+   */
+  private void connectToPeer(String peerId) throws IOException {
+    String[] ensemble =
+        Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
+            split(":");
+    if (ensemble.length != 3) {
+      throw new IllegalArgumentException("Wrong format of cluster address: " +
+          this.zookeeperWrapper.getData(this.peersZNode, peerId));
+    }
+    Configuration otherConf = new Configuration(this.conf);
+    otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+    otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+    otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+    ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+        "connection to cluster: " + peerId);
+    zkw.registerListener(new ReplicationStatusWatcher());
+    this.peerClusters.put(peerId, zkw);
+    this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+        this.rsServerNameZnode, peerId));
+    LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+  }
+
+  /**
+   * This reads the state znode for replication and sets the atomic boolean
+   */
+  private void setReplicating() {
+    String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
+        this.replicationZNode, this.replicationStateNodeName,
+        new ReplicationStatusWatcher()));
+    if (value != null) {
+      this.replicating.set(value.equals("true"));
+      LOG.info("Replication is now " + (this.replicating.get() ?
+          "started" : "stopped"));
+    }
+  }
+
+  /**
+   * Add a new log to the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void addLogToList(String filename, String clusterId) {
+    try {
+      this.zookeeperWrapper.writeZNode(
+          this.zookeeperWrapper.getZNode(
+              this.rsServerNameZnode, clusterId), filename, "");
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Remove a log from the list of hlogs in zookeeper
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   */
+  public void removeLogFromList(String filename, String clusterId) {
+    try {
+      this.zookeeperWrapper.deleteZNode(
+          this.zookeeperWrapper.getZNode(this.rsServerNameZnode,
+              this.zookeeperWrapper.getZNode(clusterId, filename)));
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Set the current position of the specified cluster in the current hlog
+   * @param filename name of the hlog's znode
+   * @param clusterId name of the cluster's znode
+   * @param position the position in the file
+   * @throws IOException
+   */
+  public void writeReplicationStatus(String filename, String clusterId,
+                                     long position) {
+    try {
+      String clusterZNode = this.zookeeperWrapper.getZNode(
+          this.rsServerNameZnode, clusterId);
+      this.zookeeperWrapper.writeZNode(clusterZNode, filename,
+          Long.toString(position));
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Get a list of all the other region servers in this cluster
+   * and set a watch
+   * @param watch the watch to set
+   * @return a list of server names
+   */
+  public List<String> getRegisteredRegionServers(Watcher watch) {
+    return this.zookeeperWrapper.listZnodes(
+        this.zookeeperWrapper.getRsZNode(), watch);
+  }
+
+  /**
+   * Get the list of the replicators that have queues; they may be alive,
+   * dead, or left over from a previous run
+   * @param watch the watch to set
+   * @return a list of server names
+   */
+  public List<String> getListOfReplicators(Watcher watch) {
+    return this.zookeeperWrapper.listZnodes(rsZNode, watch);
+  }
+
+  /**
+   * Get the list of peer clusters for the specified region server
+   * @param rs server name of the region server
+   * @param watch the watch to set
+   * @return a list of peer clusters
+   */
+  public List<String> getListPeersForRS(String rs, Watcher watch) {
+    return this.zookeeperWrapper.listZnodes(
+        zookeeperWrapper.getZNode(rsZNode, rs), watch);
+  }
+
+  /**
+   * Get the list of hlogs for the specified region server and peer cluster
+   * @param rs server name of the region server
+   * @param id peer cluster
+   * @param watch the watch to set
+   * @return a list of hlogs
+   */
+  public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
+    return this.zookeeperWrapper.listZnodes(
+        zookeeperWrapper.getZNode(zookeeperWrapper.getZNode(rsZNode, rs), id), watch);
+  }
+
+  /**
+   * Try to set a lock in another server's znode.
+   * @param znode the server name of the other server
+   * @return true if the lock was acquired, false in all other cases
+   */
+  public boolean lockOtherRS(String znode) {
+    try {
+      this.zookeeperWrapper.writeZNode(
+          this.zookeeperWrapper.getZNode(this.rsZNode, znode),
+          RS_LOCK_ZNODE, rsServerNameZnode, true);
+
+    } catch (InterruptedException e) {
+      LOG.error(e);
+      return false;
+    } catch (KeeperException e) {
+      LOG.debug("Won't lock " + znode + " because " + e.getMessage());
+      // TODO see if the other still exists!!
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * This method copies all the hlog queues from another region server
+   * and returns them all sorted per peer cluster (appended with the dead
+   * server's znode)
+   * @param znode server name of the region server to copy from
+   * @return all hlogs for all peers of that cluster, null if an error occurred
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+    // TODO this method isn't atomic enough, we could start copying and then
+    // TODO fail for some reason and we would end up with znodes we don't want.
+    SortedMap<String,SortedSet<String>> queues =
+        new TreeMap<String,SortedSet<String>>();
+    try {
+      String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
+      List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
+      // We have a lock znode in there, it will count as one.
+      if (clusters == null || clusters.size() <= 1) {
+        return queues;
+      }
+      // The lock isn't a peer cluster, remove it
+      clusters.remove(RS_LOCK_ZNODE);
+      for (String cluster : clusters) {
+        // We add the name of the recovered RS to the new znode, we can even
+        // do that for queues that were recovered 10 times giving a znode like
+        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+        String newCluster = cluster+"-"+znode;
+        String newClusterZnode =
+            this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
+        this.zookeeperWrapper.ensureExists(newClusterZnode);
+        String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
+        List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
+        // That region server didn't have anything to replicate for this cluster
+        if (hlogs == null || hlogs.size() == 0) {
+          continue;
+        }
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newCluster, logQueue);
+        for (String hlog : hlogs) {
+          String position = Bytes.toString(
+              this.zookeeperWrapper.getData(clusterPath, hlog));
+          LOG.debug("Creating " + hlog + " with data " + position);
+          this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
+          logQueue.add(hlog);
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.warn(e);
+      return null;
+    } catch (KeeperException e) {
+      LOG.warn(e);
+      return null;
+    }
+    return queues;
+  }
+
+  /**
+   * Delete a complete queue of hlogs
+   * @param peerZnode znode of the peer cluster's queue of hlogs to delete
+   */
+  public void deleteSource(String peerZnode) {
+    try {
+      this.zookeeperWrapper.deleteZNode(
+          this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Recursive deletion of all znodes in the specified rs' znode
+   * @param znode server name of the region server whose queues to delete
+   */
+  public void deleteRsQueues(String znode) {
+    try {
+      this.zookeeperWrapper.deleteZNode(
+          this.zookeeperWrapper.getZNode(rsZNode, znode), true);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Delete this cluster's queues
+   */
+  public void deleteOwnRSZNode() {
+    deleteRsQueues(this.rsServerNameZnode);
+  }
+
+  /**
+   * Get the position of the specified hlog in the specified peer znode
+   * @param peerId znode of the peer cluster
+   * @param hlog name of the hlog
+   * @return the position in that hlog
+   */
+  public long getHLogRepPosition(String peerId, String hlog) {
+    String clusterZnode =
+        this.zookeeperWrapper.getZNode(rsServerNameZnode, peerId);
+    String data = Bytes.toString(
+        this.zookeeperWrapper.getData(clusterZnode, hlog));
+    return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+  }
+
+  /**
+   * Tells if this cluster is a master in the replication setup
+   *
+   * @return true if this is a master
+   */
+  public boolean isReplicationMaster() {
+    return this.replicationMaster;
+  }
+
+  /**
+   * Get the identification of the cluster
+   *
+   * @return the id for the cluster
+   */
+  public String getClusterId() {
+    return this.clusterId;
+  }
+
+  /**
+   * Get a map of all peer clusters
+   * @return map of peer clusters, peer id to ZKW
+   */
+  public Map<String, ZooKeeperWrapper> getPeerClusters() {
+    return this.peerClusters;
+  }
+
+  /**
+   * Watcher for the status of the replication
+   */
+  public class ReplicationStatusWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      Event.EventType type = watchedEvent.getType();
+      LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
+      if (type.equals(Event.EventType.NodeDataChanged)) {
+        setReplicating();
+      }
+    }
+  }
+
+}
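
A hedged sketch of the failover flow these methods enable; `zkHelper` is an
instance of this class and `deadRS` is the znode name of a region server that
stopped, both assumed to be in hand (error handling elided):

  if (zkHelper.lockOtherRS(deadRS)) {
    // Copy its queues under our own znode; each queue name gets the dead
    // server's name appended so its origin stays visible
    SortedMap<String, SortedSet<String>> queues =
        zkHelper.copyQueuesFromRS(deadRS);
    if (queues != null) {
      // ... hand each queue to a new replication source, then clean up
      zkHelper.deleteRsQueues(deadRS);
    }
  }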

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LogCleanerDelegate;
+import org.apache.hadoop.hbase.master.TimeToLiveLogCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for
+ * replication before deleting it when its TTL is over.
+ */
+public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
+
+  private static final Log LOG =
+      LogFactory.getLog(ReplicationLogCleaner.class);
+  private TimeToLiveLogCleaner ttlCleaner;
+  private Configuration conf;
+  private ReplicationZookeeperWrapper zkHelper;
+  private Set<String> hlogs = new HashSet<String>();
+
+  /**
+   * Instantiates the cleaner, does nothing more.
+   */
+  public ReplicationLogCleaner() {}
+
+  @Override
+  public boolean isLogDeletable(Path filePath) {
+
+    // Don't bother going further if the hlog isn't even expired
+    if (!ttlCleaner.isLogDeletable(filePath)) {
+      LOG.debug("Won't delete log since not past due " + filePath);
+      return false;
+    }
+    String log = filePath.getName();
+    // If we saw the hlog previously, let's consider it still in use
+    // At some point in the future we will refresh the list and it will be gone
+    if (this.hlogs.contains(log)) {
+      return false;
+    }
+
+    // Let's see if it's still there
+    // This solution makes every miss very expensive to process since we
+    // almost completely refresh the cache each time
+    return !refreshHLogsAndSearch(log);
+  }
+
+  /**
+   * Search through all the hlogs we have in ZK to refresh the cache
+   * If a log is specified and found, then we early out and return true
+   * @param searchedLog log we are searching for, pass null to cache everything
+   *                    that's in zookeeper.
+   * @return false until a specified log is found.
+   */
+  private boolean refreshHLogsAndSearch(String searchedLog) {
+    this.hlogs.clear();
+    final boolean lookForLog = searchedLog != null;
+    List<String> rss = zkHelper.getListOfReplicators(this);
+    if (rss == null) {
+      LOG.debug("Didn't find any region server that replicates, deleting: " +
+          searchedLog);
+      return false;
+    }
+    for (String rs: rss) {
+      List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
+      // if rs just died, this will be null
+      if (listOfPeers == null) {
+        continue;
+      }
+      for (String id : listOfPeers) {
+        List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
+        if (peersHlogs != null) {
+          this.hlogs.addAll(peersHlogs);
+        }
+        // early exit if we found the log
+        if(lookForLog && this.hlogs.contains(searchedLog)) {
+          LOG.debug("Found log in ZK, keeping: " + searchedLog);
+          return true;
+        }
+      }
+    }
+    LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
+    return false;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    this.ttlCleaner = new TimeToLiveLogCleaner();
+    this.ttlCleaner.setConf(conf);
+    try {
+      this.zkHelper = new ReplicationZookeeperWrapper(
+          ZooKeeperWrapper.createInstance(this.conf,
+              HMaster.class.getName()),
+          this.conf, new AtomicBoolean(true), null);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    refreshHLogsAndSearch(null);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void process(WatchedEvent watchedEvent) {}
+}
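
A minimal usage sketch (assuming a Configuration `conf` and a Path `oldLogDir`
are in hand; per the LogCleanerDelegate contract above, setConf must run
before the first isLogDeletable call):

  LogCleanerDelegate cleaner = new ReplicationLogCleaner();
  cleaner.setConf(conf);  // builds the TTL cleaner and the ZK helper
  boolean deletable = cleaner.isLogDeletable(
      new Path(oldLogDir, "10.10.1.76%3A53488.123456789"));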

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html Thu Jul  1 00:25:50 2010
@@ -0,0 +1,128 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+   Copyright 2010 The Apache Software Foundation
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+<h1>Multi Cluster Replication</h1>
+This package provides replication between HBase clusters.
+<p>
+
+<h2>Table Of Contents</h2>
+<ol>
+    <li><a href="#status">Status</a></li>
+    <li><a href="#requirements">Requirements</a></li>
+    <li><a href="#deployment">Deployment</a></li>
+</ol>
+
+<p>
+<a name="status">
+<h2>Status</h2>
+</a>
+<p>
+This package is alpha quality software and is only meant to be a base
+for future development. The current implementation offers the following
+features:
+
+<ol>
+    <li>Master/Slave replication limited to 1 slave cluster. </li>
+    <li>Replication of scoped families in user tables.</li>
+    <li>Start/stop replication stream.</li>
+    <li>Supports clusters of different sizes.</li>
+    <li>Handling of partitions longer than 10 minutes.</li>
+</ol>
+Please report bugs on the project's Jira when found.
+<p>
+<a name="requirements">
+<h2>Requirements</h2>
+</a>
+<p>
+
+Before trying out replication, make sure to review the following requirements:
+
+<ol>
+    <li>Zookeeper should be managed by you, not by HBase, and should
+    always be available during the deployment.</li>
+    <li>All machines from both clusters should be able to reach every
+    other machine since replication goes from any region server to any
+    other one on the slave cluster. That also includes the
+    Zookeeper clusters.</li>
+    <li>Both clusters should have the same HBase and Hadoop major revision.
+    For example, having 0.21.1 on the master and 0.21.0 on the slave is
+    correct but not 0.21.1 and 0.22.0.</li>
+    <li>Every table that contains families that are scoped for replication
+    should exist on every cluster with the exact same name, same for those
+    replicated families.</li>
+</ol>
+
+<p>
+<a name="deployment">
+<h2>Deployment</h2>
+</a>
+<p>
+
+The following steps describe how to enable replication from one cluster
+to another. This must be done with both clusters offline.
+<ol>
+    <li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
+    the following configurations:
+        <pre>
+&lt;property&gt;
+  &lt;name&gt;hbase.replication&lt;/name&gt;
+  &lt;value&gt;true&lt;/value&gt;
+&lt;/property&gt;</pre>
+    </li>
+    <li>Run the following command on any cluster:
+    <pre>
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/bin/replication/add_peer.rb</pre>
+    This will print the usage for setting up the replication stream between
+    both clusters. If both clusters use the same Zookeeper cluster, you have
+    to use a different <b>zookeeper.znode.parent</b> since they can't
+    write in the same folder.
+    </li>
+    <li>You can now start and stop the clusters with your preferred method.</li>
+</ol>
+
+You can confirm that your setup works by looking at any region server's log
+on the master cluster; look for the following lines:
+
+<pre>
+Considering 1 rs, with ratio 0.1
+Getting 1 rs from peer cluster # 0
+Choosing peer 10.10.1.49:62020</pre>
+
+In this case it indicates that 1 region server from the slave cluster
+was chosen for replication.<br><br>
+
+Should you want to stop the replication while the clusters are running, open
+the shell on the master cluster and issue this command:
+<pre>
+hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
+
+Where you replace the znode parent with the one configured on your master
+cluster. Edits that are already queued will still be replicated after you
+issue that command, but new entries won't be. To restart replication, simply
+replace "false" with "true" in the command.
+
+<p>
+
+</body>
+</html>

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Replication serves as an umbrella over the setup of replication and
+ * is used by HRS.
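+ * <p>
+ * A minimal usage sketch (hypothetical wiring; the region server sets this
+ * up itself during startup):
+ * <pre>
+ * Replication rep = new Replication(conf, hsi, fs, oldLogDir, stopRequested);
+ * rep.addLogEntryVisitor(hlog);      // tag WAL edits with replication scopes
+ * rep.startReplicationServices();    // sources on a master, sink on a slave
+ * </pre>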
+ */
+public class Replication implements LogEntryVisitor {
+
+  private final boolean replication;
+  private final ReplicationSourceManager replicationManager;
+  private boolean replicationMaster;
+  private final AtomicBoolean replicating = new AtomicBoolean(true);
+  private final ReplicationZookeeperWrapper zkHelper;
+  private final Configuration conf;
+  private final AtomicBoolean stopRequested;
+  private ReplicationSink replicationSink;
+
+  /**
+   * Instantiate the replication management (if rep is enabled).
+   * @param conf conf to use
+   * @param hsi the info of this region server
+   * @param fs handle to the filesystem
+   * @param oldLogDir directory where logs are archived
+   * @param stopRequested boolean that tells us if we are shutting down
+   * @throws IOException
+   */
+  public Replication(Configuration conf, HServerInfo hsi,
+                     FileSystem fs, Path oldLogDir,
+                     AtomicBoolean stopRequested) throws IOException {
+    this.conf = conf;
+    this.stopRequested = stopRequested;
+    this.replication =
+        conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+    if (replication) {
+      this.zkHelper = new ReplicationZookeeperWrapper(
+        ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
+        this.replicating, hsi.getServerName());
+      this.replicationMaster = zkHelper.isReplicationMaster();
+      this.replicationManager = this.replicationMaster ?
+        new ReplicationSourceManager(zkHelper, conf, stopRequested,
+          fs, this.replicating, oldLogDir) : null;
+    } else {
+      replicationManager = null;
+      zkHelper = null;
+    }
+  }
+
+  /**
+   * Join with the replication threads
+   */
+  public void join() {
+    if (this.replication) {
+      if (this.replicationMaster) {
+        this.replicationManager.join();
+      }
+      this.zkHelper.deleteOwnRSZNode();
+    }
+  }
+
+  /**
+   * Carry the list of log entries down to the sink
+   * @param entries list of entries to replicate
+   * @throws IOException
+   */
+  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+    if (this.replication && !this.replicationMaster) {
+      this.replicationSink.replicateEntries(entries);
+    }
+  }
+
+  /**
+   * If replication is enabled, starts the replication sources manager when
+   * this cluster is the replication master, or the replication sink when
+   * it is a slave.
+   * @throws IOException
+   */
+  public void startReplicationServices() throws IOException {
+    if (this.replication) {
+      if (this.replicationMaster) {
+        this.replicationManager.init();
+      } else {
+        this.replicationSink =
+            new ReplicationSink(this.conf, this.stopRequested);
+      }
+    }
+  }
+
+  /**
+   * Get the replication sources manager
+   * @return the manager if replication is enabled, else null
+   */
+  public ReplicationSourceManager getReplicationManager() {
+    return replicationManager;
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+                                       WALEdit logEdit) {
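+    // Map each family in this edit to its replication scope and attach the
+    // map to the edit; families scoped LOCAL are left out of the map.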
+    NavigableMap<byte[], Integer> scopes =
+        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    byte[] family;
+    for (KeyValue kv : logEdit.getKeyValues()) {
+      family = kv.getFamily();
+      int scope = info.getTableDesc().getFamily(family).getScope();
+      if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
+          !scopes.containsKey(family)) {
+        scopes.put(family, scope);
+      }
+    }
+    if (!scopes.isEmpty()) {
+      logEdit.setScopes(scopes);
+    }
+  }
+
+  /**
+   * Add this class as a log entry visitor for HLog if replication is enabled
+   * @param hlog the log to add ourselves to as a visitor
+   */
+  public void addLogEntryVisitor(HLog hlog) {
+    if (replication) {
+      hlog.addLogEntryVisitor(this);
+    }
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Thu Jul  1 00:25:50 2010
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster.
+ * <p/>
+ * The replication process waits for the edits to be applied before the
+ * method returns. This means that replication of edits is synchronous
+ * (after reading from HLogs in ReplicationSource) and that a single
+ * region server cannot receive edits from two sources at the same time.
+ * <p/>
+ * This class uses the native HBase client in order to replicate entries.
+ * <p/>
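+ * A minimal caller sketch (hypothetical; in practice the region server
+ * receives the entries over RPC and hands them to this sink):
+ * <pre>
+ * ReplicationSink sink = new ReplicationSink(conf, stopRequested);
+ * sink.replicateEntries(entries); // blocks until the edits are applied
+ * </pre>
+ * <p/>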
+ *
+ * TODO make this class more like ReplicationSource wrt log handling
+ */
+public class ReplicationSink {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+  // Name of the HDFS directory that contains the temporary rep logs
+  public static final String REPLICATION_LOG_DIR = ".replogs";
+  private final Configuration conf;
+  // Pool of HTables used to replicate the edits
+  private final HTablePool pool;
+  // boolean coming from HRS to know when the process stops
+  private final AtomicBoolean stop;
+
+  /**
+   * Create a sink for replication
+   *
+   * @param conf                conf object
+   * @param stopper             boolean to tell this thread to stop
+   * @throws IOException thrown when HDFS goes bad or bad file name
+   */
+  public ReplicationSink(Configuration conf, AtomicBoolean stopper)
+      throws IOException {
+    this.conf = conf;
+    this.pool = new HTablePool(this.conf,
+        conf.getInt("replication.sink.htablepool.capacity", 10));
+    this.stop = stopper;
+  }
+
+  /**
+   * Replicate this array of entries directly into the local cluster
+   * using the native client.
+   *
+   * @param entries the WAL entries to apply to the local cluster
+   * @throws IOException
+   */
+  public synchronized void replicateEntries(HLog.Entry[] entries)
+      throws IOException {
+    // Very simple optimization where we batch sequences of rows going
+    // to the same table.
+    try {
+      long totalReplicated = 0;
+      byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+      List<Put> puts = new ArrayList<Put>();
+      for (HLog.Entry entry : entries) {
+        WALEdit edit = entry.getEdit();
+        List<KeyValue> kvs = edit.getKeyValues();
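+        // The sink assumes all KeyValues in one WALEdit share the same
+        // operation type: a leading delete marker means the whole edit is
+        // a delete, otherwise everything is treated as puts.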
+        if (kvs.get(0).isDelete()) {
+          Delete delete = new Delete(kvs.get(0).getRow(),
+              kvs.get(0).getTimestamp(), null);
+          for (KeyValue kv : kvs) {
+            if (kv.isDeleteFamily()) {
+              delete.deleteFamily(kv.getFamily());
+            } else if (!kv.isEmptyColumn()) {
+              delete.deleteColumn(kv.getFamily(),
+                  kv.getQualifier());
+            }
+          }
+          delete(entry.getKey().getTablename(), delete);
+        } else {
+          // Switching table, flush
+          if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
+            put(lastTable, puts);
+          }
+          // With mini-batching, we need to expect multiple rows per edit
+          byte[] lastKey = kvs.get(0).getRow();
+          Put put = new Put(kvs.get(0).getRow(),
+              kvs.get(0).getTimestamp());
+          for (KeyValue kv : kvs) {
+            if (!Bytes.equals(lastKey, kv.getRow())) {
+              puts.add(put);
+              put = new Put(kv.getRow(), kv.getTimestamp());
+            }
+            put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+            lastKey = kv.getRow();
+          }
+          puts.add(put);
+          lastTable = entry.getKey().getTablename();
+        }
+        totalReplicated++;
+      }
+      put(lastTable, puts);
+      LOG.info("Total replicated: " + totalReplicated);
+    } catch (IOException ex) {
+      if (ex.getCause() instanceof TableNotFoundException) {
+        LOG.warn("Losing edits because: ", ex);
+      } else {
+        // Should we log rejected edits in a file for replay?
+        LOG.error("Unable to accept edit because", ex);
+        this.stop.set(true);
+        throw ex;
+      }
+    } catch (RuntimeException re) {
+      if (re.getCause() instanceof TableNotFoundException) {
+        LOG.warn("Losing edits because: ", re);
+      } else {
+        this.stop.set(true);
+        throw re;
+      }
+    }
+  }
+
+  /**
+   * Do the puts and handle the pool
+   * @param tableName table to insert into
+   * @param puts list of puts
+   * @throws IOException
+   */
+  private void put(byte[] tableName, List<Put> puts) throws IOException {
+    if (puts.isEmpty()) {
+      return;
+    }
+    HTableInterface table = null;
+    try {
+      table = this.pool.getTable(tableName);
+      table.put(puts);
+      puts.clear();
+    } finally {
+      // Return the table to the pool exactly once, even if the put failed
+      if (table != null) {
+        this.pool.putTable(table);
+      }
+    }
+  }
+
+  /**
+   * Do the delete and handle the pool
+   * @param tableName table to delete in
+   * @param delete the delete to use
+   * @throws IOException
+   */
+  private void delete(byte[] tableName, Delete delete) throws IOException {
+    HTableInterface table = null;
+    try {
+      table = this.pool.getTable(tableName);
+      table.delete(delete);
+    } finally {
+      // Return the table to the pool exactly once, even if the delete failed
+      if (table != null) {
+        this.pool.putTable(table);
+      }
+    }
+  }
+}