Posted to commits@hbase.apache.org by jd...@apache.org on 2010/07/01 02:25:51 UTC
svn commit: r959479 [1/2] - in /hbase/trunk: ./ bin/replication/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/ src/mai...
Author: jdcryans
Date: Thu Jul 1 00:25:50 2010
New Revision: 959479
URL: http://svn.apache.org/viewvc?rev=959479&view=rev
Log:
HBASE-2223 Handle 10min+ network partitions between clusters
Added:
hbase/trunk/bin/replication/
hbase/trunk/bin/replication/add_peer.rb
hbase/trunk/bin/replication/copy_tables_desc.rb
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/pom.xml
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul 1 00:25:50 2010
@@ -793,6 +793,7 @@ Release 0.21.0 - Unreleased
(Jeff Hammerbacher via Ryan Rawson)
HBASE-7 Provide a HBase checker and repair tool similar to fsck
(dhruba borthakur via Stack)
+ HBASE-2223 Handle 10min+ network partitions between clusters
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
Added: hbase/trunk/bin/replication/add_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/bin/replication/add_peer.rb?rev=959479&view=auto
==============================================================================
--- hbase/trunk/bin/replication/add_peer.rb (added)
+++ hbase/trunk/bin/replication/add_peer.rb Thu Jul 1 00:25:50 2010
@@ -0,0 +1,75 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Script to add a peer to a cluster
+# To see usage for this script, run:
+#
+# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "add_peer"
+
+# Print usage for this script
+def usage
+ puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+ exit!
+end
+
+if ARGV.size != 2
+ usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+
+c2 = HBaseConfiguration.create()
+parts2 = ARGV[1].split(":")
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+zkw1 = ZooKeeperWrapper.createInstance(c1, "ZK1")
+zkw1.writeZNode(parts1[2], "replication", "a")
+zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
+zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
+zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
+
+
+c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+zkw2 = ZooKeeperWrapper.createInstance(c2, "ZK2")
+zkw2.writeZNode(parts2[2], "replication", "a")
+zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);
+
+puts "Peer successfully added"
Added: hbase/trunk/bin/replication/copy_tables_desc.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/bin/replication/copy_tables_desc.rb?rev=959479&view=auto
==============================================================================
--- hbase/trunk/bin/replication/copy_tables_desc.rb (added)
+++ hbase/trunk/bin/replication/copy_tables_desc.rb Thu Jul 1 00:25:50 2010
@@ -0,0 +1,75 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Script to recreate all tables from one cluster to another
+# To see usage for this script, run:
+#
+# ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "copy_tables_desc"
+
+# Print usage for this script
+def usage
+ puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+ exit!
+end
+
+if ARGV.size != 2
+ usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+
+parts2 = ARGV[1].split(":")
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
+admin1 = HBaseAdmin.new(c1)
+
+c2 = HBaseConfiguration.create()
+c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+admin2 = HBaseAdmin.new(c2)
+
+for t in admin1.listTables()
+ admin2.createTable(t)
+end
+
+
+puts "All descriptions were copied"
Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Thu Jul 1 00:25:50 2010
@@ -444,7 +444,7 @@
<compileSource>1.6</compileSource>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>0.21.0-SNAPSHOT</hbase.version>
- <hadoop.version>0.20.3-append-r956776+1240</hadoop.version>
+ <hadoop.version>0.20.3-append-r956776+1240+tail</hadoop.version>
<commons-cli.version>1.2</commons-cli.version>
<commons-logging.version>1.1.1</commons-logging.version>
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Jul 1 00:25:50 2010
@@ -345,6 +345,9 @@ public final class HConstants {
*/
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
+ public static final String
+ REPLICATION_ENABLE_KEY = "hbase.replication";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
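As a minimal sketch of how the new constant can be used programmatically (normally the flag is set in hbase-site.xml on both clusters; the class name below is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class EnableReplicationSketch {
  public static void main(String[] args) {
    // Loads hbase-default.xml and hbase-site.xml from the classpath.
    Configuration conf = HBaseConfiguration.create();
    // Same effect as setting the property to true in hbase-site.xml.
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    System.out.println("replication enabled: "
        + conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false));
  }
}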
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Jul 1 00:25:50 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
import java.io.IOException;
import java.util.List;
@@ -279,10 +280,22 @@ public interface HRegionInterface extend
* @throws IOException e
*/
public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-
+
/**
* Bulk load an HFile into an open region
*/
public void bulkLoadHFile(String hfilePath,
byte[] regionName, byte[] familyName) throws IOException;
+
+ /**
+ * Replicates the given entries. The guarantee is that the given entries
+ * will be durable on the slave cluster if this method returns without
+ * any exception.
+ * hbase.replication has to be set to true for this to work.
+ *
+ * @param entries entries to replicate
+ * @throws IOException
+ */
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+
}
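A rough sketch of what a caller of the new RPC could look like. Obtaining the HRegionInterface proxy is elided, and the HLogKey/WALEdit/KeyValue constructors used below are assumptions based on the existing classes, so treat this as illustrative only:

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicateLogEntriesSketch {
  // "slave" stands for an already-obtained proxy to a slave region server.
  static void ship(HRegionInterface slave) throws IOException {
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q"), System.currentTimeMillis(), Bytes.toBytes("v")));
    HLogKey key = new HLogKey(Bytes.toBytes("region1"), Bytes.toBytes("t1"),
        1L, System.currentTimeMillis());
    // Per the javadoc above, the entries are durable on the slave cluster
    // if this returns without an exception; hbase.replication must be true.
    slave.replicateLogEntries(new HLog.Entry[] { new HLog.Entry(key, edit) });
  }
}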
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Jul 1 00:25:50 2010
@@ -1023,6 +1023,7 @@ public class HMaster extends Thread impl
byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
pair = getTableRegionForRow(tableName, rowKey);
}
+ LOG.info("About to " + op.toString() + " on " + Bytes.toString(tableName) + " and pair is " + pair);
if (pair != null && pair.getSecond() != null) {
this.regionManager.startAction(pair.getFirst().getRegionName(),
pair.getFirst(), pair.getSecond(), op);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java Thu Jul 1 00:25:50 2010
@@ -27,7 +27,9 @@ import org.apache.hadoop.fs.Path;
* Interface for the log cleaning function inside the master. Only 1 is called
* so if the desired effect is the mix of many cleaners, do call them yourself
* in order to control the flow.
- * HBase ships with OldLogsCleaner as the default implementation
+ * HBase ships with OldLogsCleaner as the default implementation.
+ * This interface extends Configurable, so setConf needs to be called once
+ * before using the cleaner.
*/
public interface LogCleanerDelegate extends Configurable {
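As a sketch of the extension point, a trivial delegate that deletes logs purely by age could look like the following (the property name is hypothetical; the replication-aware implementation added by this change is ReplicationLogCleaner, further down):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;

public class AgeBasedLogCleanerSketch implements LogCleanerDelegate {
  private Configuration conf;
  private long maxAgeMs;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Hypothetical property, ten minutes by default.
    this.maxAgeMs = conf.getLong("hbase.master.logcleaner.sketch.maxage", 600000);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public boolean isLogDeletable(Path filePath) {
    // Assumes hlog file names end with a timestamp after the last dot,
    // e.g. 10.10.1.76%3A53488.123456789
    String name = filePath.getName();
    try {
      long ts = Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
      return System.currentTimeMillis() - ts > maxAgeMs;
    } catch (NumberFormatException e) {
      return false; // unrecognized name, keep the file to be safe
    }
  }
}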
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java Thu Jul 1 00:25:50 2010
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -62,6 +63,13 @@ public class OldLogsCleaner extends Chor
Configuration conf, FileSystem fs,
Path oldLogDir) {
super("OldLogsCleaner", p, s);
+ // Use the log cleaner provided by replication if enabled, unless something
+ // was already provided
+ if (conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false) &&
+ conf.get("hbase.master.logcleanerplugin.impl") == null) {
+ conf.set("hbase.master.logcleanerplugin.impl",
+ "org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");
+ }
this.maxDeletedLogs =
conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
this.fs = fs;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Jul 1 00:25:50 2010
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.ipc.HMast
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
@@ -236,6 +237,10 @@ public class HRegionServer implements HR
private final String machineName;
+ // Replication-related attributes
+ private Replication replicationHandler;
+ // End of replication
+
/**
* Starts a HRegionServer at the default location
* @param conf
@@ -913,15 +918,18 @@ public class HRegionServer implements HR
"running at " + this.serverInfo.getServerName() +
" because logdir " + logdir.toString() + " exists");
}
- HLog newlog = instantiateHLog(logdir, oldLogDir);
- return newlog;
+ this.replicationHandler = new Replication(this.conf,this.serverInfo,
+ this.fs, oldLogDir, stopRequested);
+ HLog log = instantiateHLog(logdir, oldLogDir);
+ this.replicationHandler.addLogEntryVisitor(log);
+ return log;
}
// instantiate
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
- HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
- serverInfo.getServerAddress().toString());
- return newlog;
+ return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
+ this.replicationHandler.getReplicationManager(),
+ this.serverInfo.getServerAddress().toString());
}
@@ -1046,12 +1054,14 @@ public class HRegionServer implements HR
port++;
// update HRS server info port.
this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
- this.serverInfo.getStartCode(), port,
+ this.serverInfo.getStartCode(), port,
this.serverInfo.getHostname());
}
}
}
+ this.replicationHandler.startReplicationServices();
+
// Start Server. This service is like leases in that it internally runs
// a thread.
this.server.start();
@@ -1140,7 +1150,7 @@ public class HRegionServer implements HR
this.abortRequested = true;
this.reservedSpace.clear();
if (this.metrics != null) {
- LOG.info("Dump of metrics: " + this.metrics.toString());
+ LOG.info("Dump of metrics: " + this.metrics);
}
stop();
}
@@ -1172,6 +1182,7 @@ public class HRegionServer implements HR
Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.compactSplitThread);
Threads.shutdown(this.hlogRoller);
+ this.replicationHandler.join();
}
private boolean getMaster() {
@@ -2444,6 +2455,11 @@ public class HRegionServer implements HR
}
}
+ @Override
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ this.replicationHandler.replicateLogEntries(entries);
+ }
+
/**
* Do class main.
* @param args
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Jul 1 00:25:50 2010
@@ -36,11 +36,13 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -117,6 +119,7 @@ import com.google.common.util.concurrent
*/
public class HLog implements Syncable {
static final Log LOG = LogFactory.getLog(HLog.class);
+ private static final String HLOG_DATFILE = "hlog.dat.";
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
private final FileSystem fs;
@@ -219,6 +222,9 @@ public class HLog implements Syncable {
*/
private final LogSyncer logSyncerThread;
+ private final List<LogEntryVisitor> logEntryVisitors =
+ new CopyOnWriteArrayList<LogEntryVisitor>();
+
/**
* Pattern used to validate a HLog file name
*/
@@ -1028,6 +1034,11 @@ public class HLog implements Syncable {
if (!this.enabled) {
return;
}
+ if (!this.logEntryVisitors.isEmpty()) {
+ for (LogEntryVisitor visitor : this.logEntryVisitors) {
+ visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
+ }
+ }
try {
long now = System.currentTimeMillis();
this.writer.append(new HLog.Entry(logKey, logEdit));
@@ -1179,8 +1190,16 @@ public class HLog implements Syncable {
srcDir.toString());
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
- LOG.info("Spliting is done. Removing old log dir "+srcDir);
- fs.delete(srcDir, false);
+ FileStatus[] files = fs.listStatus(srcDir);
+ for(FileStatus file : files) {
+ Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+ LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
+ FSUtils.getPath(newPath));
+ fs.rename(file.getPath(), newPath);
+ }
+ LOG.debug("Moved " + files.length + " log files to " +
+ FSUtils.getPath(oldLogDir));
+ fs.delete(srcDir, true);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1632,11 +1651,21 @@ public class HLog implements Syncable {
return new Path(regionDir, RECOVERED_EDITS);
}
+ /**
+ * Register a visitor that will be called on each log entry just before
+ * it is written to the WAL.
+ * @param visitor the visitor to add
+ */
+ public void addLogEntryVisitor(LogEntryVisitor visitor) {
+ this.logEntryVisitors.add(visitor);
+ }
-
-
-
-
+ /**
+ * Unregister a previously added log entry visitor.
+ * @param visitor the visitor to remove
+ */
+ public void removeLogEntryVisitor(LogEntryVisitor visitor) {
+ this.logEntryVisitors.remove(visitor);
+ }
public void addLogActionsListerner(LogActionsListener list) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=959479&r1=959478&r2=959479&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Thu Jul 1 00:25:50 2010
@@ -46,7 +46,6 @@ public class HLogKey implements Writable
private long writeTime;
private byte clusterId;
- private int scope;
/** Writable Consructor -- Do not use. */
public HLogKey() {
@@ -70,7 +69,6 @@ public class HLogKey implements Writable
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
- this.scope = HConstants.REPLICATION_SCOPE_LOCAL;
}
//////////////////////////////////////////////////////////////////////////////
@@ -119,22 +117,6 @@ public class HLogKey implements Writable
this.clusterId = clusterId;
}
- /**
- * Get the replication scope of this key
- * @return replication scope
- */
- public int getScope() {
- return this.scope;
- }
-
- /**
- * Set the replication scope of this key
- * @param scope The new scope
- */
- public void setScope(int scope) {
- this.scope = scope;
- }
-
@Override
public String toString() {
return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -158,7 +140,6 @@ public class HLogKey implements Writable
result ^= this.logSeqNum;
result ^= this.writeTime;
result ^= this.clusterId;
- result ^= this.scope;
return result;
}
@@ -187,7 +168,6 @@ public class HLogKey implements Writable
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
out.writeByte(this.clusterId);
- out.writeInt(this.scope);
}
public void readFields(DataInput in) throws IOException {
@@ -197,7 +177,6 @@ public class HLogKey implements Writable
this.writeTime = in.readLong();
try {
this.clusterId = in.readByte();
- this.scope = in.readInt();
} catch(EOFException e) {
// Means it's an old key, just continue
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java Thu Jul 1 00:25:50 2010
@@ -0,0 +1,15 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+public interface LogEntryVisitor {
+
+ /**
+ * Called just before a log entry is written to the WAL.
+ * @param info region the entry belongs to
+ * @param logKey key of the entry
+ * @param logEdit edit carried by the entry
+ */
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit);
+}
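A minimal sketch of a visitor built against this interface; the counting logic is only an illustration of where the hook fires:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

public class CountingLogEntryVisitor implements LogEntryVisitor {
  private final AtomicLong edits = new AtomicLong();

  @Override
  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
      WALEdit logEdit) {
    // Called by HLog just before each entry is appended to the WAL.
    edits.addAndGet(logEdit.getKeyValues().size());
  }

  public long getEditCount() {
    return edits.get();
  }
}

Registration happens through the new HLog method, e.g. hlog.addLogEntryVisitor(new CountingLogEntryVisitor()).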
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java Thu Jul 1 00:25:50 2010
@@ -0,0 +1,493 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class serves as a helper for all things related to zookeeper
+ * in replication.
+ * <p/>
+ * The layout looks something like this under zookeeper.znode.parent
+ * for the master cluster:
+ * <p/>
+ * <pre>
+ * replication/
+ * master {contains a full cluster address}
+ * state {contains true or false}
+ * clusterId {contains a byte}
+ * peers/
+ * 1/ {contains a full cluster address}
+ * 2/
+ * ...
+ * rs/ {lists all RS that replicate}
+ * startcode1/ {lists all peer clusters}
+ * 1/ {lists hlogs to process}
+ * 10.10.1.76%3A53488.123456789 {contains nothing or a position}
+ * 10.10.1.76%3A53488.123456790
+ * ...
+ * 2/
+ * ...
+ * startcode2/
+ * ...
+ * </pre>
+ */
+public class ReplicationZookeeperWrapper {
+
+ private static final Log LOG =
+ LogFactory.getLog(ReplicationZookeeperWrapper.class);
+ // Name of the znode we use to lock during failover
+ private final static String RS_LOCK_ZNODE = "lock";
+ // Our handle on zookeeper
+ private final ZooKeeperWrapper zookeeperWrapper;
+ // Map of addresses of peer clusters with their ZKW
+ private final Map<String, ZooKeeperWrapper> peerClusters;
+ // Path to the root replication znode
+ private final String replicationZNode;
+ // Path to the peer clusters znode
+ private final String peersZNode;
+ // Path to the znode that contains all RSs that replicate
+ private final String rsZNode;
+ // Path to this region server's name under rsZNode
+ private final String rsServerNameZnode;
+ // Name of the replicationState znode
+ private final String replicationStateNodeName;
+ // If this RS is part of a master cluster
+ private final boolean replicationMaster;
+ private final Configuration conf;
+ // Is this cluster replicating at the moment?
+ private final AtomicBoolean replicating;
+ // Byte (stored as string here) that identifies this cluster
+ private final String clusterId;
+
+ /**
+ * Constructor used by region servers, connects to the peer cluster right away.
+ *
+ * @param zookeeperWrapper zkw to wrap
+ * @param conf conf to use
+ * @param replicating atomic boolean to start/stop replication
+ * @param rsName the name of this region server, null if this wrapper
+ * is only used for its helper methods
+ * @throws IOException
+ */
+ public ReplicationZookeeperWrapper(
+ ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+ final AtomicBoolean replicating, String rsName) throws IOException {
+ this.zookeeperWrapper = zookeeperWrapper;
+ this.conf = conf;
+ String replicationZNodeName =
+ conf.get("zookeeper.znode.replication", "replication");
+ String peersZNodeName =
+ conf.get("zookeeper.znode.replication.peers", "peers");
+ String repMasterZNodeName =
+ conf.get("zookeeper.znode.replication.master", "master");
+ this.replicationStateNodeName =
+ conf.get("zookeeper.znode.replication.state", "state");
+ String clusterIdZNodeName =
+ conf.get("zookeeper.znode.replication.clusterId", "clusterId");
+ String rsZNodeName =
+ conf.get("zookeeper.znode.replication.rs", "rs");
+ String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+ this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
+ this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+ this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
+ this.replicationZNode = zookeeperWrapper.getZNode(
+ zookeeperWrapper.getParentZNode(), replicationZNodeName);
+ this.peersZNode =
+ zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+ this.rsZNode =
+ zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
+
+ this.replicating = replicating;
+ setReplicating();
+ String idResult = Bytes.toString(
+ this.zookeeperWrapper.getData(this.replicationZNode,
+ clusterIdZNodeName));
+ this.clusterId =
+ idResult == null ?
+ Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
+ String address = Bytes.toString(
+ this.zookeeperWrapper.getData(this.replicationZNode,
+ repMasterZNodeName));
+ this.replicationMaster = thisCluster.equals(address);
+ LOG.info("This cluster (" + thisCluster + ") is a "
+ + (this.replicationMaster ? "master" : "slave") + " for replication" +
+ ", compared with (" + address + ")");
+ if (rsName != null) {
+ this.rsServerNameZnode =
+ this.zookeeperWrapper.getZNode(rsZNode, rsName);
+ List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
+ new ReplicationStatusWatcher());
+ if (znodes != null) {
+ for (String znode : znodes) {
+ connectToPeer(znode);
+ }
+ }
+ } else {
+ this.rsServerNameZnode = null;
+ }
+
+ }
+
+ /**
+ * Returns all region servers from given peer
+ *
+ * @param peerClusterId (byte) the cluster to interrogate
+ * @return addresses of all region servers
+ */
+ public List<HServerAddress> getPeersAddresses(String peerClusterId) {
+ if (this.peerClusters.size() == 0) {
+ return new ArrayList<HServerAddress>(0);
+ }
+ ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
+ return zkw == null?
+ new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
+ }
+
+ /**
+ * This method connects this cluster to another one and registers it
+ * in this region server's replication znode
+ * @param peerId id of the peer cluster
+ */
+ private void connectToPeer(String peerId) throws IOException {
+ String[] ensemble =
+ Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
+ split(":");
+ if (ensemble.length != 3) {
+ throw new IllegalArgumentException("Wrong format of cluster address: " +
+ this.zookeeperWrapper.getData(this.peersZNode, peerId));
+ }
+ Configuration otherConf = new Configuration(this.conf);
+ otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
+ otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
+ otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
+ ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
+ "connection to cluster: " + peerId);
+ zkw.registerListener(new ReplicationStatusWatcher());
+ this.peerClusters.put(peerId, zkw);
+ this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
+ this.rsServerNameZnode, peerId));
+ LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+ }
+
+ /**
+ * This reads the state znode for replication and sets the atomic boolean
+ */
+ private void setReplicating() {
+ String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
+ this.replicationZNode, this.replicationStateNodeName,
+ new ReplicationStatusWatcher()));
+ if (value != null) {
+ this.replicating.set(value.equals("true"));
+ LOG.info("Replication is now " + (this.replicating.get() ?
+ "started" : "stopped"));
+ }
+ }
+
+ /**
+ * Add a new log to the list of hlogs in zookeeper
+ * @param filename name of the hlog's znode
+ * @param clusterId name of the cluster's znode
+ */
+ public void addLogToList(String filename, String clusterId) {
+ try {
+ this.zookeeperWrapper.writeZNode(
+ this.zookeeperWrapper.getZNode(
+ this.rsServerNameZnode, clusterId), filename, "");
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ * Remove a log from the list of hlogs in zookeeper
+ * @param filename name of the hlog's znode
+ * @param clusterId name of the cluster's znode
+ */
+ public void removeLogFromList(String filename, String clusterId) {
+ try {
+ this.zookeeperWrapper.deleteZNode(
+ this.zookeeperWrapper.getZNode(this.rsServerNameZnode,
+ this.zookeeperWrapper.getZNode(clusterId, filename)));
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ * Set the current position of the specified cluster in the current hlog
+ * @param filename filename name of the hlog's znode
+ * @param clusterId clusterId name of the cluster's znode
+ * @param position the position in the file
+ * @throws IOException
+ */
+ public void writeReplicationStatus(String filename, String clusterId,
+ long position) {
+ try {
+ String clusterZNode = this.zookeeperWrapper.getZNode(
+ this.rsServerNameZnode, clusterId);
+ this.zookeeperWrapper.writeZNode(clusterZNode, filename,
+ Long.toString(position));
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ * Get a list of all the other region servers in this cluster
+ * and set a watch
+ * @param watch the watch to set
+ * @return a list of server names
+ */
+ public List<String> getRegisteredRegionServers(Watcher watch) {
+ return this.zookeeperWrapper.listZnodes(
+ this.zookeeperWrapper.getRsZNode(), watch);
+ }
+
+ /**
+ * Get the list of the replicators that have queues; they can be alive, dead,
+ * or left over from a previous run
+ * @param watch the watch to set
+ * @return a list of server names
+ */
+ public List<String> getListOfReplicators(Watcher watch) {
+ return this.zookeeperWrapper.listZnodes(rsZNode, watch);
+ }
+
+ /**
+ * Get the list of peer clusters for the specified server names
+ * @param rs server names of the rs
+ * @param watch the watch to set
+ * @return a list of peer clusters
+ */
+ public List<String> getListPeersForRS(String rs, Watcher watch) {
+ return this.zookeeperWrapper.listZnodes(
+ zookeeperWrapper.getZNode(rsZNode, rs), watch);
+ }
+
+ /**
+ * Get the list of hlogs for the specified region server and peer cluster
+ * @param rs server names of the rs
+ * @param id peer cluster
+ * @param watch the watch to set
+ * @return a list of hlogs
+ */
+ public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
+ return this.zookeeperWrapper.listZnodes(
+ zookeeperWrapper.getZNode(zookeeperWrapper.getZNode(rsZNode, rs), id), watch);
+ }
+
+ /**
+ * Try to set a lock in another server's znode.
+ * @param znode the server name of the other region server
+ * @return true if the lock was acquired, false otherwise
+ */
+ public boolean lockOtherRS(String znode) {
+ try {
+ this.zookeeperWrapper.writeZNode(
+ this.zookeeperWrapper.getZNode(this.rsZNode, znode),
+ RS_LOCK_ZNODE, rsServerNameZnode, true);
+
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ return false;
+ } catch (KeeperException e) {
+ LOG.debug("Won't lock " + znode + " because " + e.getMessage());
+ // TODO see if the other still exists!!
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method copies all the hlog queues from another region server
+ * and returns them sorted per peer cluster (with the dead
+ * server's znode name appended)
+ * @param znode server name whose queues will be copied
+ * @return all hlogs for all peers of that cluster, null if an error occurred
+ */
+ public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+ // TODO this method isn't atomic enough, we could start copying and then
+ // TODO fail for some reason and we would end up with znodes we don't want.
+ SortedMap<String,SortedSet<String>> queues =
+ new TreeMap<String,SortedSet<String>>();
+ try {
+ String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
+ List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
+ // We have a lock znode in there, it will count as one.
+ if (clusters == null || clusters.size() <= 1) {
+ return queues;
+ }
+ // The lock isn't a peer cluster, remove it
+ clusters.remove(RS_LOCK_ZNODE);
+ for (String cluster : clusters) {
+ // We add the name of the recovered RS to the new znode, we can even
+ // do that for queues that were recovered 10 times giving a znode like
+ // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+ String newCluster = cluster+"-"+znode;
+ String newClusterZnode =
+ this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
+ this.zookeeperWrapper.ensureExists(newClusterZnode);
+ String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
+ List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
+ // That region server didn't have anything to replicate for this cluster
+ if (hlogs == null || hlogs.size() == 0) {
+ continue;
+ }
+ SortedSet<String> logQueue = new TreeSet<String>();
+ queues.put(newCluster, logQueue);
+ for (String hlog : hlogs) {
+ String position = Bytes.toString(
+ this.zookeeperWrapper.getData(clusterPath, hlog));
+ LOG.debug("Creating " + hlog + " with data " + position);
+ this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
+ logQueue.add(hlog);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn(e);
+ return null;
+ } catch (KeeperException e) {
+ LOG.warn(e);
+ return null;
+ }
+ return queues;
+ }
+
+ /**
+ * Delete a complete queue of hlogs
+ * @param peerZnode znode of the peer cluster queue of hlogs to delete
+ */
+ public void deleteSource(String peerZnode) {
+ try {
+ this.zookeeperWrapper.deleteZNode(
+ this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ * Recursive deletion of all znodes in the specified rs' znode
+ * @param znode server name whose queues will be deleted
+ */
+ public void deleteRsQueues(String znode) {
+ try {
+ this.zookeeperWrapper.deleteZNode(
+ this.zookeeperWrapper.getZNode(rsZNode, znode), true);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ * Delete this cluster's queues
+ */
+ public void deleteOwnRSZNode() {
+ deleteRsQueues(this.rsServerNameZnode);
+ }
+
+ /**
+ * Get the position of the specified hlog in the specified peer znode
+ * @param peerId znode of the peer cluster
+ * @param hlog name of the hlog
+ * @return the position in that hlog
+ */
+ public long getHLogRepPosition(String peerId, String hlog) {
+ String clusterZnode =
+ this.zookeeperWrapper.getZNode(rsServerNameZnode, peerId);
+ String data = Bytes.toString(
+ this.zookeeperWrapper.getData(clusterZnode, hlog));
+ return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
+ }
+
+ /**
+ * Tells if this cluster is acting as a replication master or not
+ *
+ * @return true if this cluster is a replication master
+ */
+ public boolean isReplicationMaster() {
+ return this.replicationMaster;
+ }
+
+ /**
+ * Get the identification of the cluster
+ *
+ * @return the id for the cluster
+ */
+ public String getClusterId() {
+ return this.clusterId;
+ }
+
+ /**
+ * Get a map of all peer clusters
+ * @return map of peer cluster, zk address to ZKW
+ */
+ public Map<String, ZooKeeperWrapper> getPeerClusters() {
+ return this.peerClusters;
+ }
+
+ /**
+ * Watcher for the status of the replication
+ */
+ public class ReplicationStatusWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ Event.EventType type = watchedEvent.getType();
+ LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
+ if (type.equals(Event.EventType.NodeDataChanged)) {
+ setReplicating();
+ }
+ }
+ }
+
+}
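For orientation, a small sketch of using the helper outside of a region server, the way ReplicationLogCleaner below does (the peer id "1" is illustrative):

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;

public class ReplicationZkSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Passing null as rsName means the wrapper is only used for its helpers.
    ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
        ZooKeeperWrapper.createInstance(conf, "replication-sketch"),
        conf, new AtomicBoolean(true), null);
    System.out.println("replication master: " + helper.isReplicationMaster());
    System.out.println("cluster id: " + helper.getClusterId());
    // Region servers of peer cluster "1"; empty when no peer is connected.
    List<HServerAddress> peers = helper.getPeersAddresses("1");
    System.out.println("peer region servers: " + peers.size());
  }
}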
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Thu Jul 1 00:25:50 2010
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LogCleanerDelegate;
+import org.apache.hadoop.hbase.master.TimeToLiveLogCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for
+ * replication before deleting it when its TTL is over.
+ */
+public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
+
+ private static final Log LOG =
+ LogFactory.getLog(ReplicationLogCleaner.class);
+ private TimeToLiveLogCleaner ttlCleaner;
+ private Configuration conf;
+ private ReplicationZookeeperWrapper zkHelper;
+ private Set<String> hlogs = new HashSet<String>();
+
+ /**
+ * Instantiates the cleaner, does nothing more.
+ */
+ public ReplicationLogCleaner() {}
+
+ @Override
+ public boolean isLogDeletable(Path filePath) {
+
+ // Don't bother going further if the hlog isn't even expired
+ if (!ttlCleaner.isLogDeletable(filePath)) {
+ LOG.debug("Won't delete log since not past due " + filePath);
+ return false;
+ }
+ String log = filePath.getName();
+ // If we saw the hlog previously, let's consider it's still used
+ // At some point in the future we will refresh the list and it will be gone
+ if (this.hlogs.contains(log)) {
+ return false;
+ }
+
+ // Let's see if it's still there
+ // This solution makes every miss very expensive to process since we
+ // almost completely refresh the cache each time
+ return !refreshHLogsAndSearch(log);
+ }
+
+ /**
+ * Search through all the hlogs we have in ZK to refresh the cache.
+ * If a log is specified and found, we return early with true.
+ * @param searchedLog log we are searching for, pass null to cache everything
+ * that's in zookeeper.
+ * @return true if the specified log was found, false otherwise.
+ */
+ private boolean refreshHLogsAndSearch(String searchedLog) {
+ this.hlogs.clear();
+ final boolean lookForLog = searchedLog != null;
+ List<String> rss = zkHelper.getListOfReplicators(this);
+ if (rss == null) {
+ LOG.debug("Didn't find any region server that replicates, deleting: " +
+ searchedLog);
+ return false;
+ }
+ for (String rs: rss) {
+ List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
+ // if rs just died, this will be null
+ if (listOfPeers == null) {
+ continue;
+ }
+ for (String id : listOfPeers) {
+ List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
+ if (peersHlogs != null) {
+ this.hlogs.addAll(peersHlogs);
+ }
+ // early exit if we found the log
+ if(lookForLog && this.hlogs.contains(searchedLog)) {
+ LOG.debug("Found log in ZK, keeping: " + searchedLog);
+ return true;
+ }
+ }
+ }
+ LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
+ return false;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ this.ttlCleaner = new TimeToLiveLogCleaner();
+ this.ttlCleaner.setConf(conf);
+ try {
+ this.zkHelper = new ReplicationZookeeperWrapper(
+ ZooKeeperWrapper.createInstance(this.conf,
+ HMaster.class.getName()),
+ this.conf, new AtomicBoolean(true), null);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ refreshHLogsAndSearch(null);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void process(WatchedEvent watchedEvent) {}
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html Thu Jul 1 00:25:50 2010
@@ -0,0 +1,128 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+ Copyright 2010 The Apache Software Foundation
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+<h1>Multi Cluster Replication</h1>
+This package provides replication between HBase clusters.
+<p>
+
+<h2>Table Of Contents</h2>
+<ol>
+ <li><a href="#status">Status</a></li>
+ <li><a href="#requirements">Requirements</a></li>
+ <li><a href="#deployment">Deployment</a></li>
+</ol>
+
+<p>
+<a name="status">
+<h2>Status</h2>
+</a>
+<p>
+This package is alpha quality software and is only meant to be a base
+for future developments. The current implementation offers the following
+features:
+
+<ol>
+ <li>Master/Slave replication limited to 1 slave cluster. </li>
+ <li>Replication of scoped families in user tables.</li>
+ <li>Start/stop replication stream.</li>
+ <li>Supports clusters of different sizes.</li>
+ <li>Handling of network partitions longer than 10 minutes.</li>
+</ol>
+Please report bugs on the project's Jira when found.
+<p>
+<a name="requirements">
+<h2>Requirements</h2>
+</a>
+<p>
+
+Before trying out replication, make sure to review the following requirements:
+
+<ol>
+ <li>You must manage the Zookeeper ensembles yourself (HBase must not manage
+ them), and they should always be available during the deployment.</li>
+ <li>All machines from both clusters should be able to reach every
+ other machine since replication goes from any region server to any
+ other one on the slave cluster. That also includes the
+ Zookeeper clusters.</li>
+ <li>Both clusters should have the same HBase and Hadoop major revision.
+ For example, having 0.21.1 on the master and 0.21.0 on the slave is
+ correct but not 0.21.1 and 0.22.0.</li>
+ <li>Every table that contains families that are scoped for replication
+ should exist on every cluster with the exact same name, same for those
+ replicated families.</li>
+</ol>
+
+<p>
+<a name="deployment">
+<h2>Deployment</h2>
+</a>
+<p>
+
+The following steps describe how to enable replication from one cluster
+to another. This must be done with both clusters offline.
+<ol>
+ <li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
+ the following configurations:
+ <pre>
+<property>
+ <name>hbase.replication</name>
+ <value>true</value>
+</property></pre>
+ </li>
+ <li>Run the following command on any cluster:
+ <pre>
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/bin/replication/add_peer.rb</pre>
+ This will show you the help to setup the replication stream between
+ both clusters. If both clusters use the same Zookeeper cluster, you have
+ to use a different <b>zookeeper.znode.parent</b> since they can't
+ write in the same folder.
+ </li>
+ <li>You can now start and stop the clusters with your preferred method.</li>
+</ol>
+
+You can confirm that your setup works by looking at any region server's log
+on the master cluster and looking for lines like the following:
+
+<pre>
+Considering 1 rs, with ratio 0.1
+Getting 1 rs from peer cluster # 0
+Choosing peer 10.10.1.49:62020</pre>
+
+In this case it indicates that 1 region server from the slave cluster
+was chosen for replication.<br><br>
+
+Should you want to stop the replication while the clusters are running, open
+the shell on the master cluster and issue this command:
+<pre>
+hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
+
+Replace the znode parent with the one configured on your master
+cluster. Edits that are already queued will still be replicated after you
+issue that command, but new entries won't be. To start replication again,
+simply replace "false" with "true" in the command.
+
+<p>
+
+</body>
+</html>
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Jul 1 00:25:50 2010
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Replication serves as an umbrella over the setup of replication and
+ * is used by HRS.
+ */
+public class Replication implements LogEntryVisitor {
+
+ private final boolean replication;
+ private final ReplicationSourceManager replicationManager;
+ private boolean replicationMaster;
+ private final AtomicBoolean replicating = new AtomicBoolean(true);
+ private final ReplicationZookeeperWrapper zkHelper;
+ private final Configuration conf;
+ private final AtomicBoolean stopRequested;
+ private ReplicationSink replicationSink;
+
+ /**
+ * Instantiate the replication management (if rep is enabled).
+ * @param conf conf to use
+ * @param hsi the info of this region server
+ * @param fs handle to the filesystem
+ * @param oldLogDir directory where logs are archived
+ * @param stopRequested boolean that tells us if we are shutting down
+ * @throws IOException
+ */
+ public Replication(Configuration conf, HServerInfo hsi,
+ FileSystem fs, Path oldLogDir,
+ AtomicBoolean stopRequested) throws IOException {
+ this.conf = conf;
+ this.stopRequested = stopRequested;
+ this.replication =
+ conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+ if (replication) {
+ this.zkHelper = new ReplicationZookeeperWrapper(
+ ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
+ this.replicating, hsi.getServerName());
+ this.replicationMaster = zkHelper.isReplicationMaster();
+ this.replicationManager = this.replicationMaster ?
+ new ReplicationSourceManager(zkHelper, conf, stopRequested,
+ fs, this.replicating, oldLogDir) : null;
+ } else {
+ replicationManager = null;
+ zkHelper = null;
+ }
+ }
+
+ /**
+ * Join with the replication threads
+ */
+ public void join() {
+ if (this.replication) {
+ if (this.replicationMaster) {
+ this.replicationManager.join();
+ }
+ this.zkHelper.deleteOwnRSZNode();
+ }
+ }
+
+ /**
+ * Pass the list of log entries down to the sink
+ * @param entries list of entries to replicate
+ * @throws IOException
+ */
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ if (this.replication && !this.replicationMaster) {
+ this.replicationSink.replicateEntries(entries);
+ }
+ }
+
+ /**
+ * If replication is enabled, starts the replication source manager when
+ * this cluster is a master, otherwise starts a replication sink.
+ * @throws IOException
+ */
+ public void startReplicationServices() throws IOException {
+ if (this.replication) {
+ if (this.replicationMaster) {
+ this.replicationManager.init();
+ } else {
+ this.replicationSink =
+ new ReplicationSink(this.conf, this.stopRequested);
+ }
+ }
+ }
+
+ /**
+ * Get the replication sources manager
+ * @return the manager if replication is enabled, else returns null
+ */
+ public ReplicationSourceManager getReplicationManager() {
+ return replicationManager;
+ }
+
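+ /**
+ * Before a WAL entry is written, tag the edit with the replication scope
+ * of each of its column families, skipping families with a local scope.
+ */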
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) {
+ NavigableMap<byte[], Integer> scopes =
+ new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ byte[] family;
+ for (KeyValue kv : logEdit.getKeyValues()) {
+ family = kv.getFamily();
+ int scope = info.getTableDesc().getFamily(family).getScope();
+ if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
+ !scopes.containsKey(family)) {
+ scopes.put(family, scope);
+ }
+ }
+ if (!scopes.isEmpty()) {
+ logEdit.setScopes(scopes);
+ }
+ }
+
+ /**
+ * Add this class as a log entry visitor for HLog if replication is enabled
+ * @param hlog log to add ourselves on as a visitor
+ */
+ public void addLogEntryVisitor(HLog hlog) {
+ if (replication) {
+ hlog.addLogEntryVisitor(this);
+ }
+ }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=959479&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Thu Jul 1 00:25:50 2010
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster.
+ * <p/>
+ * This replication process currently waits for the edits to be applied
+ * before the method can return. This means that the replication of edits
+ * is synchronized (after reading from HLogs in ReplicationSource) and that a
+ * single region server cannot receive edits from two sources at the same time.
+ * <p/>
+ * This class uses the native HBase client in order to replicate entries.
+ * <p/>
+ *
+ * TODO make this class more like ReplicationSource wrt log handling
+ */
+public class ReplicationSink {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+ // Name of the HDFS directory that contains the temporary rep logs
+ public static final String REPLICATION_LOG_DIR = ".replogs";
+ private final Configuration conf;
+ // Pool of HTables used to apply the replicated edits
+ private final HTablePool pool;
+ // boolean coming from HRS to know when the process is stopping
+ private final AtomicBoolean stop;
+
+ /**
+ * Create a sink for replication
+ *
+ * @param conf conf object
+ * @param stopper boolean to tell this thread to stop
+ * @throws IOException thrown if the sink cannot be created
+ */
+ public ReplicationSink(Configuration conf, AtomicBoolean stopper)
+ throws IOException {
+ this.conf = conf;
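+ // The pool size is configurable via replication.sink.htablepool.capacity
+ // and defaults to 10 tables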
+ this.pool = new HTablePool(this.conf,
+ conf.getInt("replication.sink.htablepool.capacity", 10));
+ this.stop = stopper;
+ }
+
+ /**
+ * Replicate this array of entries directly into the local cluster
+ * using the native client.
+ *
+ * @param entries array of WAL entries to apply on the local cluster
+ * @throws IOException
+ */
+ public synchronized void replicateEntries(HLog.Entry[] entries)
+ throws IOException {
+ // Very simple optimization where we batch sequences of rows going
+ // to the same table.
+ try {
+ long totalReplicated = 0;
+ byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+ List<Put> puts = new ArrayList<Put>();
+ for (HLog.Entry entry : entries) {
+ WALEdit edit = entry.getEdit();
+ List<KeyValue> kvs = edit.getKeyValues();
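+ // An edit is treated as a batch of deletes if its first KeyValue is a
+ // delete, otherwise all of its KeyValues are turned into Puts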
+ if (kvs.get(0).isDelete()) {
+ Delete delete = new Delete(kvs.get(0).getRow(),
+ kvs.get(0).getTimestamp(), null);
+ for (KeyValue kv : kvs) {
+ if (kv.isDeleteFamily()) {
+ delete.deleteFamily(kv.getFamily());
+ } else if (!kv.isEmptyColumn()) {
+ delete.deleteColumn(kv.getFamily(),
+ kv.getQualifier());
+ }
+ }
+ delete(entry.getKey().getTablename(), delete);
+ } else {
+ // Switching table, flush
+ if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
+ put(lastTable, puts);
+ }
+ // With mini-batching, we need to expect multiple rows per edit
+ byte[] lastKey = kvs.get(0).getRow();
+ Put put = new Put(kvs.get(0).getRow(),
+ kvs.get(0).getTimestamp());
+ for (KeyValue kv : kvs) {
+ if (!Bytes.equals(lastKey, kv.getRow())) {
+ puts.add(put);
+ put = new Put(kv.getRow(), kv.getTimestamp());
+ }
+ put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+ lastKey = kv.getRow();
+ }
+ puts.add(put);
+ lastTable = entry.getKey().getTablename();
+ }
+ totalReplicated++;
+ }
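+ // Flush the puts still buffered for the last table seen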
+ put(lastTable, puts);
+ LOG.info("Total replicated: " + totalReplicated);
+ } catch (IOException ex) {
+ if (ex.getCause() instanceof TableNotFoundException) {
+ LOG.warn("Losing edits because: ", ex);
+ } else {
+ // Should we log rejected edits in a file for replay?
+ LOG.error("Unable to accept edit because", ex);
+ this.stop.set(true);
+ throw ex;
+ }
+ } catch (RuntimeException re) {
+ if (re.getCause() instanceof TableNotFoundException) {
+ LOG.warn("Losing edits because: ", re);
+ } else {
+ this.stop.set(true);
+ throw re;
+ }
+ }
+ }
+
+ /**
+ * Do the puts and handle the pool
+ * @param tableName table to insert into
+ * @param puts list of puts
+ * @throws IOException
+ */
+ private void put(byte[] tableName, List<Put> puts) throws IOException {
+ if (puts.isEmpty()) {
+ return;
+ }
+ HTableInterface table = null;
+ try {
+ table = this.pool.getTable(tableName);
+ table.put(puts);
+ puts.clear();
+ } finally {
+ // Return the table to the pool exactly once, even on error
+ if (table != null) {
+ this.pool.putTable(table);
+ }
+ }
+ }
+
+ /**
+ * Do the delete and handle the pool
+ * @param tableName table to delete in
+ * @param delete the delete to use
+ * @throws IOException
+ */
+ private void delete(byte[] tableName, Delete delete) throws IOException {
+ HTableInterface table = null;
+ try {
+ table = this.pool.getTable(tableName);
+ table.delete(delete);
+ } finally {
+ // Return the table to the pool exactly once, even on error
+ if (table != null) {
+ this.pool.putTable(table);
+ }
+ }
+ }
+}