You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/02/12 03:05:36 UTC
[hbase] branch branch-2.1 updated: HBASE-18484 VerifyRep by
snapshot does not work when Yarn/SourceHBase/PeerHBase located in three
different HDFS clusters
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new fca1ece HBASE-18484 VerifyRep by snapshot does not work when Yarn/SourceHBase/PeerHBase located in three different HDFS clusters
fca1ece is described below
commit fca1ece03ce95a73888bcbe3a8aaf4ba0a9c59a8
Author: huzheng <op...@gmail.com>
AuthorDate: Thu Jan 24 21:29:11 2019 +0800
HBASE-18484 VerifyRep by snapshot does not work when Yarn/SourceHBase/PeerHBase located in three different HDFS clusters
---
.../mapreduce/TableSnapshotInputFormatImpl.java | 2 -
.../mapreduce/replication/VerifyReplication.java | 7 +-
.../TestVerifyReplicationCrossDiffHdfs.java | 207 +++++++++++++++++++++
.../apache/hadoop/hbase/regionserver/HRegion.java | 6 +-
4 files changed, 213 insertions(+), 9 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index edd8c7f..73b5d05 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -538,9 +538,7 @@ public class TableSnapshotInputFormatImpl {
restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
- // TODO: restore from record readers to parallelize.
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
-
conf.set(RESTORE_DIR_KEY, restoreDir.toString());
}
}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index ead64c9..63e1627 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -124,10 +124,9 @@ public class VerifyReplication extends Configured implements Tool {
public static class Verifier
extends TableMapper<ImmutableBytesWritable, Put> {
-
-
- public static enum Counters {
- GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
+ public enum Counters {
+ GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
+ }
private Connection sourceConnection;
private Table sourceTable;
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java
new file mode 100644
index 0000000..a07e0a8
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestVerifyReplicationCrossDiffHdfs {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestVerifyReplicationCrossDiffHdfs.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestVerifyReplicationCrossDiffHdfs.class);
+
+ private static HBaseTestingUtility util1;
+ private static HBaseTestingUtility util2;
+ private static HBaseTestingUtility mapReduceUtil = new HBaseTestingUtility();
+
+ private static Configuration conf1 = HBaseConfiguration.create();
+ private static Configuration conf2;
+
+ private static final byte[] FAMILY = Bytes.toBytes("f");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+ private static final String PEER_ID = "1";
+ private static final TableName TABLE_NAME = TableName.valueOf("testVerifyRepCrossDiffHDFS");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ util1 = new HBaseTestingUtility(conf1);
+ util1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = util1.getZkCluster();
+ conf1 = util1.getConfiguration();
+
+ conf2 = HBaseConfiguration.create(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ util2 = new HBaseTestingUtility(conf2);
+ util2.setZkCluster(miniZK);
+
+ util1.startMiniCluster();
+ util2.startMiniCluster();
+
+ createTestingTable(util1.getAdmin());
+ createTestingTable(util2.getAdmin());
+ addTestingPeer();
+
+ LOG.info("Start to load some data to source cluster.");
+ loadSomeData();
+
+ LOG.info("Start mini MapReduce cluster.");
+ mapReduceUtil.setZkCluster(miniZK);
+ mapReduceUtil.startMiniMapReduceCluster();
+ }
+
+ private static void createTestingTable(Admin admin) throws IOException {
+ TableDescriptor table = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(100)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .build();
+ admin.createTable(table);
+ }
+
+ private static void addTestingPeer() throws IOException {
+ ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(util2.getClusterKey()).setReplicateAllUserTables(false)
+ .setTableCFsMap(ImmutableMap.of(TABLE_NAME, ImmutableList.of())).build();
+ util1.getAdmin().addReplicationPeer(PEER_ID, rpc);
+ }
+
+ private static void loadSomeData() throws IOException, InterruptedException {
+ int numOfRows = 10;
+ try (Table table = util1.getConnection().getTable(TABLE_NAME)) {
+ for (int i = 0; i < numOfRows; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
+ }
+ }
+ // Wait some time until the peer received those rows.
+ Result[] results = null;
+ try (Table table = util2.getConnection().getTable(TABLE_NAME)) {
+ for (int i = 0; i < 100; i++) {
+ try (ResultScanner rs = table.getScanner(new Scan())) {
+ results = rs.next(numOfRows);
+ if (results == null || results.length < numOfRows) {
+ LOG.info("Retrying, wait until the peer received all the rows, currentRows:"
+ + (results == null ? 0 : results.length));
+ Thread.sleep(100);
+ }
+ }
+ }
+ }
+ Assert.assertNotNull(results);
+ Assert.assertEquals(10, results.length);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ if (mapReduceUtil != null) {
+ mapReduceUtil.shutdownMiniCluster();
+ }
+ if (util2 != null) {
+ util2.shutdownMiniCluster();
+ }
+ if (util1 != null) {
+ util1.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testVerifyRepBySnapshot() throws Exception {
+ Path rootDir = FSUtils.getRootDir(conf1);
+ FileSystem fs = rootDir.getFileSystem(conf1);
+ String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(util1.getAdmin(), TABLE_NAME, new String(FAMILY),
+ sourceSnapshotName, rootDir, fs, true);
+
+ // Take target snapshot
+ Path peerRootDir = FSUtils.getRootDir(conf2);
+ FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(util2.getAdmin(), TABLE_NAME, new String(FAMILY),
+ peerSnapshotName, peerRootDir, peerFs, true);
+
+ String peerFSAddress = peerFs.getUri().toString();
+ String temPath1 = new Path(fs.getUri().toString(), "/tmp1").toString();
+ String temPath2 = "/tmp2";
+
+ String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), PEER_ID, TABLE_NAME.toString() };
+
+ // Use the yarn's config override the source cluster's config.
+ Configuration newConf = HBaseConfiguration.create(conf1);
+ HBaseConfiguration.merge(newConf, mapReduceUtil.getConfiguration());
+ newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ CommonFSUtils.setRootDir(newConf, CommonFSUtils.getRootDir(conf1));
+ Job job = new VerifyReplication().createSubmittableJob(newConf, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(10,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(0,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 32e3d12..7ae5dcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7023,7 +7023,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = rootDir.getFileSystem(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
@@ -7125,7 +7125,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
fs = rsServices.getFileSystem();
}
if (fs == null) {
- fs = FileSystem.get(conf);
+ fs = rootDir.getFileSystem(conf);
}
return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
}
@@ -7294,7 +7294,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
fs = rsServices.getFileSystem();
}
if (fs == null) {
- fs = FileSystem.get(conf);
+ fs = rootDir.getFileSystem(conf);
}
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);