You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/31 00:50:04 UTC
svn commit: r991014 - in /hbase/branches/0.90_master_rewrite: ./
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/io/
src/main/java/org/apache/hadoop/hbase/ipc/ src/main/...
Author: stack
Date: Mon Aug 30 22:50:03 2010
New Revision: 991014
URL: http://svn.apache.org/viewvc?rev=991014&view=rev
Log:
Added move region command. Added balancer on/off command.
Brought forward the replication zk stuff (J-D offered to fix up
the mess I made). Fixes to make tests pass.
M BRANCH_TODO.txt
Added some TODOs
M src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
Fix to match new method args.
M src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Changed constructor params for HLog.
M src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
Changed to take Stoppable.
M src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Changed to take Stoppable.
M src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Formatting.
M src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
Use MetaEditor adding daughters. Use new open method on HRegion and the
postOpenDeployTasks. Run open of the daughters in parallel.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Minor changes. Added flag to postOpenDeployTasks to denote whether or
not region is a daughter of a split.
M src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
postOpenDeployTasks takes a boolean.
M src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
Pass server and services instead of catalogtracker and server.
M src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Added an open method that does initialize and sequenceid handling.
M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Added utility method that gets HRI and HSI for passed encoded region name.
M src/main/java/org/apache/hadoop/hbase/master/HMaster.java
(balance, move): Added.
M src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
Remove unused imports.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
Reenabled bits of code.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
Javadoc; let KeeperException (KE) propagate out.
M src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Reenabled code.
M src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
Take a Server instead of individual args.
M src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
Refactored some. Added a addDaughter method.
M src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
Added move and balance.
M src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
Removed unused imports.
M src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
Added move and balance.
Modified:
hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified: hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt Mon Aug 30 22:50:03 2010
@@ -290,3 +290,6 @@ Later:
So on assign, if we fail -- say connection refused when we try open on the RS, regions state remains offline -- who comes along and finds all offline and assigns?
+
+TODO:
++ Add test to prove move region works.
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java Mon Aug 30 22:50:03 2010
@@ -55,7 +55,7 @@ public class MetaEditor {
Writables.getBytes(regionInfo));
catalogTracker.waitForMetaServerConnectionDefault().put(
CatalogTracker.META_REGION, put);
- LOG.info("Added region " + regionInfo + " to META");
+ LOG.info("Added region " + regionInfo.getRegionNameAsString() + " to META");
}
/**
@@ -71,9 +71,11 @@ public class MetaEditor {
public static void offlineParentInMeta(CatalogTracker catalogTracker,
HRegionInfo parent, final HRegionInfo a, final HRegionInfo b)
throws NotAllMetaRegionsOnlineException, IOException {
- Put put = new Put(parent.getRegionName());
- put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(parent));
+ HRegionInfo copyOfParent = new HRegionInfo(parent);
+ copyOfParent.setOffline(true);
+ copyOfParent.setSplit(true);
+ Put put = new Put(copyOfParent.getRegionName());
+ addRegionInfo(put, copyOfParent);
put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
HConstants.EMPTY_BYTE_ARRAY);
put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
@@ -83,7 +85,23 @@ public class MetaEditor {
put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
Writables.getBytes(b));
catalogTracker.waitForMetaServerConnectionDefault().put(CatalogTracker.META_REGION, put);
- LOG.info("Offlined parent region " + parent + " in META");
+ LOG.info("Offlined parent region " + parent.getRegionNameAsString() +
+ " in META");
+ }
+
+ public static void addDaughter(final CatalogTracker catalogTracker,
+ final HRegionInfo regionInfo, final HServerInfo serverInfo)
+ throws NotAllMetaRegionsOnlineException, IOException {
+ HRegionInterface server = catalogTracker.waitForRootServerConnectionDefault();
+ byte [] catalogRegionName = CatalogTracker.META_REGION;
+ Put put = new Put(regionInfo.getRegionName());
+ addRegionInfo(put, regionInfo);
+ addLocation(put, serverInfo);
+ server.put(catalogRegionName, put);
+ LOG.info("Added daughter " + regionInfo.getRegionNameAsString() +
+ " in region " + Bytes.toString(catalogRegionName) + " with " +
+ "server=" + serverInfo.getHostnamePort() + ", " +
+ "startcode=" + serverInfo.getStartCode());
}
/**
@@ -140,10 +158,7 @@ public class MetaEditor {
byte [] catalogRegionName, HRegionInfo regionInfo, HServerInfo serverInfo)
throws IOException {
Put put = new Put(regionInfo.getRegionName());
- put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
- Bytes.toBytes(serverInfo.getHostnamePort()));
- put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
- Bytes.toBytes(serverInfo.getStartCode()));
+ addLocation(put, serverInfo);
server.put(catalogRegionName, put);
LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
" in region " + Bytes.toString(catalogRegionName) + " with " +
@@ -164,7 +179,7 @@ public class MetaEditor {
catalogTracker.waitForMetaServerConnectionDefault().delete(
CatalogTracker.META_REGION, delete);
- LOG.info("Deleted region " + regionInfo + " from META");
+ LOG.info("Deleted region " + regionInfo.getRegionNameAsString() + " from META");
}
/**
@@ -177,10 +192,24 @@ public class MetaEditor {
HRegionInfo regionInfo)
throws IOException {
Put put = new Put(regionInfo.getRegionName());
- put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(regionInfo));
+ addRegionInfo(put, regionInfo);
catalogTracker.waitForMetaServerConnectionDefault().put(
CatalogTracker.META_REGION, put);
- LOG.info("Updated region " + regionInfo + " in META");
+ LOG.info("Updated region " + regionInfo.getRegionNameAsString() + " in META");
+ }
+
+ private static Put addRegionInfo(final Put p, final HRegionInfo hri)
+ throws IOException {
+ p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+ Writables.getBytes(hri));
+ return p;
+ }
+
+ private static Put addLocation(final Put p, final HServerInfo hsi) {
+ p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+ Bytes.toBytes(hsi.getHostnamePort()));
+ p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+ Bytes.toBytes(hsi.getStartCode()));
+ return p;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Mon Aug 30 22:50:03 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.MasterNot
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
@@ -854,6 +855,31 @@ public class HBaseAdmin implements Abort
}
/**
+ * Move the region <code>r</code> to <code>dest</code>.
+ * @param encodedRegionName The encoded region name.
+ * @param destServerName The servername of the destination regionserver
+ * @throws UnknownRegionException Thrown if we can't find a region named
+ * <code>encodedRegionName</code>
+ * @throws ZooKeeperConnectionException
+ * @throws MasterNotRunningException
+ */
+ public void move(final byte [] encodedRegionName, final byte [] destServerName)
+ throws UnknownRegionException, MasterNotRunningException, ZooKeeperConnectionException {
+ getMaster().move(encodedRegionName, destServerName);
+ }
+
+ /**
+ * @param b If true, enable balancer. If false, disable balancer.
+ * @return Previous balancer value
+ * @throws ZooKeeperConnectionException
+ * @throws MasterNotRunningException
+ */
+ public boolean balance(final boolean b)
+ throws MasterNotRunningException, ZooKeeperConnectionException {
+ return getMaster().balance(b);
+ }
+
+ /**
* Split a table or an individual region.
* Asynchronous operation.
*
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Mon Aug 30 22:50:03 2010
@@ -183,7 +183,6 @@ public class HbaseObjectWritable impleme
addToMap(HLog.Entry[].class, code++);
addToMap(HLogKey.class, code++);
- // List
addToMap(List.class, code++);
addToMap(NavigableSet.class, code++);
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Mon Aug 30 22:50:03 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.UnknownRegionException;
/**
* Clients interact with the HMasterInterface to gain access to meta-level
@@ -130,4 +131,21 @@ public interface HMasterInterface extend
* @return status object
*/
public ClusterStatus getClusterStatus();
+
+
+ /**
+ * Move the region <code>r</code> to <code>dest</code>.
+ * @param encodedRegionName The encoded region name.
+ * @param destServerName The servername of the destination regionserver
+ * @throws UnknownRegionException Thrown if we can't find a region named
+ * <code>encodedRegionName</code>
+ */
+ public void move(final byte [] encodedRegionName, final byte [] destServerName)
+ throws UnknownRegionException;
+
+ /**
+ * @param b If true, enable balancer. If false, disable balancer.
+ * @return Previous balancer value
+ */
+ public boolean balance(final boolean b);
}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Mon Aug 30 22:50:03 2010
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.client.Mu
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Mon Aug 30 22:50:03 2010
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.Lo
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
@@ -1059,6 +1060,23 @@ public class AssignmentManager extends Z
}
/**
+ * @param encodedRegionName Region encoded name.
+ * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
+ * and the hosting servers {@link HServerInfo}.
+ */
+ Pair<HRegionInfo, HServerInfo> getAssignment(final byte [] encodedRegionName) {
+ String name = Bytes.toString(encodedRegionName);
+ synchronized(this.regions) {
+ for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
+ if (e.getKey().getEncodedName().equals(name)) {
+ return new Pair<HRegionInfo, HServerInfo>(e.getKey(), e.getValue());
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
* @param plan Plan to execute.
*/
void balance(final RegionPlan plan) {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Aug 30 22:50:03 2010
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
@@ -169,6 +170,7 @@ implements HMasterInterface, HMasterRegi
private LoadBalancer balancer = new LoadBalancer();
private Chore balancerChore;
+ private volatile boolean balance = true;
/**
* Initializes the HMaster. The steps are as follows:
@@ -538,6 +540,8 @@ implements HMasterInterface, HMasterRegi
* Run the balancer.
*/
public void balance() {
+ // If balance not true, don't run balancer.
+ if (!this.balance) return;
synchronized (this.balancer) {
// Only allow one balance run at at time.
if (this.assignmentManager.isRegionsInTransition()) {
@@ -563,6 +567,25 @@ implements HMasterInterface, HMasterRegi
}
}
+ @Override
+ public boolean balance(final boolean b) {
+ boolean oldValue = this.balance;
+ this.balance = b;
+ LOG.info("Balance=" + b);
+ return oldValue;
+ }
+
+ @Override
+ public void move(final byte[] encodedRegionName, final byte[] destServerName)
+ throws UnknownRegionException {
+ Pair<HRegionInfo, HServerInfo> p =
+ this.assignmentManager.getAssignment(encodedRegionName);
+ if (p == null) throw new UnknownRegionException(Bytes.toString(encodedRegionName));
+ HServerInfo dest = this.serverManager.getServerInfo(new String(destServerName));
+ RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
+ this.assignmentManager.balance(rp);
+ }
+
public void createTable(HTableDescriptor desc, byte [][] splitKeys)
throws IOException {
createTable(desc, splitKeys, false);
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Mon Aug 30 22:50:03 2010
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.io.Writable;
/**
* Makes decisions about the placement and movement of Regions across
@@ -520,11 +523,12 @@ public class LoadBalancer {
* information and not the source/dest server info.
*/
public static class RegionPlan implements Comparable<RegionPlan> {
-
private final HRegionInfo hri;
private final HServerInfo source;
private HServerInfo dest;
+
+
/**
* Instantiate a plan for a region move, moving the specified region from
* the specified source server to the specified destination server.
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Mon Aug 30 22:50:03 2010
@@ -149,7 +149,7 @@ public class CompactSplitThread extends
// the prepare call -- we are not ready to split just now. Just return.
if (!st.prepare()) return;
try {
- st.execute(this.server, this.server.getCatalogTracker());
+ st.execute(this.server, this.server);
} catch (IOException ioe) {
try {
LOG.info("Running rollback of failed split of " +
@@ -172,8 +172,9 @@ public class CompactSplitThread extends
this.server.reportSplit(parent.getRegionInfo(), st.getFirstDaughter(),
st.getSecondDaughter());
LOG.info("Region split, META updated, and report to master. Parent=" +
- parent.getRegionInfo() + ", new regions: " +
- st.getFirstDaughter() + ", " + st.getSecondDaughter() + ". Split took " +
+ parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
+ st.getFirstDaughter().getRegionNameAsString() + ", " +
+ st.getSecondDaughter().getRegionNameAsString() + ". Split took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Aug 30 22:50:03 2010
@@ -2422,11 +2422,23 @@ public class HRegion implements HeapSize
info.getTableDesc().getName());
HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
flusher);
- long seqid = r.initialize(reporter);
- if (wal != null) {
- wal.setSequenceNumber(seqid);
+ return r.openHRegion(reporter);
+ }
+
+ /**
+ * Open HRegion.
+ * Calls initialize and sets sequenceid.
+ * @param reporter
+ * @return Returns <code>this</code>
+ * @throws IOException
+ */
+ HRegion openHRegion(final Progressable reporter)
+ throws IOException {
+ long seqid = initialize(reporter);
+ if (this.log != null) {
+ this.log.setSequenceNumber(seqid);
}
- return r;
+ return this;
}
/**
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Aug 30 22:50:03 2010
@@ -905,8 +905,12 @@ public class HRegionServer implements HR
// Instantiate replication manager if replication enabled. Pass it the
// log directories.
- this.replicationHandler = Replication.isReplication(this.conf)?
- new Replication(this, this.fs, logdir, oldLogDir): null;
+ try {
+ this.replicationHandler = Replication.isReplication(this.conf)?
+ new Replication(this, this.fs, logdir, oldLogDir): null;
+ } catch (KeeperException e) {
+ throw new IOException("Failed replication handler create", e);
+ }
return instantiateHLog(logdir, oldLogDir);
}
@@ -1133,7 +1137,8 @@ public class HRegionServer implements HR
}
@Override
- public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
+ public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
+ final boolean daughter)
throws KeeperException, IOException {
// Do checks to see if we need to compact (references or too many files)
if (r.hasReferences() || r.hasTooManyStoreFiles()) {
@@ -1147,11 +1152,16 @@ public class HRegionServer implements HR
if (r.getRegionInfo().isRootRegion()) {
RootLocationEditor.setRootLocation(getZooKeeper(),
getServerInfo().getServerAddress());
- } else if(r.getRegionInfo().isMetaRegion()) {
+ } else if (r.getRegionInfo().isMetaRegion()) {
// TODO: doh, this has weird naming between RootEditor/MetaEditor
MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
} else {
- MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
+ if (daughter) {
+ // If daughter of a split, update whole row, not just location.
+ MetaEditor.addDaughter(ct, r.getRegionInfo(), getServerInfo());
+ } else {
+ MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
+ }
}
}
@@ -2026,7 +2036,8 @@ public class HRegionServer implements HR
try {
region = getOnlineRegion(regionName);
if (region == null) {
- throw new NotServingRegionException("Region is not online: " + regionName);
+ throw new NotServingRegionException("Region is not online: " +
+ Bytes.toStringBinary(regionName));
}
return region;
} finally {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Mon Aug 30 22:50:03 2010
@@ -53,9 +53,11 @@ public interface RegionServerServices ex
* regionserver
* @param r Region to open.
* @param ct Instance of {@link CatalogTracker}
+ * @param daughter True if this is daughter of a split
* @throws KeeperException
* @throws IOException
*/
- public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
+ public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
+ final boolean daughter)
throws KeeperException, IOException;
}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Mon Aug 30 22:50:03 2010
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.Reference.Range;
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.util.Progressable;
+import org.apache.zookeeper.KeeperException;
/**
* Executes region split as a "transaction". Call {@link #prepare()} to setup
@@ -112,6 +114,8 @@ class SplitTransaction {
/**
* Constructor
+ * @param services So we can online new servces. If null, we'll skip onlining
+ * (Useful testing).
* @param c Configuration to use running split
* @param r Region to split
* @param splitrow Row to split around
@@ -176,40 +180,23 @@ class SplitTransaction {
/**
* Run the transaction.
- * @param or Object that can online/offline parent region. Can be null
- * @param ct CatalogTracker instance.
+ * @param server Hosting server instance.
+ * @param services Used to online/offline regions.
* @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
* @return Regions created
* @see #rollback(OnlineRegions)
*/
- public PairOfSameType<HRegion> execute(final OnlineRegions or,
- final CatalogTracker ct)
- throws IOException {
- return execute(or, ct, or != null);
- }
-
- /**
- * Run the transaction.
- * @param or Object that can online/offline parent region. Can be null (Tests
- * will pass null).
- * @param ct CatalogTracker instance. Can be null (for testing)
- * @param updateMeta If <code>true</code>, update meta (set to false when testing).
- * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
- * @return Regions created
- * @see #rollback(OnlineRegions)
- */
- PairOfSameType<HRegion> execute(final OnlineRegions or, final CatalogTracker ct,
- final boolean updateMeta)
+ PairOfSameType<HRegion> execute(final Server server,
+ final RegionServerServices services)
throws IOException {
LOG.info("Starting split of region " + this.parent);
if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
throw new SplitAndCloseWriteLockNotHeld();
}
- // We'll need one of these later but get it now because if we fail there
- // is nothing to undo.
- HTable t = null;
- if (updateMeta) t = getTable(this.parent.getConf());
+ // If true, no cluster to write meta edits into.
+ boolean testing =
+ server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
createSplitDir(this.parent.getFilesystem(), this.splitdir);
this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
@@ -217,8 +204,8 @@ class SplitTransaction {
List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
- if (or != null) {
- or.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+ if (!testing) {
+ services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
}
this.journal.add(JournalEntry.OFFLINED_PARENT);
@@ -232,55 +219,39 @@ class SplitTransaction {
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = createDaughterRegion(this.hri_a);
+ HRegion a = createDaughterRegion(this.hri_a, this.parent.flushRequester);
// Ditto
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = createDaughterRegion(this.hri_b);
+ HRegion b = createDaughterRegion(this.hri_b, this.parent.flushRequester);
// Edit parent in meta
- if (ct != null) {
- MetaEditor.offlineParentInMeta(ct, this.parent.getRegionInfo(),
- a.getRegionInfo(), b.getRegionInfo());
- }
-
- // The is the point of no return. We are committed to the split now. Up to
- // a failure editing parent in meta or a crash of the hosting regionserver,
- // we could rollback (or, if crash, we could cleanup on redeploy) but now
- // meta has been changed, we can only go forward. If the below last steps
- // do not complete, repair has to be done by another agent. For example,
- // basescanner, at least up till master rewrite, would add daughter rows if
- // missing from meta. It could do this because the parent edit includes the
- // daughter specs. In Bigtable paper, they have another mechanism where
- // some feedback to the master somehow flags it that split is incomplete and
- // needs fixup. Whatever the mechanism, its a TODO that we have some fixup.
-
- // I looked at writing the put of the parent edit above out to the WAL log
- // before changing meta with the notion that should we fail, then on replay
- // the offlining of the parent and addition of daughters up into meta could
- // be reinserted. The edits would have to be 'special' and given how our
- // splits work, splitting by region, I think the replay would have to happen
- // inside in the split code -- as soon as it saw one of these special edits,
- // rather than write the edit out a file for the .META. region to replay or
- // somehow, write it out to this regions edits file for it to handle on
- // redeploy -- this'd be whacky, we'd be telling meta about a split during
- // the deploy of the parent -- instead we'd have to play the edit inside
- // in the split code somehow; this would involve a stop-the-splitting till
- // meta had been edited which might hold up splitting a good while.
-
- // Finish up the meta edits. If these fail, another agent needs to do fixup
- HRegionInfo hri = a.getRegionInfo();
- try {
- if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
- hri = b.getRegionInfo();
- if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
- } catch (IOException e) {
- // Don't let this out or we'll run rollback.
- LOG.warn("Failed adding daughter " + hri.toString());
- }
- // This should not fail because the HTable instance we are using is not
- // running a buffer -- its immediately flushing its puts.
- if (t != null) t.close();
+ if (!testing) {
+ MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
+ this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
+ }
+
+ // This is the point of no return. We are committed to the split now. We
+ // have still the daughter regions to open but meta has been changed.
+ // If we fail from here on out, we can not rollback so, we'll just abort.
+ // The meta has been changed though so there will need to be a fixup run
+ // during processing of the crashed server by master (TODO: Verify this in place).
+
+ // TODO: Could we be smarter about the sequence in which we do these steps?
+
+ if (!testing) {
+ // Open daughters in parallel.
+ DaughterOpener aOpener = new DaughterOpener(server, services, a);
+ DaughterOpener bOpener = new DaughterOpener(server, services, b);
+ aOpener.start();
+ bOpener.start();
+ try {
+ aOpener.join();
+ bOpener.join();
+ } catch (InterruptedException e) {
+ server.abort("Exception running daughter opens", e);
+ }
+ }
// Unlock if successful split.
this.parent.lock.writeLock().unlock();
@@ -291,6 +262,68 @@ class SplitTransaction {
return new PairOfSameType<HRegion>(a, b);
}
+ class DaughterOpener extends Thread {
+ private final RegionServerServices services;
+ private final Server server;
+ private final HRegion r;
+
+ DaughterOpener(final Server s, final RegionServerServices services, final HRegion r) {
+ super(s.getServerName() + "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+ this.services = services;
+ this.server = s;
+ this.r = r;
+ }
+
+ @Override
+ public void run() {
+ try {
+ openDaughterRegion(this.server, this.services, r);
+ } catch (Throwable t) {
+ this.server.abort("Failed open of daughter " +
+ this.r.getRegionInfo().getRegionNameAsString(), t);
+ }
+ }
+ }
+
+ /**
+ * Open daughter regions, add them to online list and update meta.
+ * @param server
+ * @param services
+ * @param daughter
+ * @throws IOException
+ * @throws KeeperException
+ */
+ void openDaughterRegion(final Server server,
+ final RegionServerServices services, final HRegion daughter)
+ throws IOException, KeeperException {
+ HRegionInfo hri = daughter.getRegionInfo();
+ LoggingProgressable reporter =
+ new LoggingProgressable(hri, server.getConfiguration());
+ HRegion r = daughter.openHRegion(reporter);
+ services.postOpenDeployTasks(r, server.getCatalogTracker(), true);
+ }
+
+ static class LoggingProgressable implements Progressable {
+ private final HRegionInfo hri;
+ private long lastLog = -1;
+ private final long interval;
+
+ LoggingProgressable(final HRegionInfo hri, final Configuration c) {
+ this.hri = hri;
+ this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
+ 10000);
+ }
+
+ @Override
+ public void progress() {
+ long now = System.currentTimeMillis();
+ if (now - lastLog > this.interval) {
+ LOG.info("Opening " + this.hri.getRegionNameAsString());
+ this.lastLog = now;
+ }
+ }
+ }
+
private static Path getSplitDir(final HRegion r) {
return new Path(r.getRegionDir(), SPLITDIR);
}
@@ -358,12 +391,14 @@ class SplitTransaction {
}
/**
- * @param hri
+ * @param hri Spec. for daughter region to open.
+ * @param flusher Flusher this region should use.
* @return Created daughter HRegion.
* @throws IOException
* @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
*/
- HRegion createDaughterRegion(final HRegionInfo hri)
+ HRegion createDaughterRegion(final HRegionInfo hri,
+ final FlushRequester flusher)
throws IOException {
// Package private so unit tests have access.
FileSystem fs = this.parent.getFilesystem();
@@ -371,7 +406,7 @@ class SplitTransaction {
this.splitdir, hri);
HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
this.parent.getLog(), fs, this.parent.getConf(),
- hri, null);
+ hri, flusher);
HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
return r;
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Mon Aug 30 22:50:03 2010
@@ -151,7 +151,7 @@ public class OpenRegionHandler extends E
// Update ZK, ROOT or META
try {
this.rsServices.postOpenDeployTasks(region,
- this.server.getCatalogTracker());
+ this.server.getCatalogTracker(), false);
} catch (IOException e) {
// TODO: rollback the open?
LOG.error("Error updating region location in catalog table", e);
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Mon Aug 30 22:50:03 2010
@@ -106,19 +106,15 @@ public class ReplicationZookeeper {
* Constructor used by region servers, connects to the peer cluster right away.
*
* @param zookeeper
- * @param conf conf to use
* @param replicating atomic boolean to start/stop replication
- * @param rsName the name of this region server, null if
- * using RZH only to use the helping methods
* @throws IOException
* @throws KeeperException
*/
- public ReplicationZookeeper(final Server server,
- final Configuration conf, final AtomicBoolean replicating, String rsName)
+ public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
throws IOException, KeeperException {
this.abortable = server;
this.zookeeper = server.getZooKeeper();
- this.conf = conf;
+ this.conf = server.getConfiguration();
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
@@ -157,8 +153,8 @@ public class ReplicationZookeeper {
(this.replicationMaster ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
- if (rsName != null) {
- this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, rsName);
+ if (server.getServerName() != null) {
+ this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
// Set a tracker on replicationStateNodeNode
ReplicationStatusTracker tracker =
new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Mon Aug 30 22:50:03 2010
@@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
/**
- * Replication serves as an umbrella over the setup of replication and
- * is used by HRS.
+ * Gateway to Replication. Used by {@link HRegionServer}.
*/
public class Replication implements WALObserver {
private final boolean replication;
@@ -60,16 +60,16 @@ public class Replication implements WALO
* @param logDir
* @param oldLogDir directory where logs are archived
* @throws IOException
+ * @throws KeeperException
*/
public Replication(final Server server, final FileSystem fs,
final Path logDir, final Path oldLogDir)
- throws IOException {
+ throws IOException, KeeperException {
this.server = server;
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
if (replication) {
- this.zkHelper = new ReplicationZookeeper(server.getZooKeeper(),
- this.conf, this.replicating, this.server.getServerName());
+ this.zkHelper = new ReplicationZookeeper(server, this.replicating);
this.replicationMaster = zkHelper.isReplicationMaster();
this.replicationManager = this.replicationMaster ?
new ReplicationSourceManager(zkHelper, conf, this.server,
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Mon Aug 30 22:50:03 2010
@@ -19,6 +19,22 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -38,22 +55,6 @@ import org.apache.hadoop.hbase.replicati
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
@@ -88,7 +89,7 @@ public class ReplicationSource extends T
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
- private AtomicBoolean stop;
+ private Stoppable stopper;
// List of chosen sinks (region servers)
private List<HServerAddress> currentPeers;
// How long should we sleep for each retry
@@ -139,11 +140,11 @@ public class ReplicationSource extends T
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
- final AtomicBoolean stopper,
+ final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterZnode)
throws IOException {
- this.stop = stopper;
+ this.stopper = stopper;
this.conf = conf;
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
@@ -160,7 +161,7 @@ public class ReplicationSource extends T
conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
this.conn = HConnectionManager.getConnection(conf);
- // REENABLE this.zkHelper = manager.getRepZkWrapper();
+ this.zkHelper = manager.getRepZkWrapper();
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList<HServerAddress>();
this.random = new Random();
@@ -169,7 +170,7 @@ public class ReplicationSource extends T
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
- // REENALBE this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+ this.clusterId = Byte.valueOf(zkHelper.getClusterId());
this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
// Finally look if this is a recovered queue
@@ -195,24 +196,23 @@ public class ReplicationSource extends T
* Select a number of peers at random using the ratio. Mininum 1.
*/
private void chooseSinks() {
- // REENABLE
-// this.currentPeers.clear();
-// List<HServerAddress> addresses =
-// this.zkHelper.getPeersAddresses(peerClusterId);
-// Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
-// int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-// LOG.info("Getting " + nbPeers +
-// " rs from peer cluster # " + peerClusterId);
-// for (int i = 0; i < nbPeers; i++) {
-// HServerAddress address;
-// // Make sure we get one address that we don't already have
-// do {
-// address = addresses.get(this.random.nextInt(addresses.size()));
-// } while (setOfAddr.contains(address));
-// LOG.info("Choosing peer " + address);
-// setOfAddr.add(address);
-// }
-// this.currentPeers.addAll(setOfAddr);
+ this.currentPeers.clear();
+ List<HServerAddress> addresses =
+ this.zkHelper.getPeersAddresses(peerClusterId);
+ Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+ int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
+ LOG.info("Getting " + nbPeers +
+ " rs from peer cluster # " + peerClusterId);
+ for (int i = 0; i < nbPeers; i++) {
+ HServerAddress address;
+ // Make sure we get one address that we don't already have
+ do {
+ address = addresses.get(this.random.nextInt(addresses.size()));
+ } while (setOfAddr.contains(address));
+ LOG.info("Choosing peer " + address);
+ setOfAddr.add(address);
+ }
+ this.currentPeers.addAll(setOfAddr);
}
@Override
@@ -225,7 +225,7 @@ public class ReplicationSource extends T
public void run() {
connectToPeers();
// We were stopped while looping to connect to sinks, just abort
- if (this.stop.get()) {
+ if (this.stopper.isStopped()) {
return;
}
// If this is recovered, the queue is already full and the first log
@@ -236,7 +236,7 @@ public class ReplicationSource extends T
}
int sleepMultiplier = 1;
// Loop until we close down
- while (!stop.get() && this.running) {
+ while (!stopper.isStopped() && this.running) {
// Get a new path
if (!getNextPath()) {
if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -312,7 +312,7 @@ public class ReplicationSource extends T
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
- if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+ if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -374,7 +374,7 @@ public class ReplicationSource extends T
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
- while (!this.stop.get() && this.currentPeers.size() == 0) {
+ while (!this.stopper.isStopped() && this.currentPeers.size() == 0) {
try {
chooseSinks();
Thread.sleep(this.sleepForRetries);
@@ -407,58 +407,57 @@ public class ReplicationSource extends T
* @return true if we should continue with that file, false if we are over with it
*/
protected boolean openReader(int sleepMultiplier) {
- // REENABLE
-// try {
-// LOG.info("Opening log for replication " + this.currentPath.getName() +
-// " at " + this.position);
-// try {
-// this.reader = null;
-// this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
-// } catch (FileNotFoundException fnfe) {
-// if (this.queueRecovered) {
-// // We didn't find the log in the archive directory, look if it still
-// // exists in the dead RS folder (there could be a chain of failures
-// // to look at)
-// for (int i = this.deadRegionServers.length - 1; i > 0; i--) {
-// Path deadRsDirectory =
-// new Path(this.manager.getLogDir(), this.deadRegionServers[i]);
-// Path possibleLogLocation =
-// new Path(deadRsDirectory, currentPath.getName());
-// if (this.manager.getFs().exists(possibleLogLocation)) {
-// // We found the right new location
-// LOG.info("Log " + this.currentPath + " still exists at " +
-// possibleLogLocation);
-// // Breaking here will make us sleep since reader is null
-// break;
-// }
-// }
-// // TODO What happens if the log was missing from every single location?
-// // Although we need to check a couple of times as the log could have
-// // been moved by the master between the checks
-// } else {
-// // If the log was archived, continue reading from there
-// Path archivedLogLocation =
-// new Path(manager.getOldLogDir(), currentPath.getName());
-// if (this.manager.getFs().exists(archivedLogLocation)) {
-// currentPath = archivedLogLocation;
-// LOG.info("Log " + this.currentPath + " was moved to " +
-// archivedLogLocation);
-// // Open the log at the new location
-// this.openReader(sleepMultiplier);
-//
-// }
-// // TODO What happens the log is missing in both places?
-// }
-// }
-// } catch (IOException ioe) {
-// LOG.warn(peerClusterZnode + " Got: ", ioe);
-// // TODO Need a better way to determinate if a file is really gone but
-// // TODO without scanning all logs dir
-// if (sleepMultiplier == this.maxRetriesMultiplier) {
-// LOG.warn("Waited too long for this file, considering dumping");
-// return !processEndOfFile();
-// }
-// }
+ try {
+ LOG.info("Opening log for replication " + this.currentPath.getName() +
+ " at " + this.position);
+ try {
+ this.reader = null;
+ this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+ } catch (FileNotFoundException fnfe) {
+ if (this.queueRecovered) {
+ // We didn't find the log in the archive directory, look if it still
+ // exists in the dead RS folder (there could be a chain of failures
+ // to look at)
+ for (int i = this.deadRegionServers.length - 1; i > 0; i--) {
+ Path deadRsDirectory =
+ new Path(this.manager.getLogDir(), this.deadRegionServers[i]);
+ Path possibleLogLocation =
+ new Path(deadRsDirectory, currentPath.getName());
+ if (this.manager.getFs().exists(possibleLogLocation)) {
+ // We found the right new location
+ LOG.info("Log " + this.currentPath + " still exists at " +
+ possibleLogLocation);
+ // Breaking here will make us sleep since reader is null
+ break;
+ }
+ }
+ // TODO What happens if the log was missing from every single location?
+ // Although we need to check a couple of times as the log could have
+ // been moved by the master between the checks
+ } else {
+ // If the log was archived, continue reading from there
+ Path archivedLogLocation =
+ new Path(manager.getOldLogDir(), currentPath.getName());
+ if (this.manager.getFs().exists(archivedLogLocation)) {
+ currentPath = archivedLogLocation;
+ LOG.info("Log " + this.currentPath + " was moved to " +
+ archivedLogLocation);
+ // Open the log at the new location
+ this.openReader(sleepMultiplier);
+
+ }
+ // TODO What happens if the log is missing in both places?
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn(peerClusterZnode + " Got: ", ioe);
+ // TODO Need a better way to determine if a file is really gone but
+ // TODO without scanning all logs dir
+ if (sleepMultiplier == this.maxRetriesMultiplier) {
+ LOG.warn("Waited too long for this file, considering dumping");
+ return !processEndOfFile();
+ }
+ }
return true;
}
@@ -518,47 +517,46 @@ public class ReplicationSource extends T
* Do the shipping logic
*/
protected void shipEdits() {
- // REENABLE
-// int sleepMultiplier = 1;
-// while (!stop.get()) {
-// try {
-// HRegionInterface rrs = getRS();
-// LOG.debug("Replicating " + currentNbEntries);
-// rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
-// this.manager.logPositionAndCleanOldLogs(this.currentPath,
-// this.peerClusterZnode, this.position, queueRecovered);
-// this.totalReplicatedEdits += currentNbEntries;
-// this.metrics.shippedBatchesRate.inc(1);
-// this.metrics.shippedOpsRate.inc(
-// this.currentNbOperations);
-// this.metrics.setAgeOfLastShippedOp(
-// this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
-// LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
-// break;
-//
-// } catch (IOException ioe) {
-// LOG.warn("Unable to replicate because ", ioe);
-// try {
-// boolean down;
-// do {
-// down = isSlaveDown();
-// if (down) {
-// LOG.debug("The region server we tried to ping didn't answer, " +
-// "sleeping " + sleepForRetries + " times " + sleepMultiplier);
-// Thread.sleep(this.sleepForRetries * sleepMultiplier);
-// if (sleepMultiplier < maxRetriesMultiplier) {
-// sleepMultiplier++;
-// } else {
-// chooseSinks();
-// }
-// }
-// } while (!stop.get() && down);
-// } catch (InterruptedException e) {
-// LOG.debug("Interrupted while trying to contact the peer cluster");
-// }
-//
-// }
-// }
+ int sleepMultiplier = 1;
+ while (!this.stopper.isStopped()) {
+ try {
+ HRegionInterface rrs = getRS();
+ LOG.debug("Replicating " + currentNbEntries);
+ rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+ this.manager.logPositionAndCleanOldLogs(this.currentPath,
+ this.peerClusterZnode, this.position, queueRecovered);
+ this.totalReplicatedEdits += currentNbEntries;
+ this.metrics.shippedBatchesRate.inc(1);
+ this.metrics.shippedOpsRate.inc(
+ this.currentNbOperations);
+ this.metrics.setAgeOfLastShippedOp(
+ this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
+ LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
+ break;
+
+ } catch (IOException ioe) {
+ LOG.warn("Unable to replicate because ", ioe);
+ try {
+ boolean down;
+ do {
+ down = isSlaveDown();
+ if (down) {
+ LOG.debug("The region server we tried to ping didn't answer, " +
+ "sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier++;
+ } else {
+ chooseSinks();
+ }
+ }
+ } while (!this.stopper.isStopped() && down);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while trying to contact the peer cluster");
+ }
+
+ }
+ }
}
/**
@@ -569,16 +567,15 @@ public class ReplicationSource extends T
* continue trying to read from it
*/
protected boolean processEndOfFile() {
- // REENABLE
-// if (this.queue.size() != 0) {
-// this.currentPath = null;
-// this.position = 0;
-// return true;
-// } else if (this.queueRecovered) {
-// this.manager.closeRecoveredQueue(this);
-// this.abort();
-// return true;
-// }
+ if (this.queue.size() != 0) {
+ this.currentPath = null;
+ this.position = 0;
+ return true;
+ } else if (this.queueRecovered) {
+ this.manager.closeRecoveredQueue(this);
+ this.abort();
+ return true;
+ }
return false;
}
@@ -692,5 +689,4 @@ public class ReplicationSource extends T
return Long.parseLong(parts[parts.length-1]);
}
}
-
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Mon Aug 30 22:50:03 2010
@@ -20,29 +20,25 @@
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
@@ -149,7 +145,7 @@ public class ReplicationSourceManager {
ReplicationSourceInterface src = addSource(id);
src.startup();
}
- List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+ List<String> currentReplicators = this.zkHelper.getListOfReplicators();
synchronized (otherRegionServers) {
LOG.info("Current list of replicators: " + currentReplicators
+ " other RSs: " + otherRegionServers);
@@ -201,7 +197,7 @@ public class ReplicationSourceManager {
* @return a sorted set of hlog names
*/
protected SortedSet<String> getHLogs() {
- return new TreeSet(this.hlogs);
+ return new TreeSet<String>(this.hlogs);
}
/**
@@ -255,6 +251,7 @@ public class ReplicationSourceManager {
final String peerClusterId) throws IOException {
ReplicationSourceInterface src;
try {
+ @SuppressWarnings("rawtypes")
Class c = Class.forName(conf.get("replication.replicationsource.implementation",
ReplicationSource.class.getCanonicalName()));
src = (ReplicationSourceInterface) c.newInstance();
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Mon Aug 30 22:50:03 2010
@@ -264,7 +264,8 @@ public class TestFromClientSide {
* @throws InterruptedException
*/
@Test
- public void testFilterAcrossMutlipleRegions() throws IOException, InterruptedException {
+ public void testFilterAcrossMultipleRegions()
+ throws IOException, InterruptedException {
byte [] name = Bytes.toBytes("testFilterAcrossMutlipleRegions");
HTable t = TEST_UTIL.createTable(name, FAMILY);
int rowCount = TEST_UTIL.loadTable(t, FAMILY);
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java Mon Aug 30 22:50:03 2010
@@ -67,7 +67,7 @@ public class TestSplitTransaction {
this.fs.delete(this.testdir, true);
this.wal = new HLog(fs, new Path(this.testdir, "logs"),
new Path(this.testdir, "archive"),
- TEST_UTIL.getConfiguration(), null);
+ TEST_UTIL.getConfiguration());
this.parent = createRegion(this.testdir, this.wal);
}
@@ -174,7 +174,7 @@ public class TestSplitTransaction {
// Start transaction.
SplitTransaction st = prepareGOOD_SPLIT_ROW();
SplitTransaction spiedUponSt = spy(st);
- when(spiedUponSt.createDaughterRegion(spiedUponSt.getSecondDaughter())).
+ when(spiedUponSt.createDaughterRegion(spiedUponSt.getSecondDaughter(), null)).
thenThrow(new MockedFailedDaughterCreation());
// Run the execute. Look at what it returns.
boolean expectedException = false;
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Mon Aug 30 22:50:03 2010
@@ -123,7 +123,7 @@ public class TestStore extends TestCase
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(htd, null, null, false);
- HLog hlog = new HLog(fs, logdir, oldLogDir, conf, null);
+ HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
store = new Store(basedir, region, hcd, fs, conf);
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Mon Aug 30 22:50:03 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.replicat
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -39,7 +40,7 @@ public class ReplicationSourceDummy impl
@Override
public void init(Configuration conf, FileSystem fs,
- ReplicationSourceManager manager, AtomicBoolean stopper,
+ ReplicationSourceManager manager, Stoppable stopper,
AtomicBoolean replicating, String peerClusterId)
throws IOException {
this.manager = manager;
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=991014&r1=991013&r2=991014&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Mon Aug 30 22:50:03 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
@@ -46,6 +47,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
@@ -59,8 +62,6 @@ public class TestReplicationSourceManage
private static HBaseTestingUtility utility;
- private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
-
private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
private static ReplicationSourceManager manager;
@@ -97,8 +98,8 @@ public class TestReplicationSourceManage
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
-// REENALBE
-//
+
+ // REENABLE
// zkw = ZooKeeperWrapper.createInstance(conf, "test");
// zkw.writeZNode("/hbase", "replication", "");
// zkw.writeZNode("/hbase/replication", "master",
@@ -107,20 +108,18 @@ public class TestReplicationSourceManage
// zkw.writeZNode("/hbase/replication/peers", "1",
// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
// conf.get("hbase.zookeeper.property.clientPort")+":/1");
-//
-// HRegionServer server = new HRegionServer(conf);
-// ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
-// server.getZooKeeperWrapper(), conf,
-// REPLICATING, "123456789");
-// fs = FileSystem.get(conf);
-// oldLogDir = new Path(utility.getTestDir(),
-// HConstants.HREGION_OLDLOGDIR_NAME);
-// logDir = new Path(utility.getTestDir(),
-// HConstants.HREGION_LOGDIR_NAME);
-//
-// manager = new ReplicationSourceManager(helper,
-// conf, STOPPER, fs, REPLICATING, logDir, oldLogDir);
-// manager.addSource("1");
+
+ HRegionServer server = new HRegionServer(conf);
+ ReplicationZookeeper helper = new ReplicationZookeeper(server, REPLICATING);
+ fs = FileSystem.get(conf);
+ oldLogDir = new Path(utility.getTestDir(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = new Path(utility.getTestDir(),
+ HConstants.HREGION_LOGDIR_NAME);
+
+ manager = new ReplicationSourceManager(helper,
+ conf, server, fs, REPLICATING, logDir, oldLogDir);
+ manager.addSource("1");
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor("f1");
@@ -160,11 +159,12 @@ public class TestReplicationSourceManage
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
-
- HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
+ List<WALObserver> listeners = new ArrayList<WALObserver>();
+// REENABLE listeners.add(manager);
+ HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
URLEncoder.encode("regionserver:60020", "UTF8"));
- // REENABLE manager.init();
+ manager.init();
// Testing normal log rolling every 20
for(long i = 1; i < 101; i++) {
@@ -190,7 +190,7 @@ public class TestReplicationSourceManage
hlog.append(hri, key, edit);
}
- // REENABLE assertEquals(6, manager.getHLogs().size());
+ assertEquals(6, manager.getHLogs().size());
hlog.rollWriter();