Posted to commits@hbase.apache.org by jd...@apache.org on 2010/07/15 00:11:47 UTC

svn commit: r964220 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/replication/ src/main/javadoc/org/apache/hadoop/hbase/replication/ src/site/ src/site/resources/images/ src/site/xdoc/

Author: jdcryans
Date: Wed Jul 14 22:11:47 2010
New Revision: 964220

URL: http://svn.apache.org/viewvc?rev=964220&view=rev
Log:
HBASE-2808  Document the implementation of replication

Added:
    hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/
    hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
      - copied, changed from r963805, hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html
    hbase/trunk/src/site/resources/images/replication_overview.png   (with props)
    hbase/trunk/src/site/xdoc/replication.xml
Removed:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/site/site.xml

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964220&r1=964219&r2=964220&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Jul 14 22:11:47 2010
@@ -765,6 +765,7 @@ Release 0.21.0 - Unreleased
    HBASE-2265  HFile and Memstore should maintain minimum and maximum timestamps
                (Pranav via Ryan)
    HBASE-2836  Speed mvn site building by removing generation of useless reports
+   HBASE-2808  Document the implementation of replication
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Copied: hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (from r963805, hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html)
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html?p2=hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html&p1=hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html&r1=963805&r2=964220&rev=964220&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/package.html (original)
+++ hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html Wed Jul 14 22:11:47 2010
@@ -38,7 +38,7 @@ This package provides replication betwee
 <h2>Status</h2>
 </a>
 <p>
-This package is alpha quality software and is only meant to be a base
+This package is experimental quality software and is only meant to be a base
 for future developments. The current implementation offers the following
 features:
 
@@ -66,8 +66,8 @@ Before trying out replication, make sure
     other one on the slave cluster. That also includes the
     Zookeeper clusters.</li>
     <li>Both clusters should have the same HBase and Hadoop major revision.
-    For example, having 0.21.1 on the master and 0.21.0 on the slave is
-    correct but not 0.21.1 and 0.22.0.</li>
+    For example, having 0.90.1 on the master and 0.90.0 on the slave is
+    correct but not 0.90.1 and 0.89.20100725.</li>
     <li>Every table that contains families that are scoped for replication
     should exist on every cluster with the exact same name, same for those
     replicated families.</li>
@@ -86,13 +86,13 @@ to another. This must be done with both 
     the following configurations:
         <pre>
 &lt;property&gt;
-  &lt;name&gt;hbase.replication.enabled&lt;/name&gt;
+  &lt;name&gt;hbase.replication&lt;/name&gt;
   &lt;value&gt;true&lt;/value&gt;
 &lt;/property&gt;</pre>
     </li>
     <li>Run the following command on any cluster:
     <pre>
-$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/replication/bin/add_peer.tb</pre>
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/bin/replication/add_peer.tb</pre>
     This will show you the help to setup the replication stream between
     both clusters. If both clusters use the same Zookeeper cluster, you have
     to use a different <b>zookeeper.znode.parent</b> since they can't

Added: hbase/trunk/src/site/resources/images/replication_overview.png
URL: http://svn.apache.org/viewvc/hbase/trunk/src/site/resources/images/replication_overview.png?rev=964220&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hbase/trunk/src/site/resources/images/replication_overview.png
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hbase/trunk/src/site/site.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/site/site.xml?rev=964220&r1=964219&r2=964220&view=diff
==============================================================================
--- hbase/trunk/src/site/site.xml (original)
+++ hbase/trunk/src/site/site.xml Wed Jul 14 22:11:47 2010
@@ -35,6 +35,7 @@
       <item name="Bulk Loads" href="bulk-loads.html" />
       <item name="Metrics"      href="metrics.html" />
       <item name="HBase on Windows"      href="cygwin.html" />
+      <item name="Cluster replication"      href="replication.html" />
     </menu>
   </body>
     <skin>

Added: hbase/trunk/src/site/xdoc/replication.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/site/xdoc/replication.xml?rev=964220&view=auto
==============================================================================
--- hbase/trunk/src/site/xdoc/replication.xml (added)
+++ hbase/trunk/src/site/xdoc/replication.xml Wed Jul 14 22:11:47 2010
@@ -0,0 +1,429 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2010 The Apache Software Foundation
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
+          "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+  <properties>
+    <title>
+      HBase Replication
+    </title>
+  </properties>
+  <body>
+    <section name="Overview">
+      <p>
+        HBase replication is a way to copy data between HBase deployments. It
+        can serve as a disaster recovery solution and can contribute to higher
+        availability at the HBase layer. It can also serve more practical
+        purposes; for example, as a way to easily copy edits from a web-facing
+        cluster to a "MapReduce" cluster which will process old and new data
+        and ship back the results automatically.
+      </p>
+      <p>
+        The basic architecture pattern used for HBase replication is
+        master-push at the cluster level; this makes it much easier to keep
+        track of what’s currently being replicated since each region server
+        has its own write-ahead log (aka WAL or HLog), much like other well
+        known solutions such as MySQL master/slave replication where there’s
+        only one binlog to keep track of. One master cluster can replicate to
+        any number of slave clusters, and each region server participates by
+        replicating its own stream of edits.
+      </p>
+      <p>
+        The replication is done asynchronously, meaning that the clusters can
+        be geographically distant, the links between them can be offline for
+        some time, and rows inserted on the master cluster won’t be
+        available at the same time on the slave clusters (eventual consistency).
+      </p>
+      <p>
+        The replication format used in this design is conceptually the same as
+        <a href="http://dev.mysql.com/doc/refman/5.1/en/replication-formats.html">
+        MySQL’s statement-based replication </a>. Instead of SQL statements, whole
+        WALEdits (consisting of multiple cell inserts coming from the clients'
+        Put and Delete) are replicated in order to maintain atomicity.
+      </p>
+      <p>
+        The HLogs from each region server are the basis of HBase replication,
+        and must be kept in HDFS as long as they are needed to replicate data
+        to any slave cluster. Each RS reads from the oldest log it needs to
+        replicate and keeps the current position inside ZooKeeper to simplify
+        failure recovery. That position, as well as the queue of HLogs to
+        process, can be different for every slave cluster.
+      </p>
+      <p>
+        The clusters participating in replication can be of asymmetric sizes
+        and the master cluster will do its “best effort” to balance the stream
+        of replication on the slave clusters by relying on randomization.
+      </p>
+      <img src="images/replication_overview.png"/>
+    </section>
+    <section name="Life of a log edit">
+      <p>
+        The following sections describe the life of a single edit going from a
+        client that communicates with a master cluster all the way to a single
+        slave cluster.
+      </p>
+      <section name="Normal processing">
+        <p>
+          The client uses an HBase API that sends a Put, Delete or ICV to a region
+          server. The key values are transformed into a WALEdit by the region
+          server, and the WALEdit is inspected by the replication code which, for
+          each family that is scoped for replication, adds the scope to the edit.
+          The edit is appended to the current WAL and is then applied to the
+          MemStore.
+        </p>
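+        <p>
+          For reference, a family is scoped for replication when its replication
+          scope is set to GLOBAL (the value 1). The following is only a minimal
+          sketch of how that could be done with the Java admin client, assuming
+          the current admin API and the REPLICATION_SCOPE_GLOBAL constant; the
+          table and family names are made up, and if the constant isn't
+          available in your version the literal value 1 does the same thing:
+        </p>
+        <pre>
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+public class CreateReplicatedTableSketch {
+  public static void main(String[] args) throws Exception {
+    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
+    HTableDescriptor table = new HTableDescriptor("webtable");
+    HColumnDescriptor family = new HColumnDescriptor("content");
+    // Scope GLOBAL (1) marks this family's edits for replication.
+    family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(family);
+    admin.createTable(table);
+  }
+}
+        </pre>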
+        <p>
+          In a separate thread, the edit is read from the log (as part of a batch)
+          and only the KVs that are replicable are kept (that is, KVs that
+          belong to a family scoped GLOBAL in the family's schema and that
+          aren't part of a catalog table, so not .META. or -ROOT-). When the
+          buffer is filled, or the reader hits the
+          end of the file, the buffer is sent to a random region server on the
+          slave cluster.
+        </p>
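+        <p>
+          As an illustration only, the filtering step boils down to a check like
+          the following sketch; it is a simplified stand-in for the real
+          replication code, not the actual implementation:
+        </p>
+        <pre>
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+// Simplified stand-in for the filter described above: keep only KVs whose
+// family is scoped GLOBAL and that don't belong to -ROOT- or .META.
+public class ReplicationFilterSketch {
+  public static boolean replicable(byte[] tableName, Integer familyScope) {
+    // Catalog tables are never replicated.
+    if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) return false;
+    if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) return false;
+    // Only families explicitly scoped GLOBAL are shipped.
+    if (familyScope == null) return false;
+    return familyScope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
+  }
+}
+        </pre>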
+        <p>
+          Synchronously, the region server that receives the edits reads them
+          sequentially and applies them on its own cluster using the HBase client
+          (HTables managed by an HTablePool). If consecutive rows belong to the
+          same table, they are inserted together in order to leverage parallel
+          insertions.
+        </p>
+        <p>
+          Back in the master cluster's region server, the offset for the current
+          WAL that's being replicated is registered in ZooKeeper.
+        </p>
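+        <p>
+          Conceptually, registering that offset is just a write of the byte
+          position into the log's znode. A rough sketch with the plain ZooKeeper
+          client follows; the znode path and method name are hypothetical, the
+          real code lives in the replication package:
+        </p>
+        <pre>
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class PositionSketch {
+  // Hypothetical illustration: record how far into the current HLog the
+  // source has replicated by writing the offset into that log's znode.
+  public static void recordPosition(ZooKeeper zk, String hlogZnode, long offset)
+      throws Exception {
+    byte[] data = Bytes.toBytes(Long.toString(offset));
+    zk.setData(hlogZnode, data, -1);   // -1 means "any version"
+  }
+}
+        </pre>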
+      </section>
+      <section name="Non-responding slave clusters">
+        <p>
+          The edit is inserted in the same way.
+        </p>
+        <p>
+          In the separate thread, the region server reads, filters and buffers
+          the log edits the same way as during normal processing. The slave
+          region server that's contacted doesn't answer the RPC, so the master
+          region server will sleep and retry up to a configured number of times.
+          If the slave RS still isn't available, the master cluster RS will select a
+          new subset of RSs to replicate to and will retry sending the buffer of
+          edits.
+        </p>
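+        <p>
+          A sketch of that retry behavior is shown below; the interface, sleep
+          time and retry count are purely illustrative, not the actual
+          configuration names:
+        </p>
+        <pre>
+// Illustrative only: ship a buffer of edits, sleeping and retrying a
+// configured number of times, then fall back to a freshly chosen set of sinks.
+public class ShipRetrySketch {
+
+  // Hypothetical callback representing "send this buffer to one slave RS".
+  public interface Shipper {
+    boolean ship() throws Exception;     // true if the RPC succeeded
+    void chooseNewSinks();               // re-select a subset of slave RSs
+  }
+
+  public static void shipWithRetries(Shipper shipper, int maxRetries, long sleepMs)
+      throws Exception {
+    for (int left = maxRetries; left > 0; left--) {
+      if (shipper.ship()) {
+        return;                          // edits applied on the slave cluster
+      }
+      Thread.sleep(sleepMs);             // slave RS not answering, back off
+    }
+    // Still failing after maxRetries attempts: pick new sinks, the buffer
+    // will be retried against them.
+    shipper.chooseNewSinks();
+  }
+}
+        </pre>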
+        <p>
+          In the meantime, the WALs will be rolled and stored in a queue in
+          ZooKeeper. Logs that are archived by their region server (archiving is
+          basically moving a log from the region server's logs directory to a
+          central logs archive directory) will update their paths in the in-memory
+          queue of the replicating thread.
+        </p>
+        <p>
+          When the slave cluster is finally available, the buffer will be applied
+          the same way as during normal processing. The master cluster RS will then
+          replicate the backlog of logs.
+        </p>
+      </section>
+    </section>
+    <section name="Internals">
+      <p>
+        This section describes in depth how each of replication's internal
+        features operates.
+      </p>
+      <section name="Choosing region servers to replicate to">
+        <p>
+          When a master cluster RS initiates a replication source to a slave cluster,
+          it first connects to the slave's ZooKeeper ensemble using the provided
+          cluster key (that key is composed of the value of hbase.zookeeper.quorum,
+          zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It
+          then scans the "rs" directory to discover all the available sinks
+          (region servers that are accepting incoming streams of edits to replicate)
+          and will randomly choose a subset of them using a configured
+          ratio (which has a default value of 10%). For example, if a slave
+          cluster has 150 machines, 15 will be chosen as potential recipients for
+          edits that this master cluster RS will be sending. Since this is done by all
+          master cluster RSs, the probability that all slave RSs are used is very high,
+          and this method works for clusters of any size. For example, a master cluster
+          of 10 machines replicating to a slave cluster of 5 machines with a ratio
+          of 10% means that each master cluster RS will choose one machine at
+          random; thus the chance of overlapping and full usage of the slave
+          cluster is higher.
+        </p>
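+        <p>
+          The selection itself is nothing more than taking a random subset; the
+          sketch below shows the idea (the helper class and the way the ratio is
+          applied are assumptions, not the actual code):
+        </p>
+        <pre>
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+// Illustrative sketch: pick a random subset of the slave cluster's region
+// servers (the sinks) using a configurable ratio, 10% by default.
+public class SinkSelectionSketch {
+  public static String[] chooseSinks(String[] slaveRegionServers, float ratio) {
+    List shuffled = Arrays.asList(slaveRegionServers.clone());  // raw List keeps the sketch short
+    Collections.shuffle(shuffled, new Random());
+    // Round up and keep at least one sink: 5 slave RSs at 10% still gives 1.
+    int nbSinks = Math.max(1, (int) Math.ceil(slaveRegionServers.length * ratio));
+    return (String[]) shuffled.subList(0, nbSinks).toArray(new String[nbSinks]);
+  }
+}
+        </pre>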
+      </section>
+      <section name="Keeping track of logs">
+        <p>
+          Every master cluster RS has its own znode in the replication znodes hierarchy.
+          It contains one znode per peer cluster (if 5 slave clusters, 5 znodes
+          are created), and each of these contains a queue
+          of HLogs to process. Each of these queues will track the HLogs created
+          by that RS, but they can differ in size. For example, if one slave
+          cluster becomes unavailable for some time, then its HLogs cannot be
+          deleted, so they need to stay in the queue (while the others are processed).
+          See the section named "Region server failover" for an example.
+        </p>
+        <p>
+          When a source is instantiated, it contains the current HLog that the
+          region server is writing to. During log rolling, the new file is added
+          to the queue of each slave cluster's znode just before it's made available.
+          This ensures that all the sources are aware that a new log exists
+          before the HLog is able to append edits to it, but this operation is
+          now more expensive.
+          The queue items are discarded when the replication thread cannot read
+          more entries from a file (because it reached the end of the last block)
+          and there are other files in the queue.
+          This means that if a source is up-to-date and replicates from the log
+          that the region server writes to, reading up to the "end" of the
+          current file won't delete the item in the queue.
+        </p>
+        <p>
+          When a log is archived (because it's not used anymore or because there
+          are too many of them per hbase.regionserver.maxlogs, typically because the
+          insertion rate is faster than region flushing), it will notify the source
+          threads that the path for that log has changed. If a particular source is
+          already done with
+          it, it will just ignore the message. If it's in the queue, the path
+          will be updated in memory. If the log is currently being replicated,
+          the change will be done atomically so that the reader doesn't try to
+          open the file when it's already moved. Also, moving a file is a NameNode
+          operation so, if the reader is currently reading the log, it won't
+          generate any exception.
+        </p>
+      </section>
+      <section name="Reading, filtering and sending edits">
+        <p>
+          By default, a source will try to read from a log file and ship log
+          entries as fast as possible to a sink. This is first limited by the
+          filtering of log entries; only KeyValues that are scoped GLOBAL and
+          that don't belong to catalog tables will be retained. A second limit
+          is imposed on the total size of the list of edits to replicate per slave,
+          which by default is 64MB. This means that a master cluster RS with 3 slaves
+          will use at most 192MB to store data to replicate. This doesn't account
+          for the filtered data that hasn't been garbage collected yet.
+        </p>
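+        <p>
+          The 64MB figure is the default capacity of that buffer. If the property
+          below doesn't exist under that name in your version, treat this snippet
+          purely as an illustration of where such a limit would be tuned, in
+          hbase-site.xml:
+        </p>
+        <pre>
+&lt;property&gt;
+  &lt;name&gt;replication.source.size.capacity&lt;/name&gt;
+  &lt;value&gt;67108864&lt;/value&gt;
+&lt;/property&gt;
+        </pre>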
+        <p>
+          Once the maximum size of edits was buffered or the reader hits the end
+          of the log file, the source thread will stop reading and will choose
+          at random a sink to replicate to (from the list that was generated by
+          keeping only a subset of slave RSs). It will directly issue an RPC to
+          the chosen machine and will wait for the method to return. If it's
+          successful, the source will determine if the current file is emptied
+          or if it should continue to read from it. If the former, it will delete
+          the znode in the queue. If the latter, it will register the new offset
+          in the log's znode. If the RPC throws an exception, the source will retry
+          up to 10 times before trying to find a different sink.
+        </p>
+      </section>
+      <section name="Applying edits">
+        <p>
+          The sink synchronously applies the edits to its local cluster using
+          the native client API. This method is also synchronized across all
+          incoming sources, which means that only one batch of log entries can be
+          replicated at a time by each slave region server.
+        </p>
+        <p>
+          The sink applies the edits one by one, unless it's able to batch
+          sequential Puts that belong to the same table in order to use the
+          parallel puts feature of HConnectionManager. The Put and Delete objects
+          are recreated by inspecting the incoming WALEdit objects and are built
+          with the exact same row, family, qualifier, timestamp, and value (for
+          Put). Note that if the master and slave cluster don't have the same
+          time, time-related issues may occur.
+        </p>
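+        <p>
+          The batching described above is roughly equivalent to grouping the
+          consecutive Puts of a table and handing them to the client in one call,
+          as in this sketch (class and method names are made up, error handling
+          omitted):
+        </p>
+        <pre>
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+
+// Hedged sketch of the batching idea: consecutive Puts that target the same
+// table are grouped and submitted together through the client's list put.
+public class BatchApplySketch {
+  public static void applyBatch(Configuration conf, String tableName, List puts)
+      throws Exception {
+    HTable table = new HTable(conf, tableName);
+    table.put(puts);        // one parallel round trip instead of N single puts
+    table.flushCommits();   // make sure the write buffer is pushed out
+  }
+}
+        </pre>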
+      </section>
+      <section name="Cleaning logs">
+        <p>
+          If replication isn't enabled, the master's logs cleaning thread will
+          delete old logs using a configured TTL. This doesn't work well with
+          replication since archived logs past their TTL may still be in a
+          queue. Thus, the default behavior is augmented so that if a log is
+          past its TTL, the cleaning thread will look up every queue until it
+          finds the log (while caching the ones it finds). If it's not found,
+          the log will be deleted. The next time it has to look for a log,
+          it will first use its cache.
+        </p>
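+        <p>
+          A sketch of the augmented check follows; the helper names are made up
+          and the real logic lives in the replication log cleaning code:
+        </p>
+        <pre>
+import java.util.HashSet;
+import java.util.Set;
+
+// Illustrative only: a TTL-expired log is kept as long as any replication
+// queue still references it; a cache avoids rescanning ZooKeeper every time.
+public class LogCleaningSketch {
+
+  // Hypothetical view of the replication queues kept in ZooKeeper.
+  public interface QueueView {
+    boolean anyQueueContains(String hlogName);   // scans every queue znode
+  }
+
+  private final Set cache = new HashSet();  // logs known to still be queued
+  private final QueueView queues;
+
+  public LogCleaningSketch(QueueView queues) {
+    this.queues = queues;
+  }
+
+  public boolean deletable(String hlogName, boolean pastTtl) {
+    if (!pastTtl) {
+      return false;                    // the normal TTL check still applies
+    }
+    if (cache.contains(hlogName)) {
+      return false;                    // we already know a queue needs it
+    }
+    if (queues.anyQueueContains(hlogName)) {
+      cache.add(hlogName);             // remember it for the next pass
+      return false;
+    }
+    return true;                       // past its TTL and in no queue: delete
+  }
+}
+        </pre>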
+      </section>
+      <section name="Region server failover">
+        <p>
+          As long as region servers don't fail, keeping track of the logs in ZK
+          doesn't add any value. Unfortunately, they do fail, so since ZooKeeper
+          is highly available, we can count on it and its semantics to help us
+          manage the transfer of the queues.
+        </p>
+        <p>
+          All the master cluster RSs keep a watcher on every other one of them to be
+          notified when one dies (just like the master does). When it happens,
+          they all race to create a znode called "lock" inside the dead RS' znode
+          that contains its queues. The one that creates it successfully will
+          proceed by transferring all the queues to its own znode (one by one
+          since ZK doesn't support the rename operation) and will delete all the
+          old ones when it's done. The recovered queues' znodes will be named
+          with the id of the slave cluster appended with the name of the dead
+          server. 
+        </p>
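+        <p>
+          The race on the "lock" znode works because ZooKeeper only lets one
+          client create a given znode. A rough sketch with the plain ZooKeeper
+          client (the znode path handling is simplified):
+        </p>
+        <pre>
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+// Illustrative sketch of the failover race: every surviving RS tries to create
+// the same "lock" znode and ZooKeeper lets exactly one creation succeed.
+public class FailoverLockSketch {
+  public static boolean tryToLock(ZooKeeper zk, String deadRsZnode) throws Exception {
+    try {
+      zk.create(deadRsZnode + "/lock", new byte[0],
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      return true;    // we won the race and may transfer the queues
+    } catch (KeeperException.NodeExistsException e) {
+      return false;   // somebody else is already transferring them
+    }
+  }
+}
+        </pre>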
+        <p>
+          Once that is done, the master cluster RS will create one new source thread per
+          copied queue, and each of them will follow the read/filter/ship pattern.
+          The main difference is that those queues will never have new data since
+          they don't belong to their new region server, which means that when
+          the reader hits the end of the last log, the queue's znode will be
+          deleted and the master cluster RS will close that replication source.
+        </p>
+        <p>
+          For example, consider a master cluster with 3 region servers that's
+          replicating to a single slave with id '2'. The following hierarchy
+          represents what the znodes layout could be at some point in time. We
+          can see the RSs' znodes all contain a "peers" znode that contains a
+          single queue. The znode names in the queues represent the actual file
+          names on HDFS in the form "address,port.timestamp".
+        </p>
+        <pre>
+/hbase/replication/rs/
+                      1.1.1.1,60020,123456780/
+                        peers/
+                              2/
+                                1.1.1.1,60020.1234  (Contains a position)
+                                1.1.1.1,60020.1265
+                      1.1.1.2,60020,123456790/
+                        peers/
+                              2/
+                                1.1.1.2,60020.1214  (Contains a position)
+                                1.1.1.2,60020.1248
+                                1.1.1.2,60020.1312
+                      1.1.1.3,60020,123456630/
+                        peers/
+                              2/
+                                1.1.1.3,60020.1280  (Contains a position)
+
+        </pre>
+        <p>
+          Now let's say that 1.1.1.2 loses its ZK session. The survivors will race
+          to create a lock, and for some reason 1.1.1.3 wins. It will then start
+          transferring all the queues to its local peers znode by appending the
+          name of the dead server. Right before 1.1.1.3 is able to clean up the
+          old znodes, the layout will look like the following:
+        </p>
+        <pre>
+/hbase/replication/rs/
+                      1.1.1.1,60020,123456780/
+                        peers/
+                              2/
+                                1.1.1.1,60020.1234  (Contains a position)
+                                1.1.1.1,60020.1265
+                      1.1.1.2,60020,123456790/
+                        lock
+                        peers/
+                              2/
+                                1.1.1.2,60020.1214  (Contains a position)
+                                1.1.1.2,60020.1248
+                                1.1.1.2,60020.1312
+                      1.1.1.3,60020,123456630/
+                        peers/
+                              2/
+                                1.1.1.3,60020.1280  (Contains a position)
+
+                              2-1.1.1.2,60020,123456790/
+                                1.1.1.2,60020.1214  (Contains a position)
+                                1.1.1.2,60020.1248
+                                1.1.1.2,60020.1312
+        </pre>
+        <p>
+          Some time later, but before 1.1.1.3 is able to finish replicating the
+          last HLog from 1.1.1.2, let's say that it dies too (also some new logs
+          were created in the normal queues). The last RS will then try to lock
+          1.1.1.3's znode and will begin transferring all the queues. The new
+          layout will be:
+        </p>
+        <pre>
+/hbase/replication/rs/
+                      1.1.1.1,60020,123456780/
+                        peers/
+                              2/
+                                1.1.1.1,60020.1378  (Contains a position)
+
+                              2-1.1.1.3,60020,123456630/
+                                1.1.1.3,60020.1325  (Contains a position)
+                                1.1.1.3,60020.1401
+
+                              2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
+                                1.1.1.2,60020.1312  (Contains a position)
+                      1.1.1.3,60020,123456630/
+                        lock
+                        peers/
+                              2/
+                                1.1.1.3,60020.1325  (Contains a position)
+                                1.1.1.3,60020.1401
+
+                              2-1.1.1.2,60020,123456790/
+                                1.1.1.2,60020.1312  (Contains a position)
+        </pre>
+      </section>
+    </section>
+    <section name="FAQ">
+      <section name="Why do all clusters need to be in the same timezone?">
+        <p>
+          Suppose an edit to cell X happens in an EST cluster, then 2 minutes
+          later a new edit happens to the same cell in a PST cluster, and that
+          both clusters are in master-master replication. Since the PST clock
+          reads earlier than the EST clock, the second edit gets the lower
+          timestamp and is considered older, so the first edit will always hide
+          it even though the second edit actually happened later.
+        </p>
+      </section>
+      <section name="GLOBAL means replicate? Any provision to replicate only to cluster X and not to cluster Y? or is that for later?">
+        <p>
+          Yes, this is for much later.
+        </p>
+      </section>
+      <section name="You need a bulk edit shipper? Something that allows you transfer 64MB of edits in one go?">
+        <p>
+          You can use the HBase-provided utility called CopyTable from the package
+          org.apache.hadoop.hbase.mapreduce in order to have a distcp-like tool to
+          bulk copy data.
+        </p>
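+        <p>
+          For example, a CopyTable run to a peer cluster could look like the
+          line below; the exact options vary between versions, so run the tool
+          without arguments first to see its usage (the peer address shown is
+          just a placeholder in the quorum:port:znode format):
+        </p>
+        <pre>
+$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --peer.adr=slave-zk-quorum:2181:/hbase TestTable
+        </pre>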
+      </section>
+      <section name="Is it a mistake that WALEdit doesn't carry Put and Delete objects, that we have to reinstantiate not only replicating but when replaying edits?">
+        <p>
+          Yes, this behavior would help a lot but it's not currently available
+          in HBase (BatchUpdate had that, but it was lost in the new API).
+        </p>
+      </section>
+    </section>
+    <section name="Known bugs/missing features">
+      <p>
+        Here's a list of all the jiras that relate to major issues or missing
+        features in the replication implementation.
+      </p>
+      <ol>
+        <li>
+            HBASE-2688, master-master replication is disabled in the code; we
+            need to enable and test it.
+        </li>
+        <li>
+            HBASE-2611, basically if a region server dies while recovering the
+            queues of another dead RS, we will miss the data from the queues
+            that weren't copied.
+        </li>
+        <li>
+            HBASE-2196, a master cluster can currently only support a single
+            slave; some refactoring is needed to support multiple slaves.
+        </li>
+        <li>
+            HBASE-2195, edits are applied regardless of their home cluster; they
+            should carry that information so it can be checked.
+        </li>
+        <li>
+            HBASE-2200, currently all the replication operations (adding or removing
+            streams for example) are done only when the clusters are offline. This
+            should be possible at runtime.
+        </li>
+      </ol>
+    </section>
+  </body>
+</document>
\ No newline at end of file