You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2020/06/06 00:58:59 UTC
[accumulo] branch master updated: Create max tablets property in new bulk import (#1614)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new b1e67f7 Create max tablets property in new bulk import (#1614)
b1e67f7 is described below
commit b1e67f7ced838037965436fc1d1f255d139a4045
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Jun 5 20:58:50 2020 -0400
Create max tablets property in new bulk import (#1614)
---
.../accumulo/core/clientImpl/bulk/BulkImport.java | 52 +++++++++-----
.../org/apache/accumulo/core/conf/Property.java | 3 +
.../master/tableOps/bulkVer2/PrepBulkImport.java | 57 ++++++++++++----
.../tableOps/bulkVer2/PrepBulkImportTest.java | 31 ++++++++-
.../apache/accumulo/test/functional/BulkNewIT.java | 79 ++++++++++++++++++++++
5 files changed, 189 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index ebc3a3d..5120333 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -133,6 +134,14 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
SortedMap<KeyExtent,Bulk.Files> mappings;
TableOperationsImpl tableOps = new TableOperationsImpl(context);
+
+ int maxTablets = 0;
+ for (var prop : tableOps.getProperties(tableName)) {
+ if (prop.getKey().equals(Property.TABLE_BULK_MAX_TABLETS.getKey())) {
+ maxTablets = Integer.parseInt(prop.getValue());
+ break;
+ }
+ }
Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
.incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5)
.logInterval(3, TimeUnit.MINUTES).createRetry();
@@ -141,9 +150,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
boolean shouldRetry = true;
while (shouldRetry) {
if (plan == null) {
- mappings = computeMappingFromFiles(fs, tableId, srcPath);
+ mappings = computeMappingFromFiles(fs, tableId, srcPath, maxTablets);
} else {
- mappings = computeMappingFromPlan(fs, tableId, srcPath);
+ mappings = computeMappingFromPlan(fs, tableId, srcPath, maxTablets);
}
if (mappings.isEmpty())
@@ -385,7 +394,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
}
private SortedMap<KeyExtent,Files> computeMappingFromPlan(FileSystem fs, TableId tableId,
- Path srcPath)
+ Path srcPath, int maxTablets)
throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<String,List<Destination>> fileDestinations =
@@ -422,7 +431,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
for (Entry<String,List<Destination>> entry : fileDestinations.entrySet()) {
String fileName = entry.getKey();
List<Destination> destinations = entry.getValue();
- Set<KeyExtent> extents = mapDesitnationsToExtents(tableId, extentCache, destinations);
+ Set<KeyExtent> extents = mapDestinationsToExtents(tableId, extentCache, destinations);
+ log.debug("The file {} mapped to {} tablets.", fileName, extents.size());
+ checkTabletCount(maxTablets, extents.size(), fileName);
long estSize = (long) (fileLens.get(fileName) / (double) extents.size());
@@ -439,7 +450,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
return row == null ? null : new Text(row);
}
- private Set<KeyExtent> mapDesitnationsToExtents(TableId tableId, KeyExtentCache kec,
+ private Set<KeyExtent> mapDestinationsToExtents(TableId tableId, KeyExtentCache kec,
List<Destination> destinations)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Set<KeyExtent> extents = new HashSet<>();
@@ -470,7 +481,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
}
private SortedMap<KeyExtent,Bulk.Files> computeMappingFromFiles(FileSystem fs, TableId tableId,
- Path dirPath) throws IOException {
+ Path dirPath, int maxTablets) throws IOException {
Executor executor;
ExecutorService service = null;
@@ -486,7 +497,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
}
try {
- return computeFileToTabletMappings(fs, tableId, dirPath, executor, context);
+ return computeFileToTabletMappings(fs, tableId, dirPath, executor, context, maxTablets);
} finally {
if (service != null) {
service.shutdown();
@@ -523,8 +534,8 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
return fileList;
}
- public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs,
- TableId tableId, Path dirPath, Executor executor, ClientContext context) throws IOException {
+ public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs, TableId tableId,
+ Path dirPath, Executor executor, ClientContext context, int maxTablets) throws IOException {
KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
@@ -540,21 +551,22 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
CryptoService cs = CryptoServiceFactory.newDefaultInstance();
for (FileStatus fileStatus : files) {
+ Path filePath = fileStatus.getPath();
CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future = CompletableFuture.supplyAsync(() -> {
try {
long t1 = System.currentTimeMillis();
- List<KeyExtent> extents = findOverlappingTablets(context, extentCache,
- fileStatus.getPath(), fs, fileLensCache, cs);
- Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(),
- fileStatus.getPath(), fileStatus.getLen(), extents, fs, fileLensCache, cs);
+ List<KeyExtent> extents =
+ findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
+ // make sure file isn't going to too many tablets
+ checkTabletCount(maxTablets, extents.size(), filePath.toString());
+ Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
+ fileStatus.getLen(), extents, fs, fileLensCache, cs);
Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
for (KeyExtent ke : extents) {
- pathLocations.put(ke,
- new Bulk.FileInfo(fileStatus.getPath(), estSizes.getOrDefault(ke, 0L)));
+ pathLocations.put(ke, new Bulk.FileInfo(filePath, estSizes.getOrDefault(ke, 0L)));
}
long t2 = System.currentTimeMillis();
- log.trace("Mapped {} to {} tablets in {}ms", fileStatus.getPath(), pathLocations.size(),
- t2 - t1);
+ log.debug("Mapped {} to {} tablets in {}ms", filePath, pathLocations.size(), t2 - t1);
return pathLocations;
} catch (Exception e) {
throw new CompletionException(e);
@@ -611,4 +623,10 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
return mappings;
}
+
+ private void checkTabletCount(int tabletMaxSize, int tabletCount, String file) {
+ if (tabletMaxSize > 0 && tabletCount > tabletMaxSize)
+ throw new IllegalArgumentException("The file " + file + " attempted to import to "
+ + tabletCount + " tablets. Max tablets allowed set to " + tabletMaxSize);
+ }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc83376..494541c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -715,6 +715,9 @@ public enum Property {
+ " perform specialized parsing of the key. "),
TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING,
"The bloom filter hash type"),
+ TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT,
+ "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. "
+ + "This property is only enforced in the new bulk import API"),
TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY,
"The durability used to write to the write-ahead log. Legal values are:"
+ " none, which skips the write-ahead log; log, which sends the data to the"
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index d949f21..7bc55af 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -38,10 +38,12 @@ import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.fate.FateTxId;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -59,7 +61,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
/**
* Prepare bulk import directory. This REPO creates a bulk directory in Accumulo, list all the files
@@ -108,18 +109,21 @@ public class PrepBulkImport extends MasterRepo {
}
@VisibleForTesting
- static void checkForMerge(String tableId, Iterator<KeyExtent> lmi,
- TabletIterFactory tabletIterFactory) throws Exception {
- KeyExtent currRange = lmi.next();
+ static void checkForMerge(String tableId, LoadMappingIterator lmi,
+ TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception {
+ var currRange = lmi.next();
- Text startRow = currRange.getPrevEndRow();
+ Text startRow = currRange.getKey().getPrevEndRow();
Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);
KeyExtent currTablet = tabletIter.next();
- if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange)
- && equals(KeyExtent::getEndRow, currTablet, currRange))
+ var fileCounts = new HashMap<String,Integer>();
+ int count;
+
+ if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey())
+ && equals(KeyExtent::getEndRow, currTablet, currRange.getKey()))
currRange = null;
while (tabletIter.hasNext()) {
@@ -131,20 +135,29 @@ public class PrepBulkImport extends MasterRepo {
currRange = lmi.next();
}
- while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+ while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey())
+ && tabletIter.hasNext()) {
currTablet = tabletIter.next();
}
- boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange);
+ boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey());
+ count = matchedPrevRow ? 1 : 0;
- while (!equals(KeyExtent::getEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+ while (!equals(KeyExtent::getEndRow, currTablet, currRange.getKey())
+ && tabletIter.hasNext()) {
currTablet = tabletIter.next();
+ count++;
}
- if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange)) {
+ if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange.getKey())) {
break;
}
+ if (maxNumTablets > 0) {
+ int fc = count;
+ currRange.getValue()
+ .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum));
+ }
currRange = null;
}
@@ -153,12 +166,27 @@ public class PrepBulkImport extends MasterRepo {
throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened");
}
+
+ if (maxNumTablets > 0) {
+ fileCounts.values().removeIf(c -> c <= maxNumTablets);
+ if (!fileCounts.isEmpty()) {
+ log.warn("{} Bulk files overlapped too many tablets : {}", FateTxId.formatTid(tid),
+ fileCounts);
+ throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
+ TableOperationExceptionType.OTHER, "Files overlap the configured max (" + maxNumTablets
+ + ") number of tablets: " + fileCounts.keySet());
+ }
+ }
}
- private void checkForMerge(final Master master) throws Exception {
+ private void checkForMerge(final long tid, final Master master) throws Exception {
VolumeManager fs = master.getVolumeManager();
final Path bulkDir = new Path(bulkInfo.sourceDir);
+
+ int maxTablets = Integer.parseInt(master.getContext().getTableConfiguration(bulkInfo.tableId)
+ .get(Property.TABLE_BULK_MAX_TABLETS));
+
try (LoadMappingIterator lmi =
BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
@@ -166,15 +194,14 @@ public class PrepBulkImport extends MasterRepo {
.forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency().fetch(PREV_ROW)
.build(master.getContext()).stream().map(TabletMetadata::getExtent).iterator();
- checkForMerge(bulkInfo.tableId.canonical(), Iterators.transform(lmi, Map.Entry::getKey),
- tabletIterFactory);
+ checkForMerge(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid);
}
}
@Override
public Repo<Master> call(final long tid, final Master master) throws Exception {
// now that table lock is acquired check that all splits in load mapping exists in table
- checkForMerge(master);
+ checkForMerge(tid, master);
bulkInfo.tableState = Tables.getTableState(master.getContext(), bulkInfo.tableId);
diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
index 88a0cc8..4d56526 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -21,6 +21,9 @@ package org.apache.accumulo.master.tableOps.bulkVer2;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -28,10 +31,15 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.bulk.Bulk;
+import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
+import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.master.tableOps.bulkVer2.PrepBulkImport.TabletIterFactory;
@@ -97,7 +105,28 @@ public class PrepBulkImportTest {
return tabletRanges.subList(start, tabletRanges.size()).iterator();
};
- PrepBulkImport.checkForMerge("1", loadRanges.iterator(), tabletIterFactory);
+ try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
+ PrepBulkImport.checkForMerge("1", lmi, tabletIterFactory, 100, 10001);
+ }
+ }
+
+ private LoadMappingIterator createLoadMappingIter(List<KeyExtent> loadRanges) throws IOException {
+ SortedMap<KeyExtent,Bulk.Files> mapping = new TreeMap<>();
+ Bulk.Files testFiles = new Bulk.Files();
+
+ long c = 0L;
+ for (String f : "f1 f2 f3".split(" ")) {
+ c++;
+ testFiles.add(new Bulk.FileInfo(f, c, c));
+ }
+ for (KeyExtent ke : loadRanges)
+ mapping.put(ke, testFiles);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BulkSerialize.writeLoadMapping(mapping, "/some/dir", p -> baos);
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ LoadMappingIterator lmi =
+ BulkSerialize.readLoadMapping("/some/dir", TableId.of("1"), p -> bais);
+ return lmi;
}
static String toRangeStrings(Collection<KeyExtent> extents) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index a4b2032..a2f925b 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -22,6 +22,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -41,6 +42,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
@@ -50,6 +52,7 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
@@ -183,6 +186,32 @@ public class BulkNewIT extends SharedMiniClusterBase {
}
}
+ @Test
+ public void testMaxTablets() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+ tableName = "testMaxTablets_table1";
+ NewTableConfiguration newTableConf = new NewTableConfiguration();
+ // limit each bulk import file to at most 2 tablets via table.bulk.max.tablets
+ var props = Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "2");
+ newTableConf.setProperties(props);
+ client.tableOperations().create(tableName, newTableConf);
+
+ // test max tablets hit while inspecting bulk files
+ var thrown = assertThrows(RuntimeException.class, () -> testBulkFileMax(false));
+ var c = thrown.getCause();
+ assertTrue("Wrong exception: " + c, c instanceof ExecutionException);
+ assertTrue("Wrong exception: " + c.getCause(),
+ c.getCause() instanceof IllegalArgumentException);
+ var msg = c.getCause().getMessage();
+ assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf"));
+
+ // test max tablets hit using load plan on the server side
+ c = assertThrows(AccumuloException.class, () -> testBulkFileMax(true));
+ msg = c.getMessage();
+ assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf"));
+ }
+ }
+
private void testSingleTabletSingleFileNoSplits(AccumuloClient c, boolean offline)
throws Exception {
if (offline) {
@@ -313,6 +342,56 @@ public class BulkNewIT extends SharedMiniClusterBase {
}
}
+ private void testBulkFileMax(boolean usePlan) throws Exception {
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+ addSplits(c, tableName, "0333 0666 0999 1333 1666");
+
+ String dir = getDir("/testBulkFileMax-");
+
+ Map<String,Set<String>> hashes = new HashMap<>();
+ for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) {
+ hashes.put(endRow, new HashSet<>());
+ }
+
+ // Add a junk file, should be ignored
+ FSDataOutputStream out = fs.create(new Path(dir, "junk"));
+ out.writeChars("ABCDEFG\n");
+ out.close();
+
+ // 1 Tablet 0333-null
+ String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+ hashes.get("0333").add(h1);
+
+ // 3 Tablets 0666-0334, 0999-0667, 1333-1000
+ String h2 = writeData(dir + "/bad-file.", aconf, 334, 1333);
+ hashes.get("0666").add(h2);
+ hashes.get("0999").add(h2);
+ hashes.get("1333").add(h2);
+
+ // 1 Tablet 1666-1334
+ String h3 = writeData(dir + "/f3.", aconf, 1334, 1499);
+ hashes.get("1666").add(h3);
+
+ // 2 Tablets 1666-1334, >1666
+ String h4 = writeData(dir + "/f4.", aconf, 1500, 1999);
+ hashes.get("1666").add(h4);
+ hashes.get("null").add(h4);
+
+ if (usePlan) {
+ LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333))
+ .loadFileTo("bad-file.rf", RangeType.TABLE, row(333), row(1333))
+ .loadFileTo("f3.rf", RangeType.FILE, row(1334), row(1499))
+ .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build();
+ c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+ } else {
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+ }
+
+ verifyData(c, tableName, 0, 1999, false);
+ verifyMetadata(c, tableName, hashes);
+ }
+ }
+
@Test
public void testBulkFile() throws Exception {
testBulkFile(false, false);