Posted to commits@hbase.apache.org by la...@apache.org on 2013/08/09 01:00:22 UTC

svn commit: r1512090 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java

Author: larsh
Date: Thu Aug  8 23:00:22 2013
New Revision: 1512090

URL: http://svn.apache.org/r1512090
Log:
HBASE-9158 Serious bug in cyclic replication

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
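
In short: ReplicationSink used to group incoming edits only by table and apply each table's rows in a single batch. In a cyclic setup (the test below uses the cycle 1 -> 2 -> 3 -> 1), one batch could therefore mix mutations that originated on different clusters, and an edit could end up tagged with another cluster's id; the cycle-prevention check would then drop it before it ever reached its actual source (the row4 scenario in the test). The fix groups rows by table *and* by origin UUID, and issues one batch per group.

A minimal, self-contained sketch of the new grouping. The names (GroupByOriginSketch, add, the string payloads) are illustrative only; just the shape of the addToHashMultiMap helper comes from the diff, and it is written in Java 8+ for brevity even though the commit targets an older Java:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.UUID;

    public class GroupByOriginSketch {

      // Mirrors the addToHashMultiMap helper in the diff, in modern Java.
      static <K1, K2, V> void add(Map<K1, Map<K2, List<V>>> map, K1 k1, K2 k2, V v) {
        map.computeIfAbsent(k1, k -> new HashMap<>())
           .computeIfAbsent(k2, k -> new ArrayList<>())
           .add(v);
      }

      public static void main(String[] args) {
        // table -> origin cluster UUID -> pending rows, like rowMap in the diff
        Map<String, Map<UUID, List<String>>> rowMap = new TreeMap<>();
        UUID cluster1 = UUID.randomUUID(); // origin of a replicated edit (row3)
        UUID cluster2 = UUID.randomUUID(); // origin of a local edit (row4)
        add(rowMap, "tableA", cluster1, "Put(row3)");
        add(rowMap, "tableA", cluster2, "Put(row4)");
        // One batch per (table, origin) bucket, so origins are never mixed:
        for (Map.Entry<String, Map<UUID, List<String>>> e : rowMap.entrySet()) {
          for (List<String> rows : e.getValue().values()) {
            System.out.println("batch to " + e.getKey() + ": " + rows);
          }
        }
      }
    }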

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1512090&r1=1512089&r2=1512090&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Thu Aug  8 23:00:22 2013
@@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replicat
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -45,14 +44,12 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * This class is responsible for replicating the edits coming
@@ -75,7 +72,6 @@ public class ReplicationSink {
   // Name of the HDFS directory that contains the temporary rep logs
   public static final String REPLICATION_LOG_DIR = ".replogs";
   private final Configuration conf;
-  private final ExecutorService sharedThreadPool;
   private final HConnection sharedHtableCon;
   private final MetricsSink metrics;
   private final AtomicLong totalReplicatedEdits = new AtomicLong();
@@ -93,11 +89,6 @@ public class ReplicationSink {
     decorateConf();
     this.metrics = new MetricsSink();
     this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
-    this.sharedThreadPool = new ThreadPoolExecutor(1,
-        conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
-        conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-repl"));
-    ((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true);
   }
 
   /**
@@ -125,10 +116,9 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      // Map of table => list of Rows, we only want to flushCommits once per
-      // invocation of this method per table.
-      Map<TableName, List<Row>> rows =
-          new TreeMap<TableName, List<Row>>();
+      // Map of table => cluster id => list of Rows; we only want to flushCommits once per
+      // invocation of this method per table and per cluster id.
+      Map<TableName, Map<UUID,List<Row>>> rowMap = new TreeMap<TableName, Map<UUID,List<Row>>>();
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -148,7 +138,7 @@ public class ReplicationSink {
               new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
               new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
             m.setClusterId(uuid);
-            addToMultiMap(rows, table, m);
+            addToHashMultiMap(rowMap, table, uuid, m);
           }
           if (CellUtil.isDelete(cell)) {
             ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
@@ -159,8 +149,8 @@ public class ReplicationSink {
         }
         totalReplicated++;
       }
-      for (Entry<TableName, List<Row>> entry : rows.entrySet()) {
-        batch(entry.getKey(), entry.getValue());
+      for (Entry<TableName, Map<UUID,List<Row>>> entry : rowMap.entrySet()) {
+        batch(entry.getKey(), entry.getValue().values());
       }
       int size = entries.size();
       this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
@@ -190,15 +180,21 @@ public class ReplicationSink {
   * Simple helper to add a value to a map from keys to (a list of) values
    * TODO: Make a general utility method
    * @param map
-   * @param key
+   * @param key1
+   * @param key2
    * @param value
    * @return
    */
-  private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
-    List<V> values = map.get(key);
+  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
+    Map<K2,List<V>> innerMap = map.get(key1);
+    if (innerMap == null) {
+      innerMap = new HashMap<K2, List<V>>();
+      map.put(key1, innerMap);
+    }
+    List<V> values = innerMap.get(key2);
     if (values == null) {
       values = new ArrayList<V>();
-      map.put(key, values);
+      innerMap.put(key2, values);
     }
     values.add(value);
     return values;
@@ -209,15 +205,6 @@ public class ReplicationSink {
    */
   public void stopReplicationSinkServices() {
     try {
-      this.sharedThreadPool.shutdown();
-      if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
-        this.sharedThreadPool.shutdownNow();
-      }
-    }  catch (InterruptedException e) {
-      LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
-      Thread.currentThread().interrupt();
-    }
-    try {
       this.sharedHtableCon.close();
     } catch (IOException e) {
       LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
@@ -231,14 +218,16 @@ public class ReplicationSink {
    * @param rows list of actions
    * @throws IOException
    */
-  private void batch(TableName tableName, List<Row> rows) throws IOException {
-    if (rows.isEmpty()) {
+  private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+    if (allRows.isEmpty()) {
       return;
     }
     HTableInterface table = null;
     try {
-      table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
-      table.batch(rows);
+      table = this.sharedHtableCon.getTable(tableName);
+      for (List<Row> rows : allRows) {
+        table.batch(rows);
+      }
     } catch (InterruptedException ix) {
       throw new IOException(ix);
     } finally {

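batch() changes in two ways: it now receives the per-origin buckets (Collection<List<Row>>) and issues one table.batch() call per bucket, and it obtains the table from the shared HConnection via getTable() instead of constructing an HTable around the removed sharedThreadPool (which is why the pool shutdown code in stopReplicationSinkServices goes away too). As a reading aid, here is the post-image of batch() assembled from the hunks above; the hunk cuts off inside the finally block, so the null check and close() there are an assumption about the surrounding code, not part of this diff:

      private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
        if (allRows.isEmpty()) {
          return;
        }
        HTableInterface table = null;
        try {
          // a table handle from the shared connection; replaces
          // new HTable(tableName, sharedHtableCon, sharedThreadPool)
          table = this.sharedHtableCon.getTable(tableName);
          for (List<Row> rows : allRows) {
            table.batch(rows); // one multi call per origin cluster
          }
        } catch (InterruptedException ix) {
          throw new IOException(ix);
        } finally {
          if (table != null) {   // assumed; truncated in the hunk above
            table.close();
          }
        }
      }
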
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java?rev=1512090&r1=1512089&r2=1512090&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java Thu Aug  8 23:00:22 2013
@@ -73,6 +73,8 @@ public class TestMasterReplication {
   private static final byte[] row = Bytes.toBytes("row");
   private static final byte[] row1 = Bytes.toBytes("row1");
   private static final byte[] row2 = Bytes.toBytes("row2");
+  private static final byte[] row3 = Bytes.toBytes("row3");
+  private static final byte[] row4 = Bytes.toBytes("row4");
   private static final byte[] noRepfamName = Bytes.toBytes("norep");
 
   private static final byte[] count = Bytes.toBytes("count");
@@ -178,6 +180,21 @@ public class TestMasterReplication {
     assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
     assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
     assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
+
+    // Test HBASE-9158
+    admin2.disablePeer("1");
+    // we now have an edit that was replicated into cluster 2, originating from cluster 1
+    putAndWait(row3, famName, htable1, htable2);
+    // now add a local edit to cluster 2
+    Put put = new Put(row4);
+    put.add(famName, row4, row4);
+    htable2.put(put);
+    // reenable replication from cluster 2 to cluster 3
+    admin2.enablePeer("1");
+    // without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
+    // and hence not replicated to cluster 1
+    wait(row4, htable1);
+    
     utility3.shutdownMiniCluster();
     utility2.shutdownMiniCluster();
     utility1.shutdownMiniCluster();
@@ -271,6 +288,10 @@ public class TestMasterReplication {
     put.add(fam, row, row);
     source.put(put);
 
+    wait(row, target);
+  }
+
+  private void wait(byte[] row, HTable target) throws Exception {
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
@@ -284,7 +305,7 @@ public class TestMasterReplication {
         assertArrayEquals(res.value(), row);
         break;
       }
-    }
+    }    
   }
 
   /**