You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/10/29 11:16:09 UTC
[hbase] branch branch-2 updated: HBASE-23136
PartitionedMobCompactor bulkloaded files shouldn't get replicated
(addressing bulkload replication related issue raised in HBASE-22380)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new d2a027d HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)
d2a027d is described below
commit d2a027d8be0723571759e18688bcc1f5c7865a8e
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Mon Oct 28 20:17:05 2019 +0000
HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)
Signed-off-by: Josh Elser <el...@apache.org>
(cherry picked from commit 4d414020bb3bfd7f214d2a599426be700df772b2, then resolved conflicts)
---
.../hadoop/hbase/client/SecureBulkLoadClient.java | 8 +-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 11 +-
.../hbase/shaded/protobuf/RequestConverter.java | 5 +-
.../src/main/protobuf/Client.proto | 1 +
hbase-protocol-shaded/src/main/protobuf/WAL.proto | 1 +
.../mob/compactions/PartitionedMobCompactor.java | 1 +
.../apache/hadoop/hbase/regionserver/HRegion.java | 7 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 2 +-
.../hbase/regionserver/SecureBulkLoadManager.java | 2 +-
.../replication/regionserver/ReplicationSink.java | 24 ++--
.../apache/hadoop/hbase/tool/BulkLoadHFiles.java | 1 +
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 11 +-
.../regionserver/TestBulkLoadReplication.java | 136 +++++++++++++++++----
13 files changed, 159 insertions(+), 51 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7e3166c..cb258e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -116,7 +116,7 @@ public class SecureBulkLoadClient {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
- bulkToken, false, null);
+ bulkToken, false, null, true);
}
/**
@@ -138,17 +138,17 @@ public class SecureBulkLoadClient {
final Token<?> userToken, final String bulkToken,
boolean copyFiles) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
- bulkToken, false, null);
+ bulkToken, false, null, true);
}
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken,
- boolean copyFiles, List<String> clusterIds) throws IOException {
+ boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
- userToken, bulkToken, copyFiles, clusterIds);
+ userToken, bulkToken, copyFiles, clusterIds, replicate);
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 0a2063d..5223551 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2582,16 +2582,19 @@ public final class ProtobufUtil {
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
- storeFilesSize, bulkloadSeqId, null);
+ storeFilesSize, bulkloadSeqId, null, true);
}
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
- Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+ Map<String, Long> storeFilesSize, long bulkloadSeqId,
+ List<String> clusterIds, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(tableName))
- .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+ .setTableName(ProtobufUtil.toProtoTableName(tableName))
+ .setEncodedRegionName(encodedRegionName)
+ .setBulkloadSeqNum(bulkloadSeqId)
+ .setReplicate(replicate);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 7dc8645..151a454 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -566,7 +566,7 @@ public final class RequestConverter {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
- false, null);
+ false, null, true);
}
/**
@@ -583,7 +583,7 @@ public final class RequestConverter {
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
- List<String> clusterIds) {
+ List<String> clusterIds, boolean replicate) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -624,6 +624,7 @@ public final class RequestConverter {
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
+ request.setReplicate(replicate);
return request.build();
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
+ optional bool replicate = 8 [default = true];
message FamilyPath {
required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index c103075..fd622cf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
+ optional bool replicate = 6 [default = true];
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 09d9d98..ed29902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -848,6 +848,7 @@ public class PartitionedMobCompactor extends MobCompactor {
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+ bulkload.disableReplication();
bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
connection.getRegionLocator(table.getName()));
} catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5b894a5..2423c08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6099,7 +6099,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
- return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+ null, true);
}
/**
@@ -6150,7 +6151,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
- boolean copyFile, List<String> clusterIds) throws IOException {
+ boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6325,7 +6326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles, storeFilesSizes, seqId, clusterIds);
+ storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00e6169..32d75f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2402,7 +2402,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
- request.getCopyFile(), clusterIds);
+ request.getCopyFile(), clusterIds, request.getReplicate());
} finally {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 0badf2a..d0f3943 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -295,7 +295,7 @@ public class SecureBulkLoadManager {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
- clusterIds);
+ clusterIds, request.getReplicate());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6e68760..ae0a732 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,20 @@ public class ReplicationSink {
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- if(bulkLoadsPerClusters == null) {
- bulkLoadsPerClusters = new HashMap<>();
- }
- // Map of table name Vs list of pair of family and list of
- // hfile paths from its namespace
- Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
- bulkLoadsPerClusters.get(bld.getClusterIdsList());
- if (bulkLoadHFileMap == null) {
- bulkLoadHFileMap = new HashMap<>();
- bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+ if(bld.getReplicate()) {
+ if (bulkLoadsPerClusters == null) {
+ bulkLoadsPerClusters = new HashMap<>();
+ }
+ // Map of table name Vs list of pair of family and list of
+ // hfile paths from its namespace
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+ bulkLoadsPerClusters.get(bld.getClusterIdsList());
+ if (bulkLoadHFileMap == null) {
+ bulkLoadHFileMap = new HashMap<>();
+ bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+ }
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..702ed75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -97,4 +97,5 @@ public interface BulkLoadHFiles {
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index c5ba5a7..16ed114 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -157,6 +157,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private List<String> clusterIds = new ArrayList<>();
+ private boolean replicate = true;
+
/**
* Represents an HFile waiting to be loaded. An queue is used in this class in order to support
* the case where a region has split during the process of the load. When this happens, the HFile
@@ -547,7 +549,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+ assignSeqIds, fsDelegationToken.getUserToken(),
+ bulkToken, copyFile, clusterIds, replicate);
}
return success ? regionName : null;
} finally {
@@ -1267,6 +1270,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
+ * Disables replication of the files bulk loaded by this instance.
+ */
+ public void disableReplication(){
+ this.replicate = false;
+ }
+ /**
* Infers region boundaries for a new table.
* <p>
* Parameter: <br>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 028277f..6fd7288 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,10 +24,16 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +61,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -77,14 +91,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Integration test for bulk load replication. Defines two clusters, with two way replication.
- * Performs a bulk load on cluster defined by UTIL1 first, asserts the Cell on the bulk loaded file
- * gets into the related table in UTIL1, then also validates the same got replicated to cluster
- * UTIL2. Then, bulk loads another file into UTIL2, and checks if related values are present on
- * UTIL2, and also gets replicated to UTIL1.
- * It also defines a preBulkLoad coprocessor that is added to all test table regions on each of the
- * clusters, in order to count amount of times bulk load actually gets invoked. This is to certify
- * we are not entered in the infinite loop condition addressed by HBASE-22380.
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load, asserting that the values in the
+ * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with the
+ * given replication topology each of these bulk loads should get replicated only once to each peer.
+ * To assert this, the test defines a postBulkLoadHFile coprocessor and adds it to all test table
+ * regions on each of the clusters. This CP counts the number of times a bulk load actually
+ * completes, certifying we are not entering the infinite loop condition addressed by HBASE-22380.
+ * It also verifies that the files bulk loaded by PartitionedMobCompactor are not replicated.
*/
@Category({ ReplicationTests.class, MediumTests.class})
public class TestBulkLoadReplication extends TestReplicationBase {
@@ -103,11 +119,14 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private static final String PEER_ID1 = "1";
private static final String PEER_ID3 = "3";
- private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+ private static AtomicInteger BULK_LOADS_COUNT;
private static CountDownLatch BULK_LOAD_LATCH;
private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
private static final Configuration CONF3 = UTIL3.getConfiguration();
+
+ private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
private static Table htable3;
@Rule
@@ -132,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL3.startMiniCluster(NUM_SLAVES1);
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setMobEnabled(true)
+ .setMobThreshold(4000)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
@@ -147,6 +168,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
@Before
@Override
public void setUpBase() throws Exception {
+ //"super.setUpBase()" already sets up replication from 1->2;
+ //the subsequent lines then set up 2->1, 2->3 and 3->2.
+ //So we have the following topology: "1 <-> 2 <-> 3"
super.setUpBase();
ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
@@ -157,9 +181,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
//adds cluster2 as a remote peer on cluster3
UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
- setupCoprocessor(UTIL1);
- setupCoprocessor(UTIL2);
- setupCoprocessor(UTIL3);
+ setupCoprocessor(UTIL1, "cluster1");
+ setupCoprocessor(UTIL2, "cluster2");
+ setupCoprocessor(UTIL3, "cluster3");
+ BULK_LOADS_COUNT = new AtomicInteger(0);
}
private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -167,12 +192,19 @@ public class TestBulkLoadReplication extends TestReplicationBase {
.setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
}
- private void setupCoprocessor(HBaseTestingUtility cluster){
+ private void setupCoprocessor(HBaseTestingUtility cluster, String name){
cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
try {
- r.getCoprocessorHost()
- .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
- cluster.getConfiguration());
+ TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
+ findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+ if(cp == null) {
+ r.getCoprocessorHost().
+ load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+ cluster.getConfiguration());
+ cp = r.getCoprocessorHost().
+ findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+ cp.clusterName = cluster.getClusterKey();
+ }
} catch (Exception e){
LOG.error(e.getMessage(), e);
}
@@ -221,6 +253,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
assertEquals(9, BULK_LOADS_COUNT.get());
}
+ @Test
+ public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+ Path path = createMobFiles(UTIL3);
+ ColumnFamilyDescriptor descriptor =
+ new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+ ExecutorService pool = null;
+ try {
+ pool = Executors.newFixedThreadPool(1);
+ PartitionedMobCompactor compactor =
+ new PartitionedMobCompactor(UTIL3.getConfiguration(), UTIL3.getTestFileSystem(), tableName,
+ descriptor, pool);
+ BULK_LOAD_LATCH = new CountDownLatch(1);
+ BULK_LOADS_COUNT.set(0);
+ compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
+ assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+ Thread.sleep(400);
+ assertEquals(1, BULK_LOADS_COUNT.get());
+ } finally {
+ if(pool != null && !pool.isTerminated()) {
+ pool.shutdownNow();
+ }
+ }
+ }
+
+
private void assertBulkLoadConditions(byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -236,11 +293,11 @@ public class TestBulkLoadReplication extends TestReplicationBase {
String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
- bulkLoadHFilesTool.bulkLoad(tableName, new Path("/bulk_dir"));
+ bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
}
private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
- Path bulkLoadDir = new Path("/bulk_dir/f");
+ Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
cluster.getFileSystem().mkdirs(bulkLoadDir);
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
}
@@ -281,22 +338,53 @@ public class TestBulkLoadReplication extends TestReplicationBase {
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
+ private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+ Path testDir = FSUtils.getRootDir(util.getConfiguration());
+ Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+ Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ MobFileName mobFileName = null;
+ byte[] mobFileStartRow = new byte[32];
+ for (byte rowKey : Bytes.toBytes("01234")) {
+ mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+ UUID.randomUUID().toString().replaceAll("-", ""));
+ StoreFileWriter mobFileWriter =
+ new StoreFileWriter.Builder(util.getConfiguration(),
+ new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+ .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+ long now = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < 10; i++) {
+ byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+ byte[] dummyData = new byte[5000];
+ new Random().nextBytes(dummyData);
+ mobFileWriter.append(
+ new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+ }
+ } finally {
+ mobFileWriter.close();
+ }
+ }
+ return basePath;
+ }
+
public static class BulkReplicationTestObserver implements RegionCoprocessor {
+ String clusterName;
+ AtomicInteger bulkLoadCounts = new AtomicInteger();
+
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(new RegionObserver() {
- @Override
- public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
- List<Pair<byte[], String>> familyPaths) throws IOException {
- BULK_LOADS_COUNT.incrementAndGet();
- }
@Override
public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
throws IOException {
BULK_LOAD_LATCH.countDown();
+ BULK_LOADS_COUNT.incrementAndGet();
+ LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+ bulkLoadCounts.addAndGet(1));
}
});
}