You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 22:53:16 UTC
svn commit: r990266 [2/2] - in /hbase/branches/0.90_master_rewrite/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/wal/ mai...
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Fri Aug 27 20:53:15 2010
@@ -19,82 +19,84 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.hadoop.hbase.util.Bytes;
-// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-import java.io.IOException;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Replication serves as an umbrella over the setup of replication and
* is used by HRS.
*/
-public class Replication implements LogEntryVisitor {
-
+public class Replication implements WALObserver {
private final boolean replication;
-// REENALBE private final ReplicationSourceManager replicationManager;
+ private final ReplicationSourceManager replicationManager;
private boolean replicationMaster;
private final AtomicBoolean replicating = new AtomicBoolean(true);
-// REENALBE private final ReplicationZookeeperWrapper zkHelper;
+ private final ReplicationZookeeperWrapper zkHelper;
private final Configuration conf;
private ReplicationSink replicationSink;
+ // Hosting server
private final Server server;
/**
* Instantiate the replication management (if rep is enabled).
- * @param conf conf to use
- * @param hsi the info if this region server
+ * @param server Hosting server
* @param fs handle to the filesystem
+ * @param logDir
* @param oldLogDir directory where logs are archived
- * @param stopper This is set when we are to shut down.
* @throws IOException
*/
- public Replication(final Server server, FileSystem fs, Path logDir,
- Path oldLogDir) throws IOException {
+ public Replication(final Server server, final FileSystem fs,
+ final Path logDir, final Path oldLogDir)
+ throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
- this.replication =
- conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+ this.replication = isReplication(this.conf);
if (replication) {
- // REENALBE
-// this.zkHelper = new ReplicationZookeeperWrapper(
-// ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
-// this.replicating, hsi.getServerName());
-// this.replicationMaster = zkHelper.isReplicationMaster();
-// this.replicationManager = this.replicationMaster ?
-// new ReplicationSourceManager(zkHelper, conf, stopRequested,
-// fs, this.replicating, logDir, oldLogDir) : null;
+ this.zkHelper = new ReplicationZookeeperWrapper(server.getZooKeeper(),
+ this.conf, this.replicating, this.server.getServerName());
+ this.replicationMaster = zkHelper.isReplicationMaster();
+ this.replicationManager = this.replicationMaster ?
+ new ReplicationSourceManager(zkHelper, conf, this.server,
+ fs, this.replicating, logDir, oldLogDir) : null;
} else {
- // REENABLE replicationManager = null;
- // REENALBE zkHelper = null;
+ this.replicationManager = null;
+ this.zkHelper = null;
}
}
/**
+ * @param c Configuration to look at
+ * @return True if replication is enabled.
+ */
+ public static boolean isReplication(final Configuration c) {
+ return c.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+ }
+
+ /**
* Join with the replication threads
*/
public void join() {
if (this.replication) {
if (this.replicationMaster) {
-// REENABLE this.replicationManager.join();
+ this.replicationManager.join();
}
-// REENABLE this.zkHelper.deleteOwnRSZNode();
+ this.zkHelper.deleteOwnRSZNode();
}
}
@@ -117,10 +119,9 @@ public class Replication implements LogE
public void startReplicationServices() throws IOException {
if (this.replication) {
if (this.replicationMaster) {
- // REENALBE this.replicationManager.init();
+ this.replicationManager.init();
} else {
- this.replicationSink =
- new ReplicationSink(this.conf, this.server);
+ this.replicationSink = new ReplicationSink(this.conf, this.server);
}
}
}
@@ -130,12 +131,12 @@ public class Replication implements LogE
* @return the manager if replication is enabled, else returns false
*/
public ReplicationSourceManager getReplicationManager() {
- return null; // REENALBE replicationManager;
+ return this.replicationManager;
}
@Override
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
- WALEdit logEdit) {
+ WALEdit logEdit) {
NavigableMap<byte[], Integer> scopes =
new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
@@ -152,13 +153,13 @@ public class Replication implements LogE
}
}
- /**
- * Add this class as a log entry visitor for HLog if replication is enabled
- * @param hlog log that was add ourselves on
- */
- public void addLogEntryVisitor(HLog hlog) {
- if (replication) {
- hlog.addLogEntryVisitor(this);
- }
+ @Override
+ public void logRolled(Path p) {
+ getReplicationManager().logRolled(p);
+ }
+
+ @Override
+ public void logRollRequested() {
+ // Not interested
}
}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Fri Aug 27 20:53:15 2010
@@ -19,13 +19,13 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.Stoppable;
/**
* Interface that defines a replication source
@@ -45,7 +45,7 @@ public interface ReplicationSourceInterf
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
- final AtomicBoolean stopper,
+ final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException;
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri Aug 27 20:53:15 2010
@@ -25,7 +25,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -50,334 +54,324 @@ import java.util.concurrent.atomic.Atomi
* tries to grab a lock in order to transfer all the queues in a local
* old source.
*/
-public class ReplicationSourceManager implements LogActionsListener {
+public class ReplicationSourceManager {
+ private static final Log LOG =
+ LogFactory.getLog(ReplicationSourceManager.class);
+ // List of all the sources that read this RS's logs
+ private final List<ReplicationSourceInterface> sources;
+ // List of all the sources we got from died RSs
+ private final List<ReplicationSourceInterface> oldsources;
+ // Indicates if we are currently replicating
+ private final AtomicBoolean replicating;
+ // Helper for zookeeper
+ private final ReplicationZookeeperWrapper zkHelper;
+ // All about stopping
+ private final Stoppable stopper;
+ // All logs we are currently trackign
+ private final SortedSet<String> hlogs;
+ private final Configuration conf;
+ private final FileSystem fs;
+ // The path to the latest log we saw, for new coming sources
+ private Path latestPath;
+ // List of all the other region servers in this cluster
+ private final List<String> otherRegionServers;
+ // Path to the hlogs directories
+ private final Path logDir;
+ // Path to the hlog archive
+ private final Path oldLogDir;
- @Override
- public void logRolled(Path newFile) {
- // TODO Auto-generated method stub
-
- }
- // REENABLE
-//
-// private static final Log LOG =
-// LogFactory.getLog(ReplicationSourceManager.class);
-// // List of all the sources that read this RS's logs
-// private final List<ReplicationSourceInterface> sources;
-// // List of all the sources we got from died RSs
-// private final List<ReplicationSourceInterface> oldsources;
-// // Indicates if we are currently replicating
-// private final AtomicBoolean replicating;
-// // Helper for zookeeper
-// private final ReplicationZookeeperWrapper zkHelper;
-// // Indicates if the region server is closing
-// private final AtomicBoolean stopper;
-// // All logs we are currently trackign
-// private final SortedSet<String> hlogs;
-// private final Configuration conf;
-// private final FileSystem fs;
-// // The path to the latest log we saw, for new coming sources
-// private Path latestPath;
-// // List of all the other region servers in this cluster
-// private final List<String> otherRegionServers;
-// // Path to the hlogs directories
-// private final Path logDir;
-// // Path to the hlog archive
-// private final Path oldLogDir;
-//
-// /**
-// * Creates a replication manager and sets the watch on all the other
-// * registered region servers
-// * @param zkHelper the zk helper for replication
-// * @param conf the configuration to use
-// * @param stopper the stopper object for this region server
-// * @param fs the file system to use
-// * @param replicating the status of the replication on this cluster
-// * @param logDir the directory that contains all hlog directories of live RSs
-// * @param oldLogDir the directory where old logs are archived
-// */
-// public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
-// final Configuration conf,
-// final AtomicBoolean stopper,
-// final FileSystem fs,
-// final AtomicBoolean replicating,
-// final Path logDir,
-// final Path oldLogDir) {
-// this.sources = new ArrayList<ReplicationSourceInterface>();
-// this.replicating = replicating;
-// this.zkHelper = zkHelper;
-// this.stopper = stopper;
-// this.hlogs = new TreeSet<String>();
-// this.oldsources = new ArrayList<ReplicationSourceInterface>();
-// this.conf = conf;
-// this.fs = fs;
-// this.logDir = logDir;
-// this.oldLogDir = oldLogDir;
-// List<String> otherRSs =
-// this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
-// this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
-// }
-//
-// /**
-// * Provide the id of the peer and a log key and this method will figure which
-// * hlog it belongs to and will log, for this region server, the current
-// * position. It will also clean old logs from the queue.
-// * @param log Path to the log currently being replicated from
-// * replication status in zookeeper. It will also delete older entries.
-// * @param id id of the peer cluster
-// * @param position current location in the log
-// * @param queueRecovered indicates if this queue comes from another region server
-// */
-// public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
-// String key = log.getName();
-// LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
-// this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-// synchronized (this.hlogs) {
-// if (!queueRecovered && this.hlogs.first() != key) {
-// SortedSet<String> hlogSet = this.hlogs.headSet(key);
-// LOG.info("Removing " + hlogSet.size() +
-// " logs in the list: " + hlogSet);
-// for (String hlog : hlogSet) {
-// this.zkHelper.removeLogFromList(hlog.toString(), id);
-// }
-// hlogSet.clear();
-// }
-// }
-// }
-//
-// /**
-// * Adds a normal source per registered peer cluster and tries to process all
-// * old region server hlog queues
-// */
-// public void init() throws IOException {
-// for (String id : this.zkHelper.getPeerClusters().keySet()) {
-// ReplicationSourceInterface src = addSource(id);
-// src.startup();
-// }
-// List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
-// synchronized (otherRegionServers) {
-// LOG.info("Current list of replicators: " + currentReplicators
-// + " other RSs: " + otherRegionServers);
-// }
-// // Look if there's anything to process after a restart
-// for (String rs : currentReplicators) {
-// synchronized (otherRegionServers) {
-// if (!this.otherRegionServers.contains(rs)) {
-// transferQueues(rs);
-// }
-// }
-// }
-// }
-//
-// /**
-// * Add a new normal source to this region server
-// * @param id the id of the peer cluster
-// * @return the created source
-// * @throws IOException
-// */
-// public ReplicationSourceInterface addSource(String id) throws IOException {
-// ReplicationSourceInterface src =
-// getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-// this.sources.add(src);
-// synchronized (this.hlogs) {
-// if (this.hlogs.size() > 0) {
-// this.zkHelper.addLogToList(this.hlogs.first(),
-// this.sources.get(0).getPeerClusterZnode());
-// src.enqueueLog(this.latestPath);
-// }
-// }
-// return src;
-// }
-//
-// /**
-// * Terminate the replication on this region server
-// */
-// public void join() {
-// if (this.sources.size() == 0) {
-// this.zkHelper.deleteOwnRSZNode();
-// }
-// for (ReplicationSourceInterface source : this.sources) {
-// source.terminate();
-// }
-// }
-//
-// /**
-// * Get a copy of the hlogs of the first source on this rs
-// * @return a sorted set of hlog names
-// */
-// protected SortedSet<String> getHLogs() {
-// return new TreeSet(this.hlogs);
-// }
-//
-// /**
-// * Get a list of all the normal sources of this rs
-// * @return lis of all sources
-// */
-// public List<ReplicationSourceInterface> getSources() {
-// return this.sources;
-// }
-//
-// @Override
-// public void logRolled(Path newLog) {
-// if (this.sources.size() > 0) {
-// this.zkHelper.addLogToList(newLog.getName(),
-// this.sources.get(0).getPeerClusterZnode());
-// }
-// synchronized (this.hlogs) {
-// this.hlogs.add(newLog.getName());
-// }
-// this.latestPath = newLog;
-// // This only update the sources we own, not the recovered ones
-// for (ReplicationSourceInterface source : this.sources) {
-// source.enqueueLog(newLog);
-// }
-// }
-//
-// /**
-// * Get the ZK help of this manager
-// * @return the helper
-// */
-// public ReplicationZookeeperWrapper getRepZkWrapper() {
-// return zkHelper;
-// }
-//
-// /**
-// * Factory method to create a replication source
-// * @param conf the configuration to use
-// * @param fs the file system to use
-// * @param manager the manager to use
-// * @param stopper the stopper object for this region server
-// * @param replicating the status of the replication on this cluster
-// * @param peerClusterId the id of the peer cluster
-// * @return the created source
-// * @throws IOException
-// */
-// public ReplicationSourceInterface getReplicationSource(
-// final Configuration conf,
-// final FileSystem fs,
-// final ReplicationSourceManager manager,
-// final AtomicBoolean stopper,
-// final AtomicBoolean replicating,
-// final String peerClusterId) throws IOException {
-// ReplicationSourceInterface src;
-// try {
-// Class c = Class.forName(conf.get("replication.replicationsource.implementation",
-// ReplicationSource.class.getCanonicalName()));
-// src = (ReplicationSourceInterface) c.newInstance();
-// } catch (Exception e) {
-// LOG.warn("Passed replication source implemention throws errors, " +
-// "defaulting to ReplicationSource", e);
-// src = new ReplicationSource();
-//
-// }
-// src.init(conf, fs, manager, stopper, replicating, peerClusterId);
-// return src;
-// }
-//
-// /**
-// * Transfer all the queues of the specified to this region server.
-// * First it tries to grab a lock and if it works it will move the
-// * znodes and finally will delete the old znodes.
-// *
-// * It creates one old source for any type of source of the old rs.
-// * @param rsZnode
-// */
-// public void transferQueues(String rsZnode) {
-// // We try to lock that rs' queue directory
-// if (this.stopper.get()) {
-// LOG.info("Not transferring queue since we are shutting down");
-// return;
-// }
-// if (!this.zkHelper.lockOtherRS(rsZnode)) {
-// return;
-// }
-// LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
-// SortedMap<String, SortedSet<String>> newQueues =
-// this.zkHelper.copyQueuesFromRS(rsZnode);
-// if (newQueues == null || newQueues.size() == 0) {
-// return;
-// }
-// this.zkHelper.deleteRsQueues(rsZnode);
-//
-// for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
-// String peerId = entry.getKey();
-// try {
-// ReplicationSourceInterface src = getReplicationSource(this.conf,
-// this.fs, this, this.stopper, this.replicating, peerId);
-// this.oldsources.add(src);
-// for (String hlog : entry.getValue()) {
-// src.enqueueLog(new Path(this.oldLogDir, hlog));
-// }
-// src.startup();
-// } catch (IOException e) {
-// // TODO manage it
-// LOG.error("Failed creating a source", e);
-// }
-// }
-// }
-//
-// /**
-// * Clear the references to the specified old source
-// * @param src source to clear
-// */
-// public void closeRecoveredQueue(ReplicationSourceInterface src) {
-// LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
-// this.oldsources.remove(src);
-// this.zkHelper.deleteSource(src.getPeerClusterZnode());
-// }
-//
-// /**
-// * Watcher used to be notified of the other region server's death
-// * in the local cluster. It initiates the process to transfer the queues
-// * if it is able to grab the lock.
-// */
-// public class OtherRegionServerWatcher implements Watcher {
-// @Override
-// public void process(WatchedEvent watchedEvent) {
-// LOG.info(" event " + watchedEvent);
-// if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
-// watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
-// return;
-// }
-//
-// List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
-// if (newRsList == null) {
-// return;
-// } else {
-// synchronized (otherRegionServers) {
-// otherRegionServers.clear();
-// otherRegionServers.addAll(newRsList);
-// }
-// }
-// if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
-// LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
-// String[] rsZnodeParts = watchedEvent.getPath().split("/");
-// transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
-// }
-// }
-// }
-//
-// /**
-// * Get the directory where hlogs are archived
-// * @return the directory where hlogs are archived
-// */
-// public Path getOldLogDir() {
-// return this.oldLogDir;
-// }
-//
-// /**
-// * Get the directory where hlogs are stored by their RSs
-// * @return the directory where hlogs are stored by their RSs
-// */
-// public Path getLogDir() {
-// return this.logDir;
-// }
-//
-// /**
-// * Get the handle on the local file system
-// * @returnthe handle on the local file system
-// */
-// public FileSystem getFs() {
-// return this.fs;
-// }
+ /**
+ * Creates a replication manager and sets the watch on all the other
+ * registered region servers
+ * @param zkHelper the zk helper for replication
+ * @param conf the configuration to use
+ * @param stopper the stopper object for this region server
+ * @param fs the file system to use
+ * @param replicating the status of the replication on this cluster
+ * @param logDir the directory that contains all hlog directories of live RSs
+ * @param oldLogDir the directory where old logs are archived
+ */
+ public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+ final Configuration conf,
+ final Stoppable stopper,
+ final FileSystem fs,
+ final AtomicBoolean replicating,
+ final Path logDir,
+ final Path oldLogDir) {
+ this.sources = new ArrayList<ReplicationSourceInterface>();
+ this.replicating = replicating;
+ this.zkHelper = zkHelper;
+ this.stopper = stopper;
+ this.hlogs = new TreeSet<String>();
+ this.oldsources = new ArrayList<ReplicationSourceInterface>();
+ this.conf = conf;
+ this.fs = fs;
+ this.logDir = logDir;
+ this.oldLogDir = oldLogDir;
+ List<String> otherRSs =
+ this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+ this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+ }
+
+ /**
+ * Provide the id of the peer and a log key and this method will figure which
+ * hlog it belongs to and will log, for this region server, the current
+ * position. It will also clean old logs from the queue.
+ * @param log Path to the log currently being replicated from
+ * replication status in zookeeper. It will also delete older entries.
+ * @param id id of the peer cluster
+ * @param position current location in the log
+ * @param queueRecovered indicates if this queue comes from another region server
+ */
+ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+ String key = log.getName();
+ LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
+ this.zkHelper.writeReplicationStatus(key.toString(), id, position);
+ synchronized (this.hlogs) {
+ if (!queueRecovered && this.hlogs.first() != key) {
+ SortedSet<String> hlogSet = this.hlogs.headSet(key);
+ LOG.info("Removing " + hlogSet.size() +
+ " logs in the list: " + hlogSet);
+ for (String hlog : hlogSet) {
+ this.zkHelper.removeLogFromList(hlog.toString(), id);
+ }
+ hlogSet.clear();
+ }
+ }
+ }
+
+ /**
+ * Adds a normal source per registered peer cluster and tries to process all
+ * old region server hlog queues
+ */
+ public void init() throws IOException {
+ for (String id : this.zkHelper.getPeerClusters().keySet()) {
+ ReplicationSourceInterface src = addSource(id);
+ src.startup();
+ }
+ List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+ synchronized (otherRegionServers) {
+ LOG.info("Current list of replicators: " + currentReplicators
+ + " other RSs: " + otherRegionServers);
+ }
+ // Look if there's anything to process after a restart
+ for (String rs : currentReplicators) {
+ synchronized (otherRegionServers) {
+ if (!this.otherRegionServers.contains(rs)) {
+ transferQueues(rs);
+ }
+ }
+ }
+ }
+
+ /**
+ * Add a new normal source to this region server
+ * @param id the id of the peer cluster
+ * @return the created source
+ * @throws IOException
+ */
+ public ReplicationSourceInterface addSource(String id) throws IOException {
+ ReplicationSourceInterface src =
+ getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+ this.sources.add(src);
+ synchronized (this.hlogs) {
+ if (this.hlogs.size() > 0) {
+ this.zkHelper.addLogToList(this.hlogs.first(),
+ this.sources.get(0).getPeerClusterZnode());
+ src.enqueueLog(this.latestPath);
+ }
+ }
+ return src;
+ }
+
+ /**
+ * Terminate the replication on this region server
+ */
+ public void join() {
+ if (this.sources.size() == 0) {
+ this.zkHelper.deleteOwnRSZNode();
+ }
+ for (ReplicationSourceInterface source : this.sources) {
+ source.terminate();
+ }
+ }
+
+ /**
+ * Get a copy of the hlogs of the first source on this rs
+ * @return a sorted set of hlog names
+ */
+ protected SortedSet<String> getHLogs() {
+ return new TreeSet(this.hlogs);
+ }
+
+ /**
+ * Get a list of all the normal sources of this rs
+ * @return lis of all sources
+ */
+ public List<ReplicationSourceInterface> getSources() {
+ return this.sources;
+ }
+
+ void logRolled(Path newLog) {
+ if (this.sources.size() > 0) {
+ this.zkHelper.addLogToList(newLog.getName(),
+ this.sources.get(0).getPeerClusterZnode());
+ }
+ synchronized (this.hlogs) {
+ this.hlogs.add(newLog.getName());
+ }
+ this.latestPath = newLog;
+ // This only update the sources we own, not the recovered ones
+ for (ReplicationSourceInterface source : this.sources) {
+ source.enqueueLog(newLog);
+ }
+ }
+
+ /**
+ * Get the ZK help of this manager
+ * @return the helper
+ */
+ public ReplicationZookeeperWrapper getRepZkWrapper() {
+ return zkHelper;
+ }
+ /**
+ * Factory method to create a replication source
+ * @param conf the configuration to use
+ * @param fs the file system to use
+ * @param manager the manager to use
+ * @param stopper the stopper object for this region server
+ * @param replicating the status of the replication on this cluster
+ * @param peerClusterId the id of the peer cluster
+ * @return the created source
+ * @throws IOException
+ */
+ public ReplicationSourceInterface getReplicationSource(
+ final Configuration conf,
+ final FileSystem fs,
+ final ReplicationSourceManager manager,
+ final Stoppable stopper,
+ final AtomicBoolean replicating,
+ final String peerClusterId) throws IOException {
+ ReplicationSourceInterface src;
+ try {
+ Class c = Class.forName(conf.get("replication.replicationsource.implementation",
+ ReplicationSource.class.getCanonicalName()));
+ src = (ReplicationSourceInterface) c.newInstance();
+ } catch (Exception e) {
+ LOG.warn("Passed replication source implemention throws errors, " +
+ "defaulting to ReplicationSource", e);
+ src = new ReplicationSource();
+
+ }
+ src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+ return src;
+ }
+
+ /**
+ * Transfer all the queues of the specified to this region server.
+ * First it tries to grab a lock and if it works it will move the
+ * znodes and finally will delete the old znodes.
+ *
+ * It creates one old source for any type of source of the old rs.
+ * @param rsZnode
+ */
+ public void transferQueues(String rsZnode) {
+ // We try to lock that rs' queue directory
+ if (this.stopper.isStopped()) {
+ LOG.info("Not transferring queue since we are shutting down");
+ return;
+ }
+ if (!this.zkHelper.lockOtherRS(rsZnode)) {
+ return;
+ }
+ LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+ SortedMap<String, SortedSet<String>> newQueues =
+ this.zkHelper.copyQueuesFromRS(rsZnode);
+ if (newQueues == null || newQueues.size() == 0) {
+ return;
+ }
+ this.zkHelper.deleteRsQueues(rsZnode);
+
+ for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+ String peerId = entry.getKey();
+ try {
+ ReplicationSourceInterface src = getReplicationSource(this.conf,
+ this.fs, this, this.stopper, this.replicating, peerId);
+ this.oldsources.add(src);
+ for (String hlog : entry.getValue()) {
+ src.enqueueLog(new Path(this.oldLogDir, hlog));
+ }
+ src.startup();
+ } catch (IOException e) {
+ // TODO manage it
+ LOG.error("Failed creating a source", e);
+ }
+ }
+ }
+
+ /**
+ * Clear the references to the specified old source
+ * @param src source to clear
+ */
+ public void closeRecoveredQueue(ReplicationSourceInterface src) {
+ LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+ this.oldsources.remove(src);
+ this.zkHelper.deleteSource(src.getPeerClusterZnode());
+ }
+
+ /**
+ * Watcher used to be notified of the other region server's death
+ * in the local cluster. It initiates the process to transfer the queues
+ * if it is able to grab the lock.
+ */
+ public class OtherRegionServerWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ LOG.info(" event " + watchedEvent);
+ if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
+ watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
+ return;
+ }
+
+ List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
+ if (newRsList == null) {
+ return;
+ } else {
+ synchronized (otherRegionServers) {
+ otherRegionServers.clear();
+ otherRegionServers.addAll(newRsList);
+ }
+ }
+ if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+ LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
+ String[] rsZnodeParts = watchedEvent.getPath().split("/");
+ transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+ }
+ }
+ }
+
+ /**
+ * Get the directory where hlogs are archived
+ * @return the directory where hlogs are archived
+ */
+ public Path getOldLogDir() {
+ return this.oldLogDir;
+ }
+
+ /**
+ * Get the directory where hlogs are stored by their RSs
+ * @return the directory where hlogs are stored by their RSs
+ */
+ public Path getLogDir() {
+ return this.logDir;
+ }
+
+ /**
+ * Get the handle on the local file system
+ * @returnthe handle on the local file system
+ */
+ public FileSystem getFs() {
+ return this.fs;
+ }
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java Fri Aug 27 20:53:15 2010
@@ -149,8 +149,7 @@ class HMerge {
Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() +
HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME);
- this.hlog =
- new HLog(fs, logdir, oldLogDir, conf, null);
+ this.hlog = new HLog(fs, logdir, oldLogDir, conf);
}
void process() throws IOException {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java Fri Aug 27 20:53:15 2010
@@ -103,7 +103,7 @@ public class MetaUtils {
HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis());
Path oldLogDir = new Path(this.fs.getHomeDirectory(),
HConstants.HREGION_OLDLOGDIR_NAME);
- this.log = new HLog(this.fs, logdir, oldLogDir, this.conf, null);
+ this.log = new HLog(this.fs, logdir, oldLogDir, this.conf);
}
return this.log;
}
Modified: hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp (original)
+++ hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp Fri Aug 27 20:53:15 2010
@@ -50,7 +50,7 @@
<table>
<tr><th>Region Name</th><th>Start Key</th><th>End Key</th><th>Metrics</th></tr>
<% for (HRegionInfo r: onlineRegions) {
- HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getRegionName());
+ HServerLoad.RegionLoad load = regionServer.createRegionLoad(r.getEncodedName());
%>
<tr><td><%= r.getRegionNameAsString() %></td>
<td><%= Bytes.toStringBinary(r.getStartKey()) %></td><td><%= Bytes.toStringBinary(r.getEndKey()) %></td>
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLogsCleaner.java Fri Aug 27 20:53:15 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import java.net.URLEncoder;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.junit.After;
import org.junit.AfterClass;
@@ -102,7 +100,7 @@ public class TestLogsCleaner {
return this.stopped;
}
};
- LogsCleaner cleaner = new LogsCleaner(1000, stoppable,c, fs, oldLogDir);
+ LogCleaner cleaner = new LogCleaner(1000, stoppable, c, fs, oldLogDir);
// Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
long now = System.currentTimeMillis();
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java Fri Aug 27 20:53:15 2010
@@ -29,7 +29,7 @@ public class InstrumentedSequenceFileLog
@Override
public void append(HLog.Entry entry) throws IOException {
super.append(entry);
- if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) {
+ if (activateFailure && Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
System.out.println(getClass().getName() + ": I will throw an exception now...");
throw(new IOException("This exception is instrumented and should only be thrown for testing"));
}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Fri Aug 27 20:53:15 2010
@@ -133,7 +133,7 @@ public class TestHLog {
final byte [] tableName = Bytes.toBytes(getName());
final byte [] rowName = tableName;
Path logdir = new Path(dir, HConstants.HREGION_LOGDIR_NAME);
- HLog log = new HLog(fs, logdir, oldLogDir, conf, null);
+ HLog log = new HLog(fs, logdir, oldLogDir, conf);
final int howmany = 3;
HRegionInfo[] infos = new HRegionInfo[3];
for(int i = 0; i < howmany; i++) {
@@ -192,7 +192,7 @@ public class TestHLog {
out.close();
in.close();
Path subdir = new Path(dir, "hlogdir");
- HLog wal = new HLog(fs, subdir, oldLogDir, conf, null);
+ HLog wal = new HLog(fs, subdir, oldLogDir, conf);
final int total = 20;
HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes),
@@ -295,7 +295,7 @@ public class TestHLog {
HLog.Entry entry = new HLog.Entry();
while((entry = reader.next(entry)) != null) {
HLogKey key = entry.getKey();
- String region = Bytes.toString(key.getRegionName());
+ String region = Bytes.toString(key.getEncodedRegionName());
// Assert that all edits are for same region.
if (previousRegion != null) {
assertEquals(previousRegion, region);
@@ -325,7 +325,7 @@ public class TestHLog {
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
Path subdir = new Path(dir, "hlogdir");
Path archdir = new Path(dir, "hlogdir_archive");
- HLog wal = new HLog(fs, subdir, archdir, conf, null);
+ HLog wal = new HLog(fs, subdir, archdir, conf);
final int total = 20;
for (int i = 0; i < total; i++) {
@@ -429,7 +429,7 @@ public class TestHLog {
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
HLog.Reader reader = null;
- HLog log = new HLog(fs, dir, oldLogDir, conf, null);
+ HLog log = new HLog(fs, dir, oldLogDir, conf);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
@@ -442,10 +442,9 @@ public class TestHLog {
}
HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
- final byte [] regionName = info.getRegionName();
log.append(info, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
- log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
+ log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId, info.isMetaRegion());
log.close();
Path filename = log.computeFilename();
log = null;
@@ -458,7 +457,7 @@ public class TestHLog {
if (entry == null) break;
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
- assertTrue(Bytes.equals(regionName, key.getRegionName()));
+ assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(row, kv.getRow()));
@@ -470,7 +469,7 @@ public class TestHLog {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
// Assert only one more row... the meta flushed row.
- assertTrue(Bytes.equals(regionName, key.getRegionName()));
+ assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
@@ -498,7 +497,7 @@ public class TestHLog {
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
Reader reader = null;
- HLog log = new HLog(fs, dir, oldLogDir, conf, null);
+ HLog log = new HLog(fs, dir, oldLogDir, conf);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
@@ -513,7 +512,7 @@ public class TestHLog {
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
log.append(hri, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
- log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
log.close();
Path filename = log.computeFilename();
log = null;
@@ -524,7 +523,7 @@ public class TestHLog {
int idx = 0;
for (KeyValue val : entry.getEdit().getKeyValues()) {
assertTrue(Bytes.equals(hri.getRegionName(),
- entry.getKey().getRegionName()));
+ entry.getKey().getEncodedRegionName()));
assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
assertTrue(Bytes.equals(row, val.getRow()));
assertEquals((byte)(idx + '0'), val.getValue()[0]);
@@ -537,7 +536,7 @@ public class TestHLog {
assertEquals(1, entry.getEdit().size());
for (KeyValue val : entry.getEdit().getKeyValues()) {
assertTrue(Bytes.equals(hri.getRegionName(),
- entry.getKey().getRegionName()));
+ entry.getKey().getEncodedRegionName()));
assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
@@ -564,9 +563,9 @@ public class TestHLog {
final int COL_COUNT = 10;
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
- HLog log = new HLog(fs, dir, oldLogDir, conf, null);
- DumbLogEntriesVisitor visitor = new DumbLogEntriesVisitor();
- log.addLogEntryVisitor(visitor);
+ HLog log = new HLog(fs, dir, oldLogDir, conf);
+ DumbWALObserver visitor = new DumbWALObserver();
+ log.registerWALActionsListener(visitor);
long timestamp = System.currentTimeMillis();
HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
@@ -578,7 +577,7 @@ public class TestHLog {
log.append(hri, tableName, cols, System.currentTimeMillis());
}
assertEquals(COL_COUNT, visitor.increments);
- log.removeLogEntryVisitor(visitor);
+ log.unregisterWALActionsListener(visitor);
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)),
@@ -587,8 +586,7 @@ public class TestHLog {
assertEquals(COL_COUNT, visitor.increments);
}
- static class DumbLogEntriesVisitor implements LogEntryVisitor {
-
+ static class DumbWALObserver implements WALObserver {
int increments = 0;
@Override
@@ -596,5 +594,17 @@ public class TestHLog {
WALEdit logEdit) {
increments++;
}
+
+ @Override
+ public void logRolled(Path newFile) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void logRollRequested() {
+ // TODO Auto-generated method stub
+
+ }
}
}
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java?rev=990266&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALObserver.java Fri Aug 27 20:53:15 2010
@@ -0,0 +1,135 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test that the actions are called while playing with an HLog
+ */
+public class TestWALObserver {
+ protected static final Log LOG = LogFactory.getLog(TestWALObserver.class);
+
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private final static byte[] SOME_BYTES = Bytes.toBytes("t");
+ private static FileSystem fs;
+ private static Path oldLogDir;
+ private static Path logDir;
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ conf.setInt("hbase.regionserver.maxlogs", 5);
+ fs = FileSystem.get(conf);
+ oldLogDir = new Path(HBaseTestingUtility.getTestDir(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = new Path(HBaseTestingUtility.getTestDir(),
+ HConstants.HREGION_LOGDIR_NAME);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ fs.delete(logDir, true);
+ fs.delete(oldLogDir, true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ setUp();
+ }
+
+ /**
+ * Add a bunch of dummy data and roll the logs every two insert. We
+ * should end up with 10 rolled files (plus the roll called in
+ * the constructor). Also test adding a listener while it's running.
+ */
+ @Test
+ public void testActionListener() throws Exception {
+ DummyWALObserver observer = new DummyWALObserver();
+ List<WALObserver> list = new ArrayList<WALObserver>();
+ list.add(observer);
+ DummyWALObserver laterobserver = new DummyWALObserver();
+ HLog hlog = new HLog(fs, logDir, oldLogDir, conf, list, null);
+ HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES),
+ SOME_BYTES, SOME_BYTES, false);
+
+ for (int i = 0; i < 20; i++) {
+ byte[] b = Bytes.toBytes(i+"");
+ KeyValue kv = new KeyValue(b,b,b);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ HLogKey key = new HLogKey(b,b, 0, 0);
+ hlog.append(hri, key, edit);
+ if (i == 10) {
+ hlog.registerWALActionsListener(laterobserver);
+ }
+ if (i % 2 == 0) {
+ hlog.rollWriter();
+ }
+ }
+ assertEquals(11, observer.logRollCounter);
+ assertEquals(5, laterobserver.logRollCounter);
+ }
+
+ /**
+ * Just counts when methods are called
+ */
+ static class DummyWALObserver implements WALObserver {
+ public int logRollCounter = 0;
+
+ @Override
+ public void logRolled(Path newFile) {
+ logRollCounter++;
+ }
+
+ @Override
+ public void logRollRequested() {
+ // Not interested
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) {
+ // Not interested
+
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Aug 27 20:53:15 2010
@@ -482,10 +482,10 @@ public class TestWALReplay {
* @throws IOException
*/
private HLog createWAL(final Configuration c) throws IOException {
- HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c, null);
+ HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
return wal;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java?rev=990266&r1=990265&r2=990266&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java Fri Aug 27 20:53:15 2010
@@ -248,7 +248,7 @@ public class TestMergeTool extends HBase
System.currentTimeMillis());
LOG.info("Creating log " + logPath.toString());
Path oldLogDir = new Path("/tmp", HConstants.HREGION_OLDLOGDIR_NAME);
- HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf, null);
+ HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf);
try {
// Merge Region 0 and Region 1
HRegion merged = mergeAndVerify("merging regions 0 and 1",