You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/10/10 09:37:49 UTC
[hbase] branch master updated: HBASE-23136
PartionedMobFileCompactor bulkloaded files shouldn't get replicated
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new fec4c52 HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated
fec4c52 is described below
commit fec4c5249968e112b5ab6f4154d0e6c1fe428abc
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Thu Oct 10 10:37:42 2019 +0100
HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated
Signed-off-by: stack <st...@apache.org>
---
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 11 ++--
.../hbase/shaded/protobuf/RequestConverter.java | 5 +-
.../src/main/protobuf/Client.proto | 1 +
hbase-protocol-shaded/src/main/protobuf/WAL.proto | 1 +
.../hbase/client/AsyncClusterConnection.java | 15 ++---
.../hbase/client/AsyncClusterConnectionImpl.java | 4 +-
.../mob/compactions/PartitionedMobCompactor.java | 4 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 7 ++-
.../hbase/regionserver/SecureBulkLoadManager.java | 2 +-
.../replication/regionserver/ReplicationSink.java | 23 ++++----
.../apache/hadoop/hbase/tool/BulkLoadHFiles.java | 6 ++
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 9 ++-
.../hbase/client/DummyAsyncClusterConnection.java | 2 +-
.../regionserver/TestBulkLoadReplication.java | 65 +++++++++++++++++++++-
.../tool/TestBulkLoadHFilesSplitRecovery.java | 3 +-
15 files changed, 123 insertions(+), 35 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 52e3bf1..8108217 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2570,16 +2570,19 @@ public final class ProtobufUtil {
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
- storeFilesSize, bulkloadSeqId, null);
+ storeFilesSize, bulkloadSeqId, null, true);
}
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
- Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+ Map<String, Long> storeFilesSize, long bulkloadSeqId,
+ List<String> clusterIds, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(tableName))
- .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+ .setTableName(ProtobufUtil.toProtoTableName(tableName))
+ .setEncodedRegionName(encodedRegionName)
+ .setBulkloadSeqNum(bulkloadSeqId)
+ .setReplicate(replicate);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index ae3cd3f..d45423c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
- false, null);
+ false, null, true);
}
/**
@@ -585,7 +585,7 @@ public final class RequestConverter {
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
- List<String> clusterIds) {
+ List<String> clusterIds, boolean replicate) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -626,6 +626,7 @@ public final class RequestConverter {
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
+ request.setReplicate(replicate);
return request.build();
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
+ optional bool replicate = 8 [default = true];
message FamilyPath {
required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index c103075..fd622cf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
+ optional bool replicate = 6 [default = true];
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 5c57817..92118ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -81,17 +81,18 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
* not expect additional clusterIds param.
* @param tableName the target table
- * @param familyPaths hdfs path for the the table family dirs containg files to be loaded
- * @param row row key
- * @param assignSeqNum seq num for the event on WAL
- * @param userToken user token
- * @param bulkToken bulk load token
- * @param copyFiles flag for copying the loaded hfiles
+ * @param familyPaths hdfs path for the table family dirs containing files to be loaded.
+ * @param row row key.
+ * @param assignSeqNum seq num for the event on WAL.
+ * @param userToken user token.
+ * @param bulkToken bulk load token.
+ * @param copyFiles flag for copying the loaded hfiles.
* @param clusterIds list of cluster ids where the given bulk load has already been processed.
+ * @param replicate flags whether the bulk load should be replicated.
*/
CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
- List<String> clusterIds);
+ List<String> clusterIds, boolean replicate);
/**
* Clean up after finishing bulk load, no matter success or not.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 746c3b8..046ef41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
- String bulkToken, boolean copyFiles, List<String> clusterIds) {
+ String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
return callerFactory.<Boolean> single().table(tableName).row(row)
.action((controller, loc, stub) -> ConnectionUtils
.<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
null,
(rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
- userToken, bulkToken, copyFiles, clusterIds),
+ userToken, bulkToken, copyFiles, clusterIds, replicate),
(s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
.call();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index a5823ec..dba591d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -827,7 +827,9 @@ public class PartitionedMobCompactor extends MobCompactor {
throws IOException {
// bulkload the ref file
try {
- BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
+ BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
+ bulkLoader.disableReplication();
+ bulkLoader.bulkLoad(tableName, bulkloadDirectory);
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a9ff440..571be00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6146,7 +6146,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
- return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+ null, true);
}
/**
@@ -6197,7 +6198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
- boolean copyFile, List<String> clusterIds) throws IOException {
+ boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6372,7 +6373,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles, storeFilesSizes, seqId, clusterIds);
+ storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index ad19473..bccc8fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
- clusterIds);
+ clusterIds, request.getReplicate());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6c3f70c..51cbea8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,19 @@ public class ReplicationSink {
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- if(bulkLoadsPerClusters == null) {
- bulkLoadsPerClusters = new HashMap<>();
- }
- // Map of table name Vs list of pair of family and list of
- // hfile paths from its namespace
- Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
- bulkLoadsPerClusters.get(bld.getClusterIdsList());
- if (bulkLoadHFileMap == null) {
- bulkLoadHFileMap = new HashMap<>();
- bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+ if(bld.getReplicate()) {
+ if (bulkLoadsPerClusters == null) {
+ bulkLoadsPerClusters = new HashMap<>();
+ }
+ // Map of table name Vs list of pair of family and list of
+ // hfile paths from its namespace
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
+ if (bulkLoadHFileMap == null) {
+ bulkLoadHFileMap = new HashMap<>();
+ bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+ }
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..1cffe05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -85,6 +85,10 @@ public interface BulkLoadHFiles {
throws TableNotFoundException, IOException;
/**
+ * Disables replication for bulk loads issued through this instance, if bulk load replication is configured.
+ */
+ void disableReplication();
+ /**
* Perform a bulk load of the given directory into the given pre-existing table.
* @param tableName the table to load into
* @param dir the directory that was provided as the output path of a job using
@@ -97,4 +101,6 @@ public interface BulkLoadHFiles {
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}
+
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 0e2e029..294d94b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -132,6 +132,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
private String bulkToken;
private List<String> clusterIds = new ArrayList<>();
+ private boolean replicate = true;
public BulkLoadHFilesTool(Configuration conf) {
// make a copy, just to be sure we're not overriding someone else's config
@@ -379,7 +380,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
.collect(Collectors.toList());
CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
- fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
+ fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
+ (loaded, error) -> {
if (error != null) {
LOG.error("Encountered unrecoverable error from region server", error);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1052,4 +1054,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
System.exit(ret);
}
+
+ @Override
+ public void disableReplication(){
+ this.replicate = false;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 5a34457..8755749 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
- String bulkToken, boolean copyFiles, List<String> clusterIds) {
+ String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 7a49989..b227403 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,10 +24,15 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +60,22 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
+import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -137,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL3.startMiniCluster(NUM_SLAVES1);
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setMobEnabled(true)
+ .setMobThreshold(4000)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
@@ -232,6 +248,23 @@ public class TestBulkLoadReplication extends TestReplicationBase {
assertEquals(9, BULK_LOADS_COUNT.get());
}
+ @Test
+ public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+ Path path = createMobFiles(UTIL3);
+ ColumnFamilyDescriptor descriptor =
+ new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+ PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
+ UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
+ BULK_LOAD_LATCH = new CountDownLatch(1);
+ BULK_LOADS_COUNT.set(0);
+ compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
+ assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+ Thread.sleep(400);
+ assertEquals(1, BULK_LOADS_COUNT.get());
+
+ }
+
+
private void assertBulkLoadConditions(byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -292,6 +325,36 @@ public class TestBulkLoadReplication extends TestReplicationBase {
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
+ private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+ Path testDir = FSUtils.getRootDir(util.getConfiguration());
+ Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+ Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ MobFileName mobFileName = null;
+ byte[] mobFileStartRow = new byte[32];
+ for (byte rowKey : Bytes.toBytes("01234")) {
+ mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+ UUID.randomUUID().toString().replaceAll("-", ""));
+ StoreFileWriter mobFileWriter =
+ new StoreFileWriter.Builder(util.getConfiguration(),
+ new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+ .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+ long now = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < 10; i++) {
+ byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+ byte[] dummyData = new byte[5000];
+ new Random().nextBytes(dummyData);
+ mobFileWriter.append(
+ new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+ }
+ } finally {
+ mobFileWriter.close();
+ }
+ }
+ return basePath;
+ }
+
public static class BulkReplicationTestObserver implements RegionCoprocessor {
String clusterName;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
index 1326564..b626fe8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
@@ -267,7 +267,8 @@ public class TestBulkLoadHFilesSplitRecovery {
private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
AsyncClusterConnection errConn = spy(conn);
doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
- .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
+ .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
+ anyBoolean());
return errConn;
}