You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/18 05:28:59 UTC

[03/50] [abbrv] hbase git commit: HBASE-19792 TestReplicationSmallTests.testDisableEnable fails

http://git-wip-us.apache.org/repos/asf/hbase/blob/48bb1901/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
new file mode 100644
index 0000000..cb47827
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSmallTests extends TestReplicationBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
+  private static final String PEER_ID = "2";
+
+  @Before
+  public void setUp() throws Exception {
+    cleanUp();
+  }
+
+  /**
+   * Verify that version and column delete marker types are replicated correctly.
+   */
+  @Test
+  public void testDeleteTypes() throws Exception {
+    LOG.info("testDeleteTypes");
+    final byte[] v1 = Bytes.toBytes("v1");
+    final byte[] v2 = Bytes.toBytes("v2");
+    final byte[] v3 = Bytes.toBytes("v3");
+    htable1 = utility1.getConnection().getTable(tableName);
+
+    long t = EnvironmentEdgeManager.currentTime();
+    // create three versions for "row"
+    Put put = new Put(row);
+    put.addColumn(famName, row, t, v1);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.addColumn(famName, row, t + 1, v2);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.addColumn(famName, row, t + 2, v3);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    get.readAllVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() < 3) {
+        LOG.info("Rows not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
+        break;
+      }
+    }
+    // place a version delete marker (delete last version)
+    Delete d = new Delete(row);
+    d.addColumn(famName, row, t);
+    htable1.delete(d);
+
+    get = new Get(row);
+    get.readAllVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() > 2) {
+        LOG.info("Version not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
+        break;
+      }
+    }
+
+    // place a column delete marker
+    d = new Delete(row);
+    d.addColumns(famName, row, t + 2);
+    htable1.delete(d);
+
+    // now *both* of the remaining version should be deleted
+    // at the replica
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Rows not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Add a row, check it's replicated, delete it, check's gone
+   */
+  @Test
+  public void testSimplePutDelete() throws Exception {
+    LOG.info("testSimplePutDelete");
+    runSimplePutDeleteTest();
+  }
+
+  /**
+   * Try a small batch upload using the write buffer, check it's replicated
+   */
+  @Test
+  public void testSmallBatch() throws Exception {
+    LOG.info("testSmallBatch");
+    runSmallBatchTest();
+  }
+
+  /**
+   * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it,
+   * the insert should be replicated
+   */
+  @Test
+  public void testDisableEnable() throws Exception {
+    // Test disabling replication
+    hbaseAdmin.disableReplicationPeer(PEER_ID);
+
+    byte[] rowkey = Bytes.toBytes("disable enable");
+    Put put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    hbaseAdmin.enableReplicationPeer(PEER_ID);
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
+  /**
+   * Integration test for TestReplicationAdmin, removes and re-add a peer cluster
+   */
+  @Test
+  public void testAddAndRemoveClusters() throws Exception {
+    LOG.info("testAddAndRemoveClusters");
+    hbaseAdmin.removeReplicationPeer(PEER_ID);
+    Thread.sleep(SLEEP_TIME);
+    byte[] rowKey = Bytes.toBytes("Won't be replicated");
+    Put put = new Put(rowKey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+    ReplicationPeerConfig rpc =
+        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
+    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
+    Thread.sleep(SLEEP_TIME);
+    rowKey = Bytes.toBytes("do rep");
+    put = new Put(rowKey);
+    put.addColumn(famName, row, row);
+    LOG.info("Adding new row");
+    htable1.put(put);
+
+    get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME * i);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Do a more intense version testSmallBatch, one that will trigger wal rolling and other
+   * non-trivial code paths
+   */
+  @Test
+  public void testLoading() throws Exception {
+    LOG.info("Writing out rows to table1 in testLoading");
+    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
+    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(famName, row, row);
+      puts.add(put);
+    }
+    // The puts will be iterated through and flushed only when the buffer
+    // size is reached.
+    htable1.put(puts);
+
+    Scan scan = new Scan();
+
+    ResultScanner scanner = htable1.getScanner(scan);
+    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+    scanner.close();
+
+    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
+
+    LOG.info("Looking in table2 for replicated rows in testLoading");
+    long start = System.currentTimeMillis();
+    // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail
+    // sometimes.
+    final long retries = NB_RETRIES * 10;
+    for (int i = 0; i < retries; i++) {
+      scan = new Scan();
+      scanner = htable2.getScanner(scan);
+      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != NB_ROWS_IN_BIG_BATCH) {
+        if (i == retries - 1) {
+          int lastRow = -1;
+          for (Result result : res) {
+            int currentRow = Bytes.toInt(result.getRow());
+            for (int row = lastRow + 1; row < currentRow; row++) {
+              LOG.error("Row missing: " + row);
+            }
+            lastRow = currentRow;
+          }
+          LOG.error("Last row: " + lastRow);
+          fail("Waited too much time for normal batch replication, " + res.length + " instead of " +
+            NB_ROWS_IN_BIG_BATCH + "; waited=" + (System.currentTimeMillis() - start) + "ms");
+        } else {
+          LOG.info("Only got " + res.length + " rows... retrying");
+          Thread.sleep(SLEEP_TIME);
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
+   * compaction WALEdit.
+   */
+  @Test
+  public void testCompactionWALEdits() throws Exception {
+    WALProtos.CompactionDescriptor compactionDescriptor =
+        WALProtos.CompactionDescriptor.getDefaultInstance();
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htable1.getName())
+        .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.EMPTY_END_ROW).build();
+    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
+    Replication.scopeWALEdits(new WALKeyImpl(), edit, htable1.getConfiguration(), null);
+  }
+
+  /**
+   * Test for HBASE-8663
+   * <p>
+   * Create two new Tables with colfamilies enabled for replication then run
+   * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
+   * TestReplicationAdmin is a better place for this testing but it would need mocks.
+   */
+  @Test
+  public void testVerifyListReplicatedTable() throws Exception {
+    LOG.info("testVerifyListReplicatedTable");
+
+    final String tName = "VerifyListReplicated_";
+    final String colFam = "cf1";
+    final int numOfTables = 3;
+
+    Admin hadmin = utility1.getAdmin();
+
+    // Create Tables
+    for (int i = 0; i < numOfTables; i++) {
+      hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i))
+          .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam))
+              .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+          .build());
+    }
+
+    // verify the result
+    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
+    int[] match = new int[numOfTables]; // array of 3 with init value of zero
+
+    for (int i = 0; i < replicationColFams.size(); i++) {
+      TableCFs replicationEntry = replicationColFams.get(i);
+      String tn = replicationEntry.getTable().getNameAsString();
+      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
+        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
+        match[m]++; // should only increase once
+      }
+    }
+
+    // check the matching result
+    for (int i = 0; i < match.length; i++) {
+      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
+    }
+
+    // drop tables
+    for (int i = 0; i < numOfTables; i++) {
+      TableName tableName = TableName.valueOf(tName + i);
+      hadmin.disableTable(tableName);
+      hadmin.deleteTable(tableName);
+    }
+
+    hadmin.close();
+  }
+
+  /**
+   * Test for HBase-15259 WALEdits under replay will also be replicated
+   */
+  @Test
+  public void testReplicationInReplay() throws Exception {
+    final TableName tableName = htable1.getName();
+
+    HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
+    RegionInfo hri = region.getRegionInfo();
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
+      scopes.put(fam, 1);
+    }
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
+    WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
+    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
+    final byte[] qualifier = Bytes.toBytes("q");
+    final byte[] value = Bytes.toBytes("v");
+    WALEdit edit = new WALEdit(true);
+    long now = EnvironmentEdgeManager.currentTime();
+    edit.add(new KeyValue(rowName, famName, qualifier, now, value));
+    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
+    wal.append(hri, walKey, edit, true);
+    wal.sync();
+
+    Get get = new Get(rowName);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}