Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/31 00:50:04 UTC

svn commit: r991014 - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/...

Author: stack
Date: Mon Aug 30 22:50:03 2010
New Revision: 991014

URL: http://svn.apache.org/viewvc?rev=991014&view=rev
Log:
Added move region command.  Added balancer on/off command.
Brought forward the replication zk stuff (J-D offered to fix up
the mess I made).  Fixes to make tests pass.

M BRANCH_TODO.txt
  Added some TODOs
M src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
  Fix to match new method args.
M src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
  Changed constructor params for HLog.
M src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
  Changed to take Stoppable.
M src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
  Changed to take Stoppable.
M src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
  Formatting.
M src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
  Use MetaEditor for adding daughters.  Use the new open method on HRegion and
  postOpenDeployTasks.  Run opening of the daughters in parallel.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Minor changes.  Added flag to postOpenDeployTasks to denote whether or
  not region is a daughter of a split.
M src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
  postOpenDeployTasks takes a boolean.
M src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
  Pass server and services instead of catalogtracker and server.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
  Added an open method that does initialization and sequenceid handling.
M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
  Added utility method that gets HRI and HSI for passed encoded region name. 
M src/main/java/org/apache/hadoop/hbase/master/HMaster.java
  (balance, move): Added.
M src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
  Remove unused imports.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
  Reenabled bits of code.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
  Javadoc; let KeeperException out of the constructor.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
  Reenabled code.
M src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
  Take a Server instead of individual args.
M src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
  Refactored some.  Added an addDaughter method.
M src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
  Added move and balance.
M src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
  Removed unused imports.
M src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
  Added move and balance.
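
For context, here is a minimal client-side sketch of driving the two calls this
commit adds to HBaseAdmin (move and balance).  The wrapper class, the table
name, the region choice, and the destination servername string are illustrative
assumptions, not part of the patch; the move/balance signatures are the ones
added below.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.util.Bytes;

  public class MoveRegionExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      HBaseAdmin admin = new HBaseAdmin(conf);

      // Switch the balancer off so the explicit move is not undone by the
      // next balancer run; balance(boolean) returns the previous setting.
      boolean previous = admin.balance(false);

      // Pick a region of some table.  The encoded name comes from
      // HRegionInfo#getEncodedName(); the destination is the servername the
      // master's ServerManager knows the target regionserver by.
      HTable table = new HTable(conf, "myTable");        // hypothetical table
      HRegionInfo hri = table.getRegionsInfo().keySet().iterator().next();
      String dest = "example-host,60020,1283208603839";  // hypothetical servername

      admin.move(Bytes.toBytes(hri.getEncodedName()), Bytes.toBytes(dest));

      // Restore the balancer to its previous setting when done.
      admin.balance(previous);
    }
  }

Note that HMaster.move resolves destServerName against ServerManager by the
exact servername string, and balance(false) only makes subsequent balancer
runs return early; see the HMaster changes further down.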

Modified:
    hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt Mon Aug 30 22:50:03 2010
@@ -290,3 +290,6 @@ Later:
 
  So on assign, if we fail -- say connection refused when we try open on the RS, regions state remains offline -- who comes along and finds all offline and assigns?
 
+
+TODO:
++ Add test to prove move region works.

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java Mon Aug 30 22:50:03 2010
@@ -55,7 +55,7 @@ public class MetaEditor {
         Writables.getBytes(regionInfo));
     catalogTracker.waitForMetaServerConnectionDefault().put(
         CatalogTracker.META_REGION, put);
-    LOG.info("Added region " + regionInfo + " to META");
+    LOG.info("Added region " + regionInfo.getRegionNameAsString() + " to META");
   }
 
   /**
@@ -71,9 +71,11 @@ public class MetaEditor {
   public static void offlineParentInMeta(CatalogTracker catalogTracker,
       HRegionInfo parent, final HRegionInfo a, final HRegionInfo b)
   throws NotAllMetaRegionsOnlineException, IOException {
-    Put put = new Put(parent.getRegionName());
-    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
-      Writables.getBytes(parent));
+    HRegionInfo copyOfParent = new HRegionInfo(parent);
+    copyOfParent.setOffline(true);
+    copyOfParent.setSplit(true);
+    Put put = new Put(copyOfParent.getRegionName());
+    addRegionInfo(put, copyOfParent);
     put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
         HConstants.EMPTY_BYTE_ARRAY);
     put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
@@ -83,7 +85,23 @@ public class MetaEditor {
     put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
       Writables.getBytes(b));
     catalogTracker.waitForMetaServerConnectionDefault().put(CatalogTracker.META_REGION, put);
-    LOG.info("Offlined parent region " + parent + " in META");
+    LOG.info("Offlined parent region " + parent.getRegionNameAsString() +
+      " in META");
+  }
+
+  public static void addDaughter(final CatalogTracker catalogTracker,
+      final HRegionInfo regionInfo, final HServerInfo serverInfo)
+  throws NotAllMetaRegionsOnlineException, IOException {
+    HRegionInterface server = catalogTracker.waitForRootServerConnectionDefault();
+    byte [] catalogRegionName = CatalogTracker.META_REGION;
+    Put put = new Put(regionInfo.getRegionName());
+    addRegionInfo(put, regionInfo);
+    addLocation(put, serverInfo);
+    server.put(catalogRegionName, put);
+    LOG.info("Added daughter " + regionInfo.getRegionNameAsString() +
+      " in region " + Bytes.toString(catalogRegionName) + " with " +
+      "server=" + serverInfo.getHostnamePort() + ", " +
+      "startcode=" + serverInfo.getStartCode());
   }
 
   /**
@@ -140,10 +158,7 @@ public class MetaEditor {
       byte [] catalogRegionName, HRegionInfo regionInfo, HServerInfo serverInfo)
   throws IOException {
     Put put = new Put(regionInfo.getRegionName());
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-      Bytes.toBytes(serverInfo.getHostnamePort()));
-    put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-      Bytes.toBytes(serverInfo.getStartCode()));
+    addLocation(put, serverInfo);
     server.put(catalogRegionName, put);
     LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
       " in region " + Bytes.toString(catalogRegionName) + " with " +
@@ -164,7 +179,7 @@ public class MetaEditor {
     catalogTracker.waitForMetaServerConnectionDefault().delete(
         CatalogTracker.META_REGION, delete);
 
-    LOG.info("Deleted region " + regionInfo + " from META");
+    LOG.info("Deleted region " + regionInfo.getRegionNameAsString() + " from META");
   }
 
   /**
@@ -177,10 +192,24 @@ public class MetaEditor {
       HRegionInfo regionInfo)
   throws IOException {
     Put put = new Put(regionInfo.getRegionName());
-    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
-        Writables.getBytes(regionInfo));
+    addRegionInfo(put, regionInfo);
     catalogTracker.waitForMetaServerConnectionDefault().put(
         CatalogTracker.META_REGION, put);
-    LOG.info("Updated region " + regionInfo + " in META");
+    LOG.info("Updated region " + regionInfo.getRegionNameAsString() + " in META");
+  }
+
+  private static Put addRegionInfo(final Put p, final HRegionInfo hri)
+  throws IOException {
+    p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(hri));
+    return p;
+  }
+
+  private static Put addLocation(final Put p, final HServerInfo hsi) {
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+      Bytes.toBytes(hsi.getHostnamePort()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+      Bytes.toBytes(hsi.getStartCode()));
+    return p;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Mon Aug 30 22:50:03 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.MasterNot
 import org.apache.hadoop.hbase.RegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
@@ -854,6 +855,31 @@ public class HBaseAdmin implements Abort
   }
 
   /**
+   * Move the region <code>r</code> to <code>dest</code>.
+   * @param encodedRegionName The encoded region name.
+   * @param destServerName The servername of the destination regionserver
+   * @throws UnknownRegionException Thrown if we can't find a region named
+   * <code>encodedRegionName</code>
+   * @throws ZooKeeperConnectionException 
+   * @throws MasterNotRunningException 
+   */
+  public void move(final byte [] encodedRegionName, final byte [] destServerName)
+  throws UnknownRegionException, MasterNotRunningException, ZooKeeperConnectionException {
+    getMaster().move(encodedRegionName, destServerName);
+  }
+
+  /**
+   * @param b If true, enable balancer. If false, disable balancer.
+   * @return Previous balancer value
+   * @throws ZooKeeperConnectionException 
+   * @throws MasterNotRunningException 
+   */
+  public boolean balance(final boolean b)
+  throws MasterNotRunningException, ZooKeeperConnectionException {
+    return getMaster().balance(b);
+  }
+
+  /**
    * Split a table or an individual region.
    * Asynchronous operation.
    *

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Mon Aug 30 22:50:03 2010
@@ -183,7 +183,6 @@ public class HbaseObjectWritable impleme
     addToMap(HLog.Entry[].class, code++);
     addToMap(HLogKey.class, code++);
 
-    // List
     addToMap(List.class, code++);
 
     addToMap(NavigableSet.class, code++);

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Mon Aug 30 22:50:03 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.UnknownRegionException;
 
 /**
  * Clients interact with the HMasterInterface to gain access to meta-level
@@ -130,4 +131,21 @@ public interface HMasterInterface extend
    * @return status object
    */
   public ClusterStatus getClusterStatus();
+
+
+  /**
+   * Move the region <code>r</code> to <code>dest</code>.
+   * @param encodedRegionName The encoded region name.
+   * @param destServerName The servername of the destination regionserver
+   * @throws UnknownRegionException Thrown if we can't find a region named
+   * <code>encodedRegionName</code>
+   */
+  public void move(final byte [] encodedRegionName, final byte [] destServerName)
+  throws UnknownRegionException;
+
+  /**
+   * @param b If true, enable balancer. If false, disable balancer.
+   * @return Previous balancer value
+   */
+  public boolean balance(final boolean b);
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Mon Aug 30 22:50:03 2010
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
 /**

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Mon Aug 30 22:50:03 2010
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.Lo
 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
@@ -1059,6 +1060,23 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * @param encodedRegionName Region encoded name.
+   * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
+   * and the hosting servers {@link HServerInfo}.
+   */
+  Pair<HRegionInfo, HServerInfo> getAssignment(final byte [] encodedRegionName) {
+    String name = Bytes.toString(encodedRegionName);
+    synchronized(this.regions) {
+      for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
+        if (e.getKey().getEncodedName().equals(name)) {
+          return new Pair<HRegionInfo, HServerInfo>(e.getKey(), e.getValue());
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
    * @param plan Plan to execute.
    */
   void balance(final RegionPlan plan) {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Aug 30 22:50:03 2010
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
@@ -169,6 +170,7 @@ implements HMasterInterface, HMasterRegi
 
   private LoadBalancer balancer = new LoadBalancer();
   private Chore balancerChore;
+  private volatile boolean balance = true;
 
   /**
    * Initializes the HMaster. The steps are as follows:
@@ -538,6 +540,8 @@ implements HMasterInterface, HMasterRegi
    * Run the balancer.
    */
   public void balance() {
+    // If balance not true, don't run balancer.
+    if (!this.balance) return;
     synchronized (this.balancer) {
       // Only allow one balance run at at time.
       if (this.assignmentManager.isRegionsInTransition()) {
@@ -563,6 +567,25 @@ implements HMasterInterface, HMasterRegi
     }
   }
 
+  @Override
+  public boolean balance(final boolean b) {
+    boolean oldValue = this.balance;
+    this.balance = b;
+    LOG.info("Balance=" + b);
+    return oldValue;
+  }
+
+  @Override
+  public void move(final byte[] encodedRegionName, final byte[] destServerName)
+  throws UnknownRegionException {
+    Pair<HRegionInfo, HServerInfo> p =
+      this.assignmentManager.getAssignment(encodedRegionName);
+    if (p == null) throw new UnknownRegionException(Bytes.toString(encodedRegionName));
+    HServerInfo dest = this.serverManager.getServerInfo(new String(destServerName));
+    RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
+    this.assignmentManager.balance(rp);
+  }
+
   public void createTable(HTableDescriptor desc, byte [][] splitKeys)
   throws IOException {
     createTable(desc, splitKeys, false);

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Mon Aug 30 22:50:03 2010
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.io.Writable;
 
 /**
  * Makes decisions about the placement and movement of Regions across
@@ -520,11 +523,12 @@ public class LoadBalancer {
    * information and not the source/dest server info.
    */
   public static class RegionPlan implements Comparable<RegionPlan> {
-
     private final HRegionInfo hri;
     private final HServerInfo source;
     private HServerInfo dest;
 
+    
+
     /**
      * Instantiate a plan for a region move, moving the specified region from
      * the specified source server to the specified destination server.

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Mon Aug 30 22:50:03 2010
@@ -149,7 +149,7 @@ public class CompactSplitThread extends 
     // the prepare call -- we are not ready to split just now.  Just return.
     if (!st.prepare()) return;
     try {
-      st.execute(this.server, this.server.getCatalogTracker());
+      st.execute(this.server, this.server);
     } catch (IOException ioe) {
       try {
         LOG.info("Running rollback of failed split of " +
@@ -172,8 +172,9 @@ public class CompactSplitThread extends 
     this.server.reportSplit(parent.getRegionInfo(), st.getFirstDaughter(),
       st.getSecondDaughter());
     LOG.info("Region split, META updated, and report to master. Parent=" +
-      parent.getRegionInfo() + ", new regions: " +
-      st.getFirstDaughter() + ", " + st.getSecondDaughter() + ". Split took " +
+      parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
+      st.getFirstDaughter().getRegionNameAsString() + ", " +
+      st.getSecondDaughter().getRegionNameAsString() + ". Split took " +
       StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
   }
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Aug 30 22:50:03 2010
@@ -2422,11 +2422,23 @@ public class HRegion implements HeapSize
       info.getTableDesc().getName());
     HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
       flusher);
-    long seqid = r.initialize(reporter);
-    if (wal != null) {
-      wal.setSequenceNumber(seqid);
+    return r.openHRegion(reporter);
+  }
+
+  /**
+   * Open HRegion.
+   * Calls initialize and sets sequenceid.
+   * @param reporter
+   * @return Returns <code>this</code>
+   * @throws IOException
+   */
+  HRegion openHRegion(final Progressable reporter)
+  throws IOException {
+    long seqid = initialize(reporter);
+    if (this.log != null) {
+      this.log.setSequenceNumber(seqid);
     }
-    return r;
+    return this;
   }
 
   /**

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Aug 30 22:50:03 2010
@@ -905,8 +905,12 @@ public class HRegionServer implements HR
 
     // Instantiate replication manager if replication enabled.  Pass it the
     // log directories.
-    this.replicationHandler = Replication.isReplication(this.conf)?
-      new Replication(this, this.fs, logdir, oldLogDir): null;
+    try {
+      this.replicationHandler = Replication.isReplication(this.conf)?
+        new Replication(this, this.fs, logdir, oldLogDir): null;
+    } catch (KeeperException e) {
+      throw new IOException("Failed replication handler create", e);
+    }
     return instantiateHLog(logdir, oldLogDir);
   }
 
@@ -1133,7 +1137,8 @@ public class HRegionServer implements HR
   }
 
   @Override
-  public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
+  public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
+      final boolean daughter)
   throws KeeperException, IOException {
     // Do checks to see if we need to compact (references or too many files)
     if (r.hasReferences() || r.hasTooManyStoreFiles()) {
@@ -1147,11 +1152,16 @@ public class HRegionServer implements HR
     if (r.getRegionInfo().isRootRegion()) {
       RootLocationEditor.setRootLocation(getZooKeeper(),
         getServerInfo().getServerAddress());
-    } else if(r.getRegionInfo().isMetaRegion()) {
+    } else if (r.getRegionInfo().isMetaRegion()) {
       // TODO: doh, this has weird naming between RootEditor/MetaEditor
       MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
     } else {
-      MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
+      if (daughter) {
+        // If daughter of a split, update whole row, not just location.
+        MetaEditor.addDaughter(ct, r.getRegionInfo(), getServerInfo());
+      } else {
+        MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
+      }
     }
   }
 
@@ -2026,7 +2036,8 @@ public class HRegionServer implements HR
     try {
       region = getOnlineRegion(regionName);
       if (region == null) {
-        throw new NotServingRegionException("Region is not online: " + regionName);
+        throw new NotServingRegionException("Region is not online: " +
+          Bytes.toStringBinary(regionName));
       }
       return region;
     } finally {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Mon Aug 30 22:50:03 2010
@@ -53,9 +53,11 @@ public interface RegionServerServices ex
    * regionserver
    * @param r Region to open.
    * @param ct Instance of {@link CatalogTracker}
+   * @param daughter True if this is daughter of a split
    * @throws KeeperException
    * @throws IOException
    */
-  public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
+  public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
+      final boolean daughter)
   throws KeeperException, IOException;
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Mon Aug 30 22:50:03 2010
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.Reference.Range;
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.util.Progressable;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Executes region split as a "transaction".  Call {@link #prepare()} to setup
@@ -112,6 +114,8 @@ class SplitTransaction {
 
   /**
    * Constructor
+   * @param services So we can online new services.  If null, we'll skip onlining
+   * (Useful testing).
    * @param c Configuration to use running split
    * @param r Region to split
    * @param splitrow Row to split around
@@ -176,40 +180,23 @@ class SplitTransaction {
 
   /**
    * Run the transaction.
-   * @param or Object that can online/offline parent region. Can be null
-   * @param ct CatalogTracker instance.
+   * @param server Hosting server instance.
+   * @param services Used to online/offline regions.
    * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
    * @return Regions created
    * @see #rollback(OnlineRegions)
    */
-  public PairOfSameType<HRegion> execute(final OnlineRegions or,
-      final CatalogTracker ct)
-  throws IOException {
-    return execute(or, ct, or != null);
-  }
-
-  /**
-   * Run the transaction.
-   * @param or Object that can online/offline parent region.  Can be null (Tests
-   * will pass null).
-   * @param ct CatalogTracker instance.  Can be null (for testing)
-   * @param updateMeta If <code>true</code>, update meta (set to false when testing).
-   * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
-   * @return Regions created
-   * @see #rollback(OnlineRegions)
-   */
-  PairOfSameType<HRegion> execute(final OnlineRegions or, final CatalogTracker ct,
-      final boolean updateMeta)
+  PairOfSameType<HRegion> execute(final Server server,
+      final RegionServerServices services)
   throws IOException {
     LOG.info("Starting split of region " + this.parent);
     if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
       throw new SplitAndCloseWriteLockNotHeld();
     }
 
-    // We'll need one of these later but get it now because if we fail there
-    // is nothing to undo.
-    HTable t = null;
-    if (updateMeta) t = getTable(this.parent.getConf());
+    // If true, no cluster to write meta edits into.
+    boolean testing =
+      server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
 
     createSplitDir(this.parent.getFilesystem(), this.splitdir);
     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
@@ -217,8 +204,8 @@ class SplitTransaction {
     List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
     this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
 
-    if (or != null) {
-      or.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+    if (!testing) {
+      services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
     }
     this.journal.add(JournalEntry.OFFLINED_PARENT);
 
@@ -232,55 +219,39 @@ class SplitTransaction {
     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
     // add entry to journal BEFORE rather than AFTER the change.
     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
-    HRegion a = createDaughterRegion(this.hri_a);
+    HRegion a = createDaughterRegion(this.hri_a, this.parent.flushRequester);
 
     // Ditto
     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
-    HRegion b = createDaughterRegion(this.hri_b);
+    HRegion b = createDaughterRegion(this.hri_b, this.parent.flushRequester);
 
     // Edit parent in meta
-    if (ct != null) {
-      MetaEditor.offlineParentInMeta(ct, this.parent.getRegionInfo(),
-        a.getRegionInfo(), b.getRegionInfo());
-    }
-
-    // The is the point of no return.  We are committed to the split now.  Up to
-    // a failure editing parent in meta or a crash of the hosting regionserver,
-    // we could rollback (or, if crash, we could cleanup on redeploy) but now
-    // meta has been changed, we can only go forward.  If the below last steps
-    // do not complete, repair has to be done by another agent.  For example,
-    // basescanner, at least up till master rewrite, would add daughter rows if
-    // missing from meta.  It could do this because the parent edit includes the
-    // daughter specs.  In Bigtable paper, they have another mechanism where
-    // some feedback to the master somehow flags it that split is incomplete and
-    // needs fixup.  Whatever the mechanism, its a TODO that we have some fixup.
-    
-    // I looked at writing the put of the parent edit above out to the WAL log
-    // before changing meta with the notion that should we fail, then on replay
-    // the offlining of the parent and addition of daughters up into meta could
-    // be reinserted.  The edits would have to be 'special' and given how our
-    // splits work, splitting by region, I think the replay would have to happen
-    // inside in the split code -- as soon as it saw one of these special edits,
-    // rather than write the edit out a file for the .META. region to replay or
-    // somehow, write it out to this regions edits file for it to handle on
-    // redeploy -- this'd be whacky, we'd be telling meta about a split during
-    // the deploy of the parent -- instead we'd have to play the edit inside
-    // in the split code somehow; this would involve a stop-the-splitting till
-    // meta had been edited which might hold up splitting a good while.
-
-    // Finish up the meta edits.  If these fail, another agent needs to do fixup
-    HRegionInfo hri = a.getRegionInfo();
-    try {
-      if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
-      hri = b.getRegionInfo();
-      if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
-    } catch (IOException e) {
-      // Don't let this out or we'll run rollback.
-      LOG.warn("Failed adding daughter " + hri.toString());
-    }
-    // This should not fail because the HTable instance we are using is not
-    // running a buffer -- its immediately flushing its puts.
-    if (t != null) t.close();
+    if (!testing) {
+      MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
+        this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
+    }
+
+    // This is the point of no return.  We are committed to the split now.  We
+    // have still the daughter regions to open but meta has been changed.
+    // If we fail from here on out, we can not rollback so, we'll just abort.
+    // The meta has been changed though so there will need to be a fixup run
+    // during processing of the crashed server by master (TODO: Verify this in place).
+
+    // TODO: Could we be smarter about the sequence in which we do these steps?
+
+    if (!testing) {
+      // Open daughters in parallel.
+      DaughterOpener aOpener = new DaughterOpener(server, services, a);
+      DaughterOpener bOpener = new DaughterOpener(server, services, b);
+      aOpener.start();
+      bOpener.start();
+      try {
+        aOpener.join();
+        bOpener.join();
+      } catch (InterruptedException e) {
+        server.abort("Exception running daughter opens", e);
+      }
+    }
 
     // Unlock if successful split.
     this.parent.lock.writeLock().unlock();
@@ -291,6 +262,68 @@ class SplitTransaction {
     return new PairOfSameType<HRegion>(a, b);
   }
 
+  class DaughterOpener extends Thread {
+    private final RegionServerServices services;
+    private final Server server;
+    private final HRegion r;
+
+    DaughterOpener(final Server s, final RegionServerServices services, final HRegion r) {
+      super(s.getServerName() + "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+      this.services = services;
+      this.server = s;
+      this.r = r;
+    }
+
+    @Override
+    public void run() {
+      try {
+        openDaughterRegion(this.server, this.services, r);
+      } catch (Throwable t) {
+        this.server.abort("Failed open of daughter " +
+          this.r.getRegionInfo().getRegionNameAsString(), t);
+      }
+    }
+  }
+
+  /**
+   * Open daughter regions, add them to online list and update meta.
+   * @param server
+   * @param services
+   * @param daughter
+   * @throws IOException
+   * @throws KeeperException
+   */
+  void openDaughterRegion(final Server server,
+      final RegionServerServices services, final HRegion daughter)
+  throws IOException, KeeperException {
+    HRegionInfo hri = daughter.getRegionInfo();
+    LoggingProgressable reporter =
+      new LoggingProgressable(hri, server.getConfiguration());
+    HRegion r = daughter.openHRegion(reporter);
+    services.postOpenDeployTasks(r, server.getCatalogTracker(), true);
+  }
+
+  static class LoggingProgressable implements Progressable {
+    private final HRegionInfo hri;
+    private long lastLog = -1;
+    private final long interval;
+
+    LoggingProgressable(final HRegionInfo hri, final Configuration c) {
+      this.hri = hri;
+      this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
+        10000);
+    }
+
+    @Override
+    public void progress() {
+      long now = System.currentTimeMillis();
+      if (now - lastLog > this.interval) {
+        LOG.info("Opening " + this.hri.getRegionNameAsString());
+        this.lastLog = now;
+      }
+    }
+  }
+
   private static Path getSplitDir(final HRegion r) {
     return new Path(r.getRegionDir(), SPLITDIR);
   }
@@ -358,12 +391,14 @@ class SplitTransaction {
   }
 
   /**
-   * @param hri
+   * @param hri Spec. for daughter region to open.
+   * @param flusher Flusher this region should use.
    * @return Created daughter HRegion.
    * @throws IOException
    * @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
    */
-  HRegion createDaughterRegion(final HRegionInfo hri)
+  HRegion createDaughterRegion(final HRegionInfo hri,
+      final FlushRequester flusher)
   throws IOException {
     // Package private so unit tests have access.
     FileSystem fs = this.parent.getFilesystem();
@@ -371,7 +406,7 @@ class SplitTransaction {
       this.splitdir, hri);
     HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
       this.parent.getLog(), fs, this.parent.getConf(),
-      hri, null);
+      hri, flusher);
     HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
     return r;
   }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Mon Aug 30 22:50:03 2010
@@ -151,7 +151,7 @@ public class OpenRegionHandler extends E
     // Update ZK, ROOT or META
     try {
       this.rsServices.postOpenDeployTasks(region,
-        this.server.getCatalogTracker());
+        this.server.getCatalogTracker(), false);
     } catch (IOException e) {
       // TODO: rollback the open?
       LOG.error("Error updating region location in catalog table", e);

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Mon Aug 30 22:50:03 2010
@@ -106,19 +106,15 @@ public class ReplicationZookeeper {
    * Constructor used by region servers, connects to the peer cluster right away.
    *
    * @param zookeeper
-   * @param conf             conf to use
    * @param replicating    atomic boolean to start/stop replication
-   * @param rsName      the name of this region server, null if
-   *                         using RZH only to use the helping methods
    * @throws IOException
    * @throws KeeperException 
    */
-  public ReplicationZookeeper(final Server server,
-      final Configuration conf, final AtomicBoolean replicating, String rsName)
+  public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
   throws IOException, KeeperException {
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
-    this.conf = conf;
+    this.conf = server.getConfiguration();
     String replicationZNodeName =
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
@@ -157,8 +153,8 @@ public class ReplicationZookeeper {
       (this.replicationMaster ? "master" : "slave") + " for replication" +
         ", compared with (" + address + ")");
 
-    if (rsName != null) {
-      this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, rsName);
+    if (server.getServerName() != null) {
+      this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
       // Set a tracker on replicationStateNodeNode
       ReplicationStatusTracker tracker =
         new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Mon Aug 30 22:50:03 2010
@@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
 
 /**
- * Replication serves as an umbrella over the setup of replication and
- * is used by HRS.
+ * Gateway to Replication.  Used by {@link HRegionServer}.
  */
 public class Replication implements WALObserver {
   private final boolean replication;
@@ -60,16 +60,16 @@ public class Replication implements WALO
    * @param logDir
    * @param oldLogDir directory where logs are archived
    * @throws IOException
+   * @throws KeeperException 
    */
   public Replication(final Server server, final FileSystem fs,
       final Path logDir, final Path oldLogDir)
-  throws IOException {
+  throws IOException, KeeperException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
     if (replication) {
-      this.zkHelper = new ReplicationZookeeper(server.getZooKeeper(),
-        this.conf, this.replicating, this.server.getServerName());
+      this.zkHelper = new ReplicationZookeeper(server, this.replicating);
       this.replicationMaster = zkHelper.isReplicationMaster();
       this.replicationManager = this.replicationMaster ?
         new ReplicationSourceManager(zkHelper, conf, this.server,

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Mon Aug 30 22:50:03 2010
@@ -19,6 +19,22 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -38,22 +55,6 @@ import org.apache.hadoop.hbase.replicati
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -88,7 +89,7 @@ public class ReplicationSource extends T
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
-  private AtomicBoolean stop;
+  private Stoppable stopper;
   // List of chosen sinks (region servers)
   private List<HServerAddress> currentPeers;
   // How long should we sleep for each retry
@@ -139,11 +140,11 @@ public class ReplicationSource extends T
   public void init(final Configuration conf,
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
-                   final AtomicBoolean stopper,
+                   final Stoppable stopper,
                    final AtomicBoolean replicating,
                    final String peerClusterZnode)
       throws IOException {
-    this.stop = stopper;
+    this.stopper = stopper;
     this.conf = conf;
     this.replicationQueueSizeCapacity =
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
@@ -160,7 +161,7 @@ public class ReplicationSource extends T
             conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
     this.conn = HConnectionManager.getConnection(conf);
- // REENABLE   this.zkHelper = manager.getRepZkWrapper();
+    this.zkHelper = manager.getRepZkWrapper();
     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
     this.currentPeers = new ArrayList<HServerAddress>();
     this.random = new Random();
@@ -169,7 +170,7 @@ public class ReplicationSource extends T
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
- // REENALBE   this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+    this.clusterId = Byte.valueOf(zkHelper.getClusterId());
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
 
     // Finally look if this is a recovered queue
@@ -195,24 +196,23 @@ public class ReplicationSource extends T
    * Select a number of peers at random using the ratio. Mininum 1.
    */
   private void chooseSinks() {
-    // REENABLE
-//    this.currentPeers.clear();
-//    List<HServerAddress> addresses =
-//        this.zkHelper.getPeersAddresses(peerClusterId);
-//    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
-//    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-//    LOG.info("Getting " + nbPeers +
-//        " rs from peer cluster # " + peerClusterId);
-//    for (int i = 0; i < nbPeers; i++) {
-//      HServerAddress address;
-//      // Make sure we get one address that we don't already have
-//      do {
-//        address = addresses.get(this.random.nextInt(addresses.size()));
-//      } while (setOfAddr.contains(address));
-//      LOG.info("Choosing peer " + address);
-//      setOfAddr.add(address);
-//    }
-//    this.currentPeers.addAll(setOfAddr);
+    this.currentPeers.clear();
+    List<HServerAddress> addresses =
+        this.zkHelper.getPeersAddresses(peerClusterId);
+    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
+    LOG.info("Getting " + nbPeers +
+        " rs from peer cluster # " + peerClusterId);
+    for (int i = 0; i < nbPeers; i++) {
+      HServerAddress address;
+      // Make sure we get one address that we don't already have
+      do {
+        address = addresses.get(this.random.nextInt(addresses.size()));
+      } while (setOfAddr.contains(address));
+      LOG.info("Choosing peer " + address);
+      setOfAddr.add(address);
+    }
+    this.currentPeers.addAll(setOfAddr);
   }
 
   @Override
@@ -225,7 +225,7 @@ public class ReplicationSource extends T
   public void run() {
     connectToPeers();
     // We were stopped while looping to connect to sinks, just abort
-    if (this.stop.get()) {
+    if (this.stopper.isStopped()) {
       return;
     }
     // If this is recovered, the queue is already full and the first log
@@ -236,7 +236,7 @@ public class ReplicationSource extends T
     }
     int sleepMultiplier = 1;
     // Loop until we close down
-    while (!stop.get() && this.running) {
+    while (!stopper.isStopped() && this.running) {
       // Get a new path
       if (!getNextPath()) {
         if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -312,7 +312,7 @@ public class ReplicationSource extends T
       // If we didn't get anything to replicate, or if we hit a IOE,
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
-      if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+      if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -374,7 +374,7 @@ public class ReplicationSource extends T
 
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
-    while (!this.stop.get() && this.currentPeers.size() == 0) {
+    while (!this.stopper.isStopped() && this.currentPeers.size() == 0) {
       try {
         chooseSinks();
         Thread.sleep(this.sleepForRetries);
@@ -407,58 +407,57 @@ public class ReplicationSource extends T
    * @return true if we should continue with that file, false if we are over with it
    */
   protected boolean openReader(int sleepMultiplier) {
-    // REENABLE
-//    try {
-//      LOG.info("Opening log for replication " + this.currentPath.getName() +
-//          " at " + this.position);
-//      try {
-//       this.reader = null;
-//       this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
-//      } catch (FileNotFoundException fnfe) {
-//        if (this.queueRecovered) {
-//          // We didn't find the log in the archive directory, look if it still
-//          // exists in the dead RS folder (there could be a chain of failures
-//          // to look at)
-//          for (int i = this.deadRegionServers.length - 1; i > 0; i--) {
-//            Path deadRsDirectory =
-//                new Path(this.manager.getLogDir(), this.deadRegionServers[i]);
-//            Path possibleLogLocation =
-//                new Path(deadRsDirectory, currentPath.getName());
-//            if (this.manager.getFs().exists(possibleLogLocation)) {
-//              // We found the right new location
-//              LOG.info("Log " + this.currentPath + " still exists at " +
-//                  possibleLogLocation);
-//              // Breaking here will make us sleep since reader is null
-//              break;
-//            }
-//          }
-//          // TODO What happens if the log was missing from every single location?
-//          // Although we need to check a couple of times as the log could have
-//          // been moved by the master between the checks
-//        } else {
-//          // If the log was archived, continue reading from there
-//          Path archivedLogLocation =
-//              new Path(manager.getOldLogDir(), currentPath.getName());
-//          if (this.manager.getFs().exists(archivedLogLocation)) {
-//            currentPath = archivedLogLocation;
-//            LOG.info("Log " + this.currentPath + " was moved to " +
-//                archivedLogLocation);
-//            // Open the log at the new location
-//            this.openReader(sleepMultiplier);
-//
-//          }
-//          // TODO What happens the log is missing in both places?
-//        }
-//      }
-//    } catch (IOException ioe) {
-//      LOG.warn(peerClusterZnode + " Got: ", ioe);
-//      // TODO Need a better way to determinate if a file is really gone but
-//      // TODO without scanning all logs dir
-//      if (sleepMultiplier == this.maxRetriesMultiplier) {
-//        LOG.warn("Waited too long for this file, considering dumping");
-//        return !processEndOfFile();
-//      }
-//    }
+    try {
+      LOG.info("Opening log for replication " + this.currentPath.getName() +
+          " at " + this.position);
+      try {
+       this.reader = null;
+       this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+      } catch (FileNotFoundException fnfe) {
+        if (this.queueRecovered) {
+          // We didn't find the log in the archive directory; check whether it still
+          // exists in the dead RS folder (there could be a chain of failures
+          // to look at)
+          for (int i = this.deadRegionServers.length - 1; i > 0; i--) {
+            Path deadRsDirectory =
+                new Path(this.manager.getLogDir(), this.deadRegionServers[i]);
+            Path possibleLogLocation =
+                new Path(deadRsDirectory, currentPath.getName());
+            if (this.manager.getFs().exists(possibleLogLocation)) {
+              // We found the right new location
+              LOG.info("Log " + this.currentPath + " still exists at " +
+                  possibleLogLocation);
+              // Breaking here will make us sleep since reader is null
+              break;
+            }
+          }
+          // TODO What happens if the log was missing from every single location?
+          // Although we need to check a couple of times as the log could have
+          // been moved by the master between the checks
+        } else {
+          // If the log was archived, continue reading from there
+          Path archivedLogLocation =
+              new Path(manager.getOldLogDir(), currentPath.getName());
+          if (this.manager.getFs().exists(archivedLogLocation)) {
+            currentPath = archivedLogLocation;
+            LOG.info("Log " + this.currentPath + " was moved to " +
+                archivedLogLocation);
+            // Open the log at the new location
+            this.openReader(sleepMultiplier);
+
+          }
+          // TODO What happens if the log is missing in both places?
+        }
+      }
+    } catch (IOException ioe) {
+      LOG.warn(peerClusterZnode + " Got: ", ioe);
+      // TODO Need a better way to determine if a file is really gone
+      // TODO without scanning all the log dirs
+      if (sleepMultiplier == this.maxRetriesMultiplier) {
+        LOG.warn("Waited too long for this file, considering dumping");
+        return !processEndOfFile();
+      }
+    }
     return true;
   }
 
@@ -518,47 +517,46 @@ public class ReplicationSource extends T
    * Do the shipping logic
    */
   protected void shipEdits() {
-    // REENABLE
-//    int sleepMultiplier = 1;
-//    while (!stop.get()) {
-//      try {
-//        HRegionInterface rrs = getRS();
-//        LOG.debug("Replicating " + currentNbEntries);
-//        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
-//        this.manager.logPositionAndCleanOldLogs(this.currentPath,
-//            this.peerClusterZnode, this.position, queueRecovered);
-//        this.totalReplicatedEdits += currentNbEntries;
-//        this.metrics.shippedBatchesRate.inc(1);
-//        this.metrics.shippedOpsRate.inc(
-//            this.currentNbOperations);
-//        this.metrics.setAgeOfLastShippedOp(
-//            this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
-//        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
-//        break;
-//
-//      } catch (IOException ioe) {
-//        LOG.warn("Unable to replicate because ", ioe);
-//        try {
-//          boolean down;
-//          do {
-//            down = isSlaveDown();
-//            if (down) {
-//              LOG.debug("The region server we tried to ping didn't answer, " +
-//                  "sleeping " + sleepForRetries + " times " + sleepMultiplier);
-//              Thread.sleep(this.sleepForRetries * sleepMultiplier);
-//              if (sleepMultiplier < maxRetriesMultiplier) {
-//                sleepMultiplier++;
-//              } else {
-//                chooseSinks();
-//              }
-//            }
-//          } while (!stop.get() && down);
-//        } catch (InterruptedException e) {
-//          LOG.debug("Interrupted while trying to contact the peer cluster");
-//        }
-//
-//      }
-//    }
+    int sleepMultiplier = 1;
+    while (!this.stopper.isStopped()) {
+      try {
+        HRegionInterface rrs = getRS();
+        LOG.debug("Replicating " + currentNbEntries);
+        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+        this.manager.logPositionAndCleanOldLogs(this.currentPath,
+            this.peerClusterZnode, this.position, queueRecovered);
+        this.totalReplicatedEdits += currentNbEntries;
+        this.metrics.shippedBatchesRate.inc(1);
+        this.metrics.shippedOpsRate.inc(
+            this.currentNbOperations);
+        this.metrics.setAgeOfLastShippedOp(
+            this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
+        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
+        break;
+
+      } catch (IOException ioe) {
+        LOG.warn("Unable to replicate because ", ioe);
+        try {
+          boolean down;
+          do {
+            down = isSlaveDown();
+            if (down) {
+              LOG.debug("The region server we tried to ping didn't answer, " +
+                  "sleeping " + sleepForRetries + " ms x " + sleepMultiplier);
+              Thread.sleep(this.sleepForRetries * sleepMultiplier);
+              if (sleepMultiplier < maxRetriesMultiplier) {
+                sleepMultiplier++;
+              } else {
+                chooseSinks();
+              }
+            }
+          } while (!this.stopper.isStopped() && down);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while trying to contact the peer cluster");
+        }
+
+      }
+    }
   }
 
   /**
@@ -569,16 +567,15 @@ public class ReplicationSource extends T
    * continue trying to read from it
    */
   protected boolean processEndOfFile() {
-    // REENABLE
-//    if (this.queue.size() != 0) {
-//      this.currentPath = null;
-//      this.position = 0;
-//      return true;
-//    } else if (this.queueRecovered) {
-//      this.manager.closeRecoveredQueue(this);
-//      this.abort();
-//      return true;
-//    }
+    if (this.queue.size() != 0) {
+      this.currentPath = null;
+      this.position = 0;
+      return true;
+    } else if (this.queueRecovered) {
+      this.manager.closeRecoveredQueue(this);
+      this.abort();
+      return true;
+    }
     return false;
   }
 
@@ -692,5 +689,4 @@ public class ReplicationSource extends T
       return Long.parseLong(parts[parts.length-1]);
     }
   }
-
 }
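
The hunks above swap the old AtomicBoolean stop flag for checks against the shared
Stoppable (stopper.isStopped()) and keep the sleep-and-back-off retry loop in
shipEdits(). As a rough, stand-alone sketch of that pattern only -- the class,
the work() method and the field names below are illustrative, not part of this
patch:

  import org.apache.hadoop.hbase.Stoppable;

  /**
   * Sketch of the stop-aware retry loop used above; names are illustrative.
   */
  public class StoppableRetryLoop {
    private final Stoppable stopper;
    private final long sleepForRetries;     // base sleep between retries, in ms
    private final int maxRetriesMultiplier; // cap on the backoff multiplier

    public StoppableRetryLoop(final Stoppable stopper, final long sleepForRetries,
        final int maxRetriesMultiplier) {
      this.stopper = stopper;
      this.sleepForRetries = sleepForRetries;
      this.maxRetriesMultiplier = maxRetriesMultiplier;
    }

    /** Retries work() with a growing sleep until it succeeds or we are stopped. */
    public void run() throws InterruptedException {
      int sleepMultiplier = 1;
      while (!this.stopper.isStopped()) {
        if (work()) {
          break;
        }
        Thread.sleep(this.sleepForRetries * sleepMultiplier);
        if (sleepMultiplier < this.maxRetriesMultiplier) {
          sleepMultiplier++;  // back off a little more next time
        }
      }
    }

    /** Placeholder for one attempt at the task; returns true on success. */
    protected boolean work() {
      return true;
    }
  }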

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Mon Aug 30 22:50:03 2010
@@ -20,29 +20,25 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -149,7 +145,7 @@ public class ReplicationSourceManager {
       ReplicationSourceInterface src = addSource(id);
       src.startup();
     }
-    List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+    List<String> currentReplicators = this.zkHelper.getListOfReplicators();
     synchronized (otherRegionServers) {
       LOG.info("Current list of replicators: " + currentReplicators
           + " other RSs: " + otherRegionServers);
@@ -201,7 +197,7 @@ public class ReplicationSourceManager {
    * @return a sorted set of hlog names
    */
   protected SortedSet<String> getHLogs() {
-    return new TreeSet(this.hlogs);
+    return new TreeSet<String>(this.hlogs);
   }
 
   /**
@@ -255,6 +251,7 @@ public class ReplicationSourceManager {
       final String peerClusterId) throws IOException {
     ReplicationSourceInterface src;
     try {
+      @SuppressWarnings("rawtypes")
       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
           ReplicationSource.class.getCanonicalName()));
       src = (ReplicationSourceInterface) c.newInstance();
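
The addSource() hunk above silences a rawtypes warning around Class.forName().
Purely as a hedged alternative sketch, not what this patch does: Hadoop's
Configuration.getClass() plus ReflectionUtils.newInstance() gives the same
config-driven lookup with a typed Class. The factory class name below is made
up for illustration.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
  import org.apache.hadoop.util.ReflectionUtils;

  public class ReplicationSourceFactory {
    static ReplicationSourceInterface create(final Configuration conf) {
      // Typed lookup of the configured implementation, defaulting to
      // ReplicationSource and bounded by the ReplicationSourceInterface contract.
      Class<? extends ReplicationSourceInterface> clazz =
          conf.getClass("replication.replicationsource.implementation",
              ReplicationSource.class, ReplicationSourceInterface.class);
      // Instantiates via the no-arg constructor, same as c.newInstance() above.
      return ReflectionUtils.newInstance(clazz, conf);
    }
  }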

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Mon Aug 30 22:50:03 2010
@@ -264,7 +264,8 @@ public class TestFromClientSide {
    * @throws InterruptedException 
    */
   @Test
-  public void testFilterAcrossMutlipleRegions() throws IOException, InterruptedException {
+  public void testFilterAcrossMultipleRegions()
+  throws IOException, InterruptedException {
     byte [] name = Bytes.toBytes("testFilterAcrossMutlipleRegions");
     HTable t = TEST_UTIL.createTable(name, FAMILY);
     int rowCount = TEST_UTIL.loadTable(t, FAMILY);

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java Mon Aug 30 22:50:03 2010
@@ -67,7 +67,7 @@ public class TestSplitTransaction {
     this.fs.delete(this.testdir, true);
     this.wal = new HLog(fs, new Path(this.testdir, "logs"),
       new Path(this.testdir, "archive"),
-      TEST_UTIL.getConfiguration(), null);
+      TEST_UTIL.getConfiguration());
     this.parent = createRegion(this.testdir, this.wal);
   }
 
@@ -174,7 +174,7 @@ public class TestSplitTransaction {
     // Start transaction.
     SplitTransaction st = prepareGOOD_SPLIT_ROW();
     SplitTransaction spiedUponSt = spy(st);
-    when(spiedUponSt.createDaughterRegion(spiedUponSt.getSecondDaughter())).
+    when(spiedUponSt.createDaughterRegion(spiedUponSt.getSecondDaughter(), null)).
       thenThrow(new MockedFailedDaughterCreation());
     // Run the execute.  Look at what it returns.
     boolean expectedException = false;
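
One Mockito aside on the stubbing shown above, offered only as a sketch: with a
spy, when(spy.method(...)) calls through to the real createDaughterRegion()
while the stub is being set up, whereas the doThrow(...).when(spy) form does
not. Whether that matters here is the test author's call; the snippet assumes
it sits inside the same test method as the original code.

  import static org.mockito.Mockito.doThrow;
  import static org.mockito.Mockito.spy;

  SplitTransaction st = prepareGOOD_SPLIT_ROW();
  SplitTransaction spiedUponSt = spy(st);
  // Stub the failure without invoking the real method during setup.
  doThrow(new MockedFailedDaughterCreation())
      .when(spiedUponSt)
      .createDaughterRegion(spiedUponSt.getSecondDaughter(), null);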

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Mon Aug 30 22:50:03 2010
@@ -123,7 +123,7 @@ public class TestStore extends TestCase 
     HTableDescriptor htd = new HTableDescriptor(table);
     htd.addFamily(hcd);
     HRegionInfo info = new HRegionInfo(htd, null, null, false);
-    HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null);
+    HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
     HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
 
     store = new Store(basedir, region, hcd, fs, conf);

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Mon Aug 30 22:50:03 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.replicat
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
@@ -39,7 +40,7 @@ public class ReplicationSourceDummy impl
 
   @Override
   public void init(Configuration conf, FileSystem fs,
-                   ReplicationSourceManager manager, AtomicBoolean stopper,
+                   ReplicationSourceManager manager, Stoppable stopper,
                    AtomicBoolean replicating, String peerClusterId)
       throws IOException {
     this.manager = manager;
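
init() now takes a Stoppable where it used to take an AtomicBoolean. A minimal
stub a test might hand in, sketched under the assumption that Stoppable
declares stop(String) and isStopped() (the latter is what the ReplicationSource
hunks above call); the class name is made up:

  import org.apache.hadoop.hbase.Stoppable;

  public class StoppableStub implements Stoppable {
    private volatile boolean stopped = false;

    @Override
    public void stop(final String why) {
      this.stopped = true;  // remember we were asked to stop
    }

    @Override
    public boolean isStopped() {
      return this.stopped;
    }
  }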

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Mon Aug 30 22:50:03 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,6 +47,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
@@ -59,8 +62,6 @@ public class TestReplicationSourceManage
 
   private static HBaseTestingUtility utility;
 
-  private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
-
   private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
 
   private static ReplicationSourceManager manager;
@@ -97,8 +98,8 @@ public class TestReplicationSourceManage
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     utility = new HBaseTestingUtility(conf);
     utility.startMiniZKCluster();
-// REENALBE
-//
+
+    // REENABLE
 //    zkw = ZooKeeperWrapper.createInstance(conf, "test");
 //    zkw.writeZNode("/hbase", "replication", "");
 //    zkw.writeZNode("/hbase/replication", "master",
@@ -107,20 +108,18 @@ public class TestReplicationSourceManage
 //    zkw.writeZNode("/hbase/replication/peers", "1",
 //          conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
 //          conf.get("hbase.zookeeper.property.clientPort")+":/1");
-//
-//    HRegionServer server = new HRegionServer(conf);
-//    ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
-//        server.getZooKeeperWrapper(), conf,
-//        REPLICATING, "123456789");
-//    fs = FileSystem.get(conf);
-//    oldLogDir = new Path(utility.getTestDir(),
-//        HConstants.HREGION_OLDLOGDIR_NAME);
-//    logDir = new Path(utility.getTestDir(),
-//        HConstants.HREGION_LOGDIR_NAME);
-//
-//    manager = new ReplicationSourceManager(helper,
-//        conf, STOPPER, fs, REPLICATING, logDir, oldLogDir);
-//    manager.addSource("1");
+
+    HRegionServer server = new HRegionServer(conf);
+    ReplicationZookeeper helper = new ReplicationZookeeper(server, REPLICATING);
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getTestDir(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getTestDir(),
+        HConstants.HREGION_LOGDIR_NAME);
+
+    manager = new ReplicationSourceManager(helper,
+        conf, server, fs, REPLICATING, logDir, oldLogDir);
+    manager.addSource("1");
 
     htd = new HTableDescriptor(test);
     HColumnDescriptor col = new HColumnDescriptor("f1");
@@ -160,11 +159,12 @@ public class TestReplicationSourceManage
     KeyValue kv = new KeyValue(r1, f1, r1);
     WALEdit edit = new WALEdit();
     edit.add(kv);
-
-    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
+    List<WALObserver> listeners = new ArrayList<WALObserver>();
+// REENABLE    listeners.add(manager);
+    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
       URLEncoder.encode("regionserver:60020", "UTF8"));
 
- // REENABLE     manager.init();
+    manager.init();
 
     // Testing normal log rolling every 20
     for(long i = 1; i < 101; i++) {
@@ -190,7 +190,7 @@ public class TestReplicationSourceManage
       hlog.append(hri, key, edit);
     }
 
- // REENABLE    assertEquals(6, manager.getHLogs().size());
+    assertEquals(6, manager.getHLogs().size());
 
     hlog.rollWriter();