You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/10/30 08:53:48 UTC
[hbase] branch branch-2.2 updated: HBASE-23136
PartitionedMobCompactor bulkloaded files shouldn't get replicated
(addressing bulkload replication related issue raised in HBASE-22380)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 97498e2 HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)
97498e2 is described below
commit 97498e283911bf361a2b10e52db6f3619385ecf9
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Wed Oct 30 08:52:16 2019 +0000
HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)
Signed-off-by: Josh Elser <el...@apache.org>
(cherry picked from commit 4d414020bb3bfd7f214d2a599426be700df772b2, then resolved conflicts)
---
.../hadoop/hbase/client/SecureBulkLoadClient.java | 8 +-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 11 ++-
.../hbase/shaded/protobuf/RequestConverter.java | 5 +-
.../src/main/protobuf/Client.proto | 1 +
hbase-protocol-shaded/src/main/protobuf/WAL.proto | 1 +
.../mob/compactions/PartitionedMobCompactor.java | 1 +
.../apache/hadoop/hbase/regionserver/HRegion.java | 7 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 2 +-
.../hbase/regionserver/SecureBulkLoadManager.java | 2 +-
.../replication/regionserver/ReplicationSink.java | 24 ++---
.../apache/hadoop/hbase/tool/BulkLoadHFiles.java | 1 +
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 11 ++-
.../regionserver/TestBulkLoadReplication.java | 107 ++++++++++++++++++---
13 files changed, 142 insertions(+), 39 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7e3166c..cb258e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -116,7 +116,7 @@ public class SecureBulkLoadClient {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
- bulkToken, false, null);
+ bulkToken, false, null, true);
}
/**
@@ -138,17 +138,17 @@ public class SecureBulkLoadClient {
final Token<?> userToken, final String bulkToken,
boolean copyFiles) throws IOException {
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
- bulkToken, false, null);
+ bulkToken, false, null, true);
}
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken,
- boolean copyFiles, List<String> clusterIds) throws IOException {
+ boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
- userToken, bulkToken, copyFiles, clusterIds);
+ userToken, bulkToken, copyFiles, clusterIds, replicate);
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 753dd55..354e41c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2581,16 +2581,19 @@ public final class ProtobufUtil {
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
- storeFilesSize, bulkloadSeqId, null);
+ storeFilesSize, bulkloadSeqId, null, true);
}
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
- Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+ Map<String, Long> storeFilesSize, long bulkloadSeqId,
+ List<String> clusterIds, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(tableName))
- .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+ .setTableName(ProtobufUtil.toProtoTableName(tableName))
+ .setEncodedRegionName(encodedRegionName)
+ .setBulkloadSeqNum(bulkloadSeqId)
+ .setReplicate(replicate);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index d2c818e..1c9bac7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -562,7 +562,7 @@ public final class RequestConverter {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
- false, null);
+ false, null, true);
}
/**
@@ -579,7 +579,7 @@ public final class RequestConverter {
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
- List<String> clusterIds) {
+ List<String> clusterIds, boolean replicate) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -620,6 +620,7 @@ public final class RequestConverter {
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
+ request.setReplicate(replicate);
return request.build();
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
+ optional bool replicate = 8 [default = true];
message FamilyPath {
required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 9020daf..fe44813 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -145,6 +145,7 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
+ optional bool replicate = 6 [default = true];
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 09d9d98..ed29902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -848,6 +848,7 @@ public class PartitionedMobCompactor extends MobCompactor {
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+ bulkload.disableReplication();
bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
connection.getRegionLocator(table.getName()));
} catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d7d2a15..28a8749 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6101,7 +6101,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
- return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+ null, true);
}
/**
@@ -6152,7 +6153,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
- boolean copyFile, List<String> clusterIds) throws IOException {
+ boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6327,7 +6328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles, storeFilesSizes, seqId, clusterIds);
+ storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5b12430..5e6625e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2393,7 +2393,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
- request.getCopyFile(), clusterIds);
+ request.getCopyFile(), clusterIds, request.getReplicate());
} finally {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index f51608d..efc85cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -294,7 +294,7 @@ public class SecureBulkLoadManager {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
- clusterIds);
+ clusterIds, request.getReplicate());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6e68760..ae0a732 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,20 @@ public class ReplicationSink {
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- if(bulkLoadsPerClusters == null) {
- bulkLoadsPerClusters = new HashMap<>();
- }
- // Map of table name Vs list of pair of family and list of
- // hfile paths from its namespace
- Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
- bulkLoadsPerClusters.get(bld.getClusterIdsList());
- if (bulkLoadHFileMap == null) {
- bulkLoadHFileMap = new HashMap<>();
- bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+ if(bld.getReplicate()) {
+ if (bulkLoadsPerClusters == null) {
+ bulkLoadsPerClusters = new HashMap<>();
+ }
+ // Map of table name Vs list of pair of family and list of
+ // hfile paths from its namespace
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+ bulkLoadsPerClusters.get(bld.getClusterIdsList());
+ if (bulkLoadHFileMap == null) {
+ bulkLoadHFileMap = new HashMap<>();
+ bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+ }
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..702ed75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -97,4 +97,5 @@ public interface BulkLoadHFiles {
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index c5ba5a7..16ed114 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -157,6 +157,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private List<String> clusterIds = new ArrayList<>();
+ private boolean replicate = true;
+
/**
* Represents an HFile waiting to be loaded. An queue is used in this class in order to support
* the case where a region has split during the process of the load. When this happens, the HFile
@@ -547,7 +549,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+ assignSeqIds, fsDelegationToken.getUserToken(),
+ bulkToken, copyFile, clusterIds, replicate);
}
return success ? regionName : null;
} finally {
@@ -1267,6 +1270,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
+ * Disables replication for these bulkloaded files.
+ */
+ public void disableReplication(){
+ this.replicate = false;
+ }
+ /**
* Infers region boundaries for a new table.
* <p>
* Parameter: <br>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 4d69bdf..14e3976 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -25,11 +25,17 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -45,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,14 +64,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -109,9 +123,11 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private static final String PEER_ID3 = "3";
private static final String PEER_ID4 = "4";
- private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+ private static AtomicInteger BULK_LOADS_COUNT;
private static CountDownLatch BULK_LOAD_LATCH;
+ private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
private static HBaseTestingUtility utility3;
private static HBaseTestingUtility utility4;
private static Configuration conf3;
@@ -151,7 +167,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
util.startMiniCluster(2);
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setMobEnabled(true)
+ .setMobThreshold(4000)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
@@ -180,6 +198,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
setupCoprocessor(utility1);
setupCoprocessor(utility4);
setupCoprocessor(utility3);
+ BULK_LOADS_COUNT = new AtomicInteger(0);
}
private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -190,9 +209,16 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private void setupCoprocessor(HBaseTestingUtility cluster){
cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
try {
- r.getCoprocessorHost()
- .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
- cluster.getConfiguration());
+ TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
+ findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+ if(cp == null) {
+ r.getCoprocessorHost().
+ load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+ cluster.getConfiguration());
+ cp = r.getCoprocessorHost().
+ findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+ cp.clusterName = cluster.getClusterKey();
+ }
} catch (Exception e){
LOG.error(e.getMessage(), e);
}
@@ -206,6 +232,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
utility4.getAdmin().removeReplicationPeer(PEER_ID1);
utility4.getAdmin().removeReplicationPeer(PEER_ID3);
utility3.getAdmin().removeReplicationPeer(PEER_ID4);
+ utility1.getAdmin().removeReplicationPeer(PEER_ID4);
}
private static void setupBulkLoadConfigsForCluster(Configuration config,
@@ -241,6 +268,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
assertEquals(9, BULK_LOADS_COUNT.get());
}
+ @Test
+ public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+ Path path = createMobFiles(utility3);
+ ColumnFamilyDescriptor descriptor =
+ new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+ ExecutorService pool = null;
+ try {
+ pool = Executors.newFixedThreadPool(1);
+ PartitionedMobCompactor compactor =
+ new PartitionedMobCompactor(utility3.getConfiguration(), utility3.getTestFileSystem(),
+ tableName, descriptor, pool);
+ BULK_LOAD_LATCH = new CountDownLatch(1);
+ BULK_LOADS_COUNT.set(0);
+ compactor.compact(Arrays.asList(utility3.getTestFileSystem().listStatus(path)), true);
+ assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+ Thread.sleep(400);
+ assertEquals(1, BULK_LOADS_COUNT.get());
+ } finally {
+ if(pool != null && !pool.isTerminated()) {
+ pool.shutdownNow();
+ }
+ }
+ }
+
+
private void assertBulkLoadConditions(byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -260,13 +312,13 @@ public class TestBulkLoadReplication extends TestReplicationBase {
new LoadIncrementalHFiles(cluster.getConfiguration());
Map<byte[], List<Path>> family2Files = new HashMap<>();
List<Path> files = new ArrayList<>();
- files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+ files.add(new Path(BULK_LOAD_BASE_DIR + "/f/" + bulkLoadFilePath.getName()));
family2Files.put(Bytes.toBytes("f"), files);
bulkLoadHFilesTool.run(family2Files, tableName);
}
private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
- Path bulkLoadDir = new Path("/bulk_dir/f");
+ Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/f/");
cluster.getFileSystem().mkdirs(bulkLoadDir);
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
}
@@ -307,22 +359,53 @@ public class TestBulkLoadReplication extends TestReplicationBase {
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
+ private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+ Path testDir = FSUtils.getRootDir(util.getConfiguration());
+ Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+ Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ MobFileName mobFileName = null;
+ byte[] mobFileStartRow = new byte[32];
+ for (byte rowKey : Bytes.toBytes("01234")) {
+ mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+ UUID.randomUUID().toString().replaceAll("-", ""));
+ StoreFileWriter mobFileWriter =
+ new StoreFileWriter.Builder(util.getConfiguration(),
+ new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+ .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+ long now = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < 10; i++) {
+ byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+ byte[] dummyData = new byte[5000];
+ new Random().nextBytes(dummyData);
+ mobFileWriter.append(
+ new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+ }
+ } finally {
+ mobFileWriter.close();
+ }
+ }
+ return basePath;
+ }
+
public static class BulkReplicationTestObserver implements RegionCoprocessor {
+ String clusterName;
+ AtomicInteger bulkLoadCounts = new AtomicInteger();
+
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(new RegionObserver() {
- @Override
- public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
- List<Pair<byte[], String>> familyPaths) throws IOException {
- BULK_LOADS_COUNT.incrementAndGet();
- }
@Override
public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
throws IOException {
BULK_LOAD_LATCH.countDown();
+ BULK_LOADS_COUNT.incrementAndGet();
+ LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+ bulkLoadCounts.addAndGet(1));
}
});
}