Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/06/15 14:50:13 UTC

[GitHub] keith-turner closed pull request #506: fixes #472 Enabled bulk imports into offline table

URL: https://github.com/apache/accumulo/pull/506

This is a PR merged from a forked repository. Because GitHub hides the
original diff from a fork once the pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 1c09dcd7fe..894b2fc60c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -676,6 +676,9 @@ void load()
    * lock. The old bulk import method ({@link #importDirectory(String, String, String, boolean)})
    * examines files on the server side while holding a table read lock.
    *
+   * <p>
+   * This API supports adding files to online and offline tables.
+   *
    * @since 2.0.0
    */
   default ImportSourceArguments addFilesTo(String tableName) {
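
For orientation, the fluent API touched by this hunk is used as follows (a
minimal sketch; the Connector, table name, and directory are illustrative):

    // Bulk import into an offline table with the new API; the same
    // addFilesTo(...).from(...).load() call also works on online tables.
    void bulkImportOffline(Connector c, String table, String dir) throws Exception {
      c.tableOperations().offline(table, true);               // wait until offline
      c.tableOperations().addFilesTo(table).from(dir).load(); // import files from dir
      c.tableOperations().online(table, true);                // bring the table back
    }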
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
index 0c19cb0c7e..5d6802d351 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
@@ -50,7 +50,6 @@
 import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceArguments;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceOptions;
-import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
@@ -72,7 +71,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
 
 public class BulkImport implements ImportSourceArguments, ImportExecutorOptions {
 
@@ -245,10 +243,15 @@ public MLong(long i) {
     return results;
   }
 
-  public static List<TabletLocation> findOverlappingTablets(ClientContext context,
-      TabletLocator locator, Text startRow, Text endRow, FileSKVIterator reader)
+  public interface KeyExtentCache {
+    KeyExtent lookup(Text row)
+        throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException;
+  }
+
+  public static List<KeyExtent> findOverlappingTablets(ClientContext context,
+      KeyExtentCache extentCache, Text startRow, Text endRow, FileSKVIterator reader)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    List<TabletLocation> result = new ArrayList<>();
+    List<KeyExtent> result = new ArrayList<>();
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
     Text row = startRow;
     if (row == null)
@@ -261,10 +264,10 @@ public MLong(long i) {
         break;
       }
       row = reader.getTopKey().getRow();
-      TabletLocation tabletLocation = locator.locateTablet(context, row, false, true);
+      KeyExtent extent = extentCache.lookup(row);
       // log.debug(filename + " found row " + row + " at location " + tabletLocation);
-      result.add(tabletLocation);
-      row = tabletLocation.tablet_extent.getEndRow();
+      result.add(extent);
+      row = extent.getEndRow();
       if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
         row = new Text(row);
         row.append(byte0, 0, byte0.length);
@@ -275,19 +278,20 @@ public MLong(long i) {
     return result;
   }
 
-  public static List<TabletLocation> findOverlappingTablets(ClientContext context,
-      TabletLocator locator, Path file, FileSystem fs)
+  public static List<KeyExtent> findOverlappingTablets(ClientContext context,
+      KeyExtentCache extentCache, Path file, FileSystem fs)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
     try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
         .forFile(file.toString(), fs, fs.getConf())
         .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
-      return findOverlappingTablets(context, locator, null, null, reader);
+      return findOverlappingTablets(context, extentCache, null, null, reader);
     }
   }
 
   public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs,
       Table.ID tableId, Path dirPath, Executor executor, ClientContext context) throws IOException {
-    TabletLocator locator = TabletLocator.getLocator(context, tableId);
+
+    KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
 
     FileStatus[] files = fs.listStatus(dirPath,
         p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
@@ -298,14 +302,12 @@ public MLong(long i) {
       CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future = CompletableFuture.supplyAsync(() -> {
         try {
           long t1 = System.currentTimeMillis();
-          List<TabletLocation> locations = findOverlappingTablets(context, locator,
+          List<KeyExtent> extents = findOverlappingTablets(context, extentCache,
               fileStatus.getPath(), fs);
-          Collection<KeyExtent> extents = Collections2.transform(locations, l -> l.tablet_extent);
           Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(),
               fileStatus.getPath(), fileStatus.getLen(), extents, fs);
           Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
-          for (TabletLocation location : locations) {
-            KeyExtent ke = location.tablet_extent;
+          for (KeyExtent ke : extents) {
             pathLocations.put(ke,
                 new Bulk.FileInfo(fileStatus.getPath(), estSizes.getOrDefault(ke, 0L)));
           }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCache.java
new file mode 100644
index 0000000000..71dedeeb78
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCache.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.BulkImport.KeyExtentCache;
+import org.apache.accumulo.core.client.impl.Table.ID;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
+class ConcurrentKeyExtentCache implements KeyExtentCache {
+
+  private static final Text MAX = new Text();
+
+  private Set<Text> rowsToLookup = Collections.synchronizedSet(new HashSet<>());
+
+  List<Text> lookupRows = new ArrayList<>();
+
+  private ConcurrentSkipListMap<Text,KeyExtent> extents = new ConcurrentSkipListMap<>((t1, t2) -> {
+    return (t1 == t2) ? 0 : (t1 == MAX ? 1 : (t2 == MAX ? -1 : t1.compareTo(t2)));
+  });
+  private ID tableId;
+  private ClientContext ctx;
+
+  @VisibleForTesting
+  ConcurrentKeyExtentCache(Table.ID tableId, ClientContext ctx) {
+    this.tableId = tableId;
+    this.ctx = ctx;
+  }
+
+  private KeyExtent getFromCache(Text row) {
+    Entry<Text,KeyExtent> entry = extents.ceilingEntry(row);
+    if (entry != null && entry.getValue().contains(row)) {
+      return entry.getValue();
+    }
+
+    return null;
+  }
+
+  private boolean inCache(KeyExtent e) {
+    return Objects.equals(e, extents.get(e.getEndRow() == null ? MAX : e.getEndRow()));
+  }
+
+  @VisibleForTesting
+  protected void updateCache(KeyExtent e) {
+    Text prevRow = e.getPrevEndRow() == null ? new Text() : e.getPrevEndRow();
+    Text endRow = e.getEndRow() == null ? MAX : e.getEndRow();
+    extents.subMap(prevRow, e.getPrevEndRow() == null, endRow, true).clear();
+    extents.put(endRow, e);
+  }
+
+  @VisibleForTesting
+  protected Stream<KeyExtent> lookupExtents(Text row)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    return MetadataScanner.builder().from(ctx).scanMetadataTable().overRange(tableId, row, null)
+        .checkConsistency().fetchPrev().build().stream().limit(100).map(TabletMetadata::getExtent);
+  }
+
+  @Override
+  public KeyExtent lookup(Text row)
+      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    while (true) {
+      KeyExtent ke = getFromCache(row);
+      if (ke != null)
+        return ke;
+
+      // If a metadata lookup is currently in progress, then multiple threads can queue up their
+      // rows. The next lookup will process all queued rows; processing multiple rows at once can
+      // be more efficient.
+      rowsToLookup.add(row);
+
+      synchronized (this) {
+        // This check is done to avoid processing rowsToLookup when the current thread's row is in
+        // the cache.
+        ke = getFromCache(row);
+        if (ke != null) {
+          rowsToLookup.remove(row);
+          return ke;
+        }
+
+        lookupRows.clear();
+        synchronized (rowsToLookup) {
+          // Gather all rows that were queued for lookup before this point in time.
+          rowsToLookup.forEach(lookupRows::add);
+          rowsToLookup.clear();
+        }
+        // Look up rows in the metadata table in sorted order. This could possibly lead to fewer
+        // metadata lookups.
+        lookupRows.sort(Text::compareTo);
+
+        for (Text lookupRow : lookupRows) {
+          if (getFromCache(lookupRow) == null) {
+            Iterator<KeyExtent> iter = lookupExtents(lookupRow).iterator();
+            while (iter.hasNext()) {
+              KeyExtent ke2 = iter.next();
+              if (inCache(ke2))
+                break;
+              updateCache(ke2);
+            }
+          }
+        }
+      }
+    }
+  }
+}
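
A note on the class above: it batches metadata lookups, so concurrent callers
enqueue their rows and whichever thread enters the synchronized block scans
the metadata table once for all queued rows in sorted order. Within this
package it is wired up as in computeFileToTabletMappings (a condensed sketch;
the row value is illustrative):

    // One cache per bulk import; the metadata table is consulted lazily.
    BulkImport.KeyExtentCache cache = new ConcurrentKeyExtentCache(tableId, context);
    KeyExtent extent = cache.lookup(new Text("row0042")); // tablet containing this row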
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCacheTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCacheTest.java
new file mode 100644
index 0000000000..45d9456675
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ConcurrentKeyExtentCacheTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class ConcurrentKeyExtentCacheTest {
+
+  private static List<KeyExtent> extents = new ArrayList<>();
+  private static Set<KeyExtent> extentsSet = new HashSet<>();
+
+  @BeforeClass
+  public static void setupSplits() {
+    Text prev = null;
+    for (int i = 1; i < 256; i++) {
+      Text endRow = new Text(String.format("%02x", i));
+      extents.add(new KeyExtent(Table.ID.of("1"), endRow, prev));
+      prev = endRow;
+    }
+
+    extents.add(new KeyExtent(Table.ID.of("1"), null, prev));
+
+    extentsSet.addAll(extents);
+  }
+
+  private static class TestCache extends ConcurrentKeyExtentCache {
+
+    AtomicInteger updates = new AtomicInteger(0);
+
+    TestCache() {
+      super(null, null);
+    }
+
+    @Override
+    protected void updateCache(KeyExtent e) {
+      super.updateCache(e);
+      updates.incrementAndGet();
+    }
+
+    @Override
+    protected Stream<KeyExtent> lookupExtents(Text row) {
+      int index = -1;
+      for (int i = 0; i < extents.size(); i++) {
+        if (extents.get(i).contains(row)) {
+          index = i;
+          break;
+        }
+      }
+
+      Uninterruptibles.sleepUninterruptibly(3, TimeUnit.MILLISECONDS);
+
+      return extents.subList(index, extents.size()).stream().limit(73);
+    }
+  }
+
+  private void testLookup(TestCache tc, Text lookupRow) {
+    try {
+      KeyExtent extent = tc.lookup(lookupRow);
+      Assert.assertTrue(extent.contains(lookupRow));
+      Assert.assertTrue(extentsSet.contains(extent));
+    } catch (IOException | AccumuloException | AccumuloSecurityException
+        | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testExactEndRows() {
+    Random rand = new Random(42);
+    TestCache tc = new TestCache();
+    rand.ints(10000, 0, 256).mapToObj(i -> new Text(String.format("%02x", i))).sequential()
+        .forEach(lookupRow -> testLookup(tc, lookupRow));
+    Assert.assertEquals(256, tc.updates.get());
+
+    // try parallel
+    TestCache tc2 = new TestCache();
+    rand.ints(10000, 0, 256).mapToObj(i -> new Text(String.format("%02x", i))).parallel()
+        .forEach(lookupRow -> testLookup(tc2, lookupRow));
+    Assert.assertEquals(256, tc2.updates.get());
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    TestCache tc = new TestCache();
+
+    Random rand = new Random(42);
+    rand.ints(10000).mapToObj(i -> new Text(String.format("%08x", i))).sequential()
+        .forEach(lookupRow -> testLookup(tc, lookupRow));
+    Assert.assertEquals(256, tc.updates.get());
+
+    // try parallel
+    TestCache tc2 = new TestCache();
+    rand.ints(10000).mapToObj(i -> new Text(String.format("%08x", i))).parallel()
+        .forEach(lookupRow -> testLookup(tc2, lookupRow));
+    Assert.assertEquals(256, tc2.updates.get());
+  }
+}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
index 71abea886a..c2de4dd30d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
@@ -29,6 +29,7 @@
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
@@ -75,7 +76,9 @@ public BulkImportMove(BulkInfo bulkInfo) {
 
     VolumeManager fs = master.getFileSystem();
 
-    ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid);
+    }
 
     try {
       Map<String,String> oldToNewNameMap = BulkSerialize.readRenameMap(bulkDir.toString(),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java
index 6c94e20f59..657f9af050 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkInfo.java
@@ -19,6 +19,7 @@
 import java.io.Serializable;
 
 import org.apache.accumulo.core.client.impl.Table;
+import org.apache.accumulo.core.master.state.tables.TableState;
 
 /**
  * Package private class to hold all the information used for bulk import2
@@ -30,4 +31,5 @@
   String sourceDir;
   String bulkDir;
   boolean setTime;
+  TableState tableState;
 }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
index c912f51417..3fec43e4a5 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -20,7 +20,7 @@
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.impl.Table;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -37,28 +37,26 @@
 
   private static final Logger log = LoggerFactory.getLogger(CleanUpBulkImport.class);
 
-  private Table.ID tableId;
-  private String source;
-  private String bulk;
+  private BulkInfo info;
 
-  public CleanUpBulkImport(Table.ID tableId, String source, String bulk) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
+  public CleanUpBulkImport(BulkInfo info) {
+    this.info = info;
   }
 
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
-    log.debug("removing the bulkDir processing flag file in " + bulk);
-    Path bulkDir = new Path(bulk);
+    log.debug("removing the bulkDir processing flag file in " + info.bulkDir);
+    Path bulkDir = new Path(info.bulkDir);
     MetadataTableUtil.removeBulkLoadInProgressFlag(master,
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    MetadataTableUtil.addDeleteEntry(master, tableId, bulkDir.toString());
-    log.debug("removing the metadata table markers for loaded files");
-    Connector conn = master.getConnector();
-    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
-    Utils.unreserveHdfsDirectory(source, tid);
-    Utils.getReadLock(tableId, tid).unlock();
+    MetadataTableUtil.addDeleteEntry(master, info.tableId, bulkDir.toString());
+    if (info.tableState == TableState.ONLINE) {
+      log.debug("removing the metadata table markers for loaded files");
+      Connector conn = master.getConnector();
+      MetadataTableUtil.removeBulkLoadEntries(conn, info.tableId, tid);
+    }
+    Utils.unreserveHdfsDirectory(info.sourceDir, tid);
+    Utils.getReadLock(info.tableId, tid).unlock();
     // delete json renames and mapping files
     Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
     Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
@@ -70,7 +68,9 @@ public CleanUpBulkImport(Table.ID tableId, String source, String bulk) {
     }
 
     log.debug("completing bulkDir import transaction " + tid);
-    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    if (info.tableState == TableState.ONLINE) {
+      ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+    }
     return null;
   }
 }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java
index 02a9848017..ac7a8a5efd 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/CompleteBulkImport.java
@@ -17,7 +17,6 @@
 package org.apache.accumulo.master.tableOps.bulkVer2;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -27,19 +26,15 @@
 
   private static final long serialVersionUID = 1L;
 
-  private Table.ID tableId;
-  private String source;
-  private String bulk;
+  private BulkInfo info;
 
-  public CompleteBulkImport(Table.ID tableId, String source, String bulk) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
+  public CompleteBulkImport(BulkInfo info) {
+    this.info = info;
   }
 
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return new CleanUpBulkImport(tableId, source, bulk);
+    return new CleanUpBulkImport(info);
   }
 }
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index 390a3ea72c..dcf97218bf 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -16,32 +16,40 @@
  */
 package org.apache.accumulo.master.tableOps.bulkVer2;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.impl.Bulk;
 import org.apache.accumulo.core.client.impl.Bulk.Files;
 import org.apache.accumulo.core.client.impl.BulkSerialize;
 import org.apache.accumulo.core.client.impl.BulkSerialize.LoadMappingIterator;
 import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -52,6 +60,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Make asynchronous load calls to each overlapping Tablet. This RepO does its work on the isReady
  * and will return a linear sleep value based on the largest number of Tablets on a TabletServer.
@@ -84,74 +94,58 @@ public long isReady(long tid, Master master) throws Exception {
 
   @Override
   public Repo<Master> call(final long tid, final Master master) throws Exception {
-    return new CompleteBulkImport(bulkInfo.tableId, bulkInfo.sourceDir, bulkInfo.bulkDir);
-  }
-
-  static boolean equals(Text t1, Text t2) {
-    if (t1 == null || t2 == null)
-      return t1 == t2;
-
-    return t1.equals(t2);
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      return new CompleteBulkImport(bulkInfo);
+    } else {
+      return new CleanUpBulkImport(bulkInfo);
+    }
   }
 
-  /**
-   * Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
-   * time to isReady based on a factor of the TabletServer with the most Tablets. This method will
-   * scan the metadata table getting Tablet range and location information. It will return 0 when
-   * all files have been loaded.
-   */
-  private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi, Master master,
-      long tid) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
-
-    Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.next();
-
-    Text startRow = loadMapEntry.getKey().getPrevEndRow();
-
-    long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-    Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master).scanMetadataTable()
-        .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation()
-        .fetchLoaded().build().iterator();
+  private static abstract class Loader {
+    protected Path bulkDir;
+    protected Master master;
+    protected long tid;
+    protected boolean setTime;
+
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      this.bulkDir = bulkDir;
+      this.master = master;
+      this.tid = tid;
+      this.setTime = setTime;
+    }
 
-    List<TabletMetadata> tablets = new ArrayList<>();
-    TabletMetadata currentTablet = tabletIter.next();
-    HostAndPort server = null;
+    abstract void load(List<TabletMetadata> tablets, Files files) throws Exception;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> tabletsPerServer = new MapCounter<>();
+    abstract long finish() throws Exception;
+  }
 
-    String fmtTid = String.format("%016x", tid);
+  private static class OnlineLoader extends Loader {
 
+    long timeInMillis;
+    String fmtTid;
     int locationLess = 0;
 
-    long t1 = System.currentTimeMillis();
-    while (true) {
-      if (loadMapEntry == null) {
-        if (!lmi.hasNext()) {
-          break;
-        }
-        loadMapEntry = lmi.next();
-      }
-      KeyExtent fke = loadMapEntry.getKey();
-      Files files = loadMapEntry.getValue();
-      loadMapEntry = null;
+    // track how many tablets were sent load messages per tablet server
+    MapCounter<HostAndPort> loadMsgs;
 
-      tablets.clear();
+    @Override
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      super.start(bulkDir, master, tid, setTime);
 
-      while (!equals(currentTablet.getPrevEndRow(), fke.getPrevEndRow())) {
-        currentTablet = tabletIter.next();
-      }
-      tablets.add(currentTablet);
+      timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+      fmtTid = String.format("%016x", tid);
 
-      while (!equals(currentTablet.getEndRow(), fke.getEndRow())) {
-        currentTablet = tabletIter.next();
-        tablets.add(currentTablet);
-      }
+      loadMsgs = new MapCounter<>();
+    }
 
+    @Override
+    void load(List<TabletMetadata> tablets, Files files) {
       for (TabletMetadata tablet : tablets) {
         // send files to tablet sever
         // ideally there should only be one tablet location to send all the files
 
         TabletMetadata.Location location = tablet.getLocation();
+        HostAndPort server = null;
         if (location == null) {
           locationLess++;
           continue;
@@ -174,14 +168,14 @@ private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi,
         if (thriftImports.size() > 0) {
           // must always increment this even if there is a comms failure, because it indicates there
           // is work to do
-          tabletsPerServer.increment(server, 1);
+          loadMsgs.increment(server, 1);
           log.trace("tid {} asking {} to bulk import {} files", fmtTid, server,
               thriftImports.size());
           TabletClientService.Client client = null;
           try {
             client = ThriftUtil.getTServerClient(server, master, timeInMillis);
-            client.loadFiles(Tracer.traceInfo(), master.rpcCreds(), tid, fke.toThrift(),
-                bulkDir.toString(), thriftImports, bulkInfo.setTime);
+            client.loadFiles(Tracer.traceInfo(), master.rpcCreds(), tid,
+                tablet.getExtent().toThrift(), bulkDir.toString(), thriftImports, setTime);
           } catch (TException ex) {
             log.debug("rpc failed server: " + server + ", tid:" + fmtTid + " " + ex.getMessage(),
                 ex);
@@ -190,20 +184,142 @@ private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi,
           }
         }
       }
+
     }
-    long t2 = System.currentTimeMillis();
 
-    long sleepTime = 0;
-    if (tabletsPerServer.size() > 0) {
-      // find which tablet server had the most load messages sent to it and sleep 13ms for each load
-      // message
-      sleepTime = Collections.max(tabletsPerServer.values()) * 13;
+    @Override
+    long finish() {
+      long sleepTime = 0;
+      if (loadMsgs.size() > 0) {
+        // find which tablet server had the most load messages sent to it and sleep 13ms for each
+        // load message
+        sleepTime = Collections.max(loadMsgs.values()) * 13;
+      }
+
+      if (locationLess > 0) {
+        sleepTime = Math.max(Math.max(100L, locationLess), sleepTime);
+      }
+
+      return sleepTime;
     }
 
-    if (locationLess > 0) {
-      sleepTime = Math.max(100, Math.max(2 * (t2 - t1), sleepTime));
+  }
+
+  private static class OfflineLoader extends Loader {
+
+    BatchWriter bw;
+
+    // track how many tablets were sent load messages per tablet server
+    MapCounter<HostAndPort> unloadingTablets;
+
+    @Override
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      Preconditions.checkArgument(!setTime);
+      super.start(bulkDir, master, tid, setTime);
+      bw = master.getConnector().createBatchWriter(MetadataTable.NAME);
+      unloadingTablets = new MapCounter<>();
     }
 
+    @Override
+    void load(List<TabletMetadata> tablets, Files files) throws MutationsRejectedException {
+      byte[] fam = TextUtil.getBytes(DataFileColumnFamily.NAME);
+
+      for (TabletMetadata tablet : tablets) {
+        if (tablet.getLocation() != null) {
+          unloadingTablets.increment(tablet.getLocation().getHostAndPort(), 1L);
+          continue;
+        }
+
+        Mutation mutation = new Mutation(tablet.getExtent().getMetadataEntry());
+
+        for (final Bulk.FileInfo fileInfo : files) {
+          String fullPath = new Path(bulkDir, fileInfo.getFileName()).toString();
+          byte[] val = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries())
+              .encode();
+          mutation.put(fam, fullPath.getBytes(UTF_8), val);
+        }
+
+        bw.addMutation(mutation);
+      }
+    }
+
+    @Override
+    long finish() throws Exception {
+
+      bw.close();
+
+      long sleepTime = 0;
+      if (unloadingTablets.size() > 0) {
+        // find which tablet server had the most tablets to unload and sleep 13ms for each tablet
+        sleepTime = Collections.max(unloadingTablets.values()) * 13;
+      }
+
+      return sleepTime;
+    }
+  }
+
+  /**
+   * Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
+   * time to isReady based on a factor of the TabletServer with the most Tablets. This method will
+   * scan the metadata table getting Tablet range and location information. It will return 0 when
+   * all files have been loaded.
+   */
+  private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi, Master master,
+      long tid) throws Exception {
+
+    Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.next();
+
+    Text startRow = loadMapEntry.getKey().getPrevEndRow();
+
+    Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master).scanMetadataTable()
+        .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation()
+        .fetchLoaded().build().iterator();
+
+    List<TabletMetadata> tablets = new ArrayList<>();
+    TabletMetadata currentTablet = tabletIter.next();
+
+    Loader loader;
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      loader = new OnlineLoader();
+    } else {
+      loader = new OfflineLoader();
+    }
+
+    loader.start(bulkDir, master, tid, bulkInfo.setTime);
+
+    long t1 = System.currentTimeMillis();
+    while (true) {
+      if (loadMapEntry == null) {
+        if (!lmi.hasNext()) {
+          break;
+        }
+        loadMapEntry = lmi.next();
+      }
+      KeyExtent fke = loadMapEntry.getKey();
+      Files files = loadMapEntry.getValue();
+      loadMapEntry = null;
+
+      tablets.clear();
+
+      while (!Objects.equals(currentTablet.getPrevEndRow(), fke.getPrevEndRow())) {
+        currentTablet = tabletIter.next();
+      }
+      tablets.add(currentTablet);
+
+      while (!Objects.equals(currentTablet.getEndRow(), fke.getEndRow())) {
+        currentTablet = tabletIter.next();
+        tablets.add(currentTablet);
+      }
+
+      loader.load(tablets, files);
+    }
+    long t2 = System.currentTimeMillis();
+
+    long sleepTime = loader.finish();
+    if (sleepTime > 0) {
+      long scanTime = Math.min(t2 - t1, 30000);
+      sleepTime = Math.max(sleepTime, scanTime * 2);
+    }
     return sleepTime;
   }
 }
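
The back-off returned by loadFiles above combines the loader's own estimate
with the metadata scan time; a condensed restatement with illustrative
numbers:

    // e.g. the busiest tablet server received 40 load messages (13 ms each)
    // and the metadata scan took 1 second:
    long loaderSleep = 40 * 13;                           // 520 ms from finish()
    long scanTime = Math.min(1000L, 30000L);              // scan time, capped at 30 s
    long sleepTime = Math.max(loaderSleep, scanTime * 2); // isReady sleeps 2000 ms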
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index bca1389411..b97426ce6c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -22,7 +22,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
@@ -34,7 +36,6 @@
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.metadata.schema.MetadataScanner;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.fate.Repo;
@@ -87,16 +88,8 @@ public long isReady(long tid, Master master) throws Exception {
     if (master.onlineTabletServers().size() == 0)
       return 500;
     Tables.clearCache(master.getInstance());
-    if (Tables.getTableState(master.getInstance(), bulkInfo.tableId) == TableState.ONLINE) {
-      return Utils.reserveHdfsDirectory(bulkInfo.sourceDir, tid);
-    } else {
-      throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonicalID(), null,
-          TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
-    }
-  }
 
-  static boolean equals(Text t1, Text t2) {
-    return LoadFiles.equals(t1, t2);
+    return Utils.reserveHdfsDirectory(bulkInfo.sourceDir, tid);
   }
 
   @VisibleForTesting
@@ -104,49 +97,52 @@ static boolean equals(Text t1, Text t2) {
     Iterator<KeyExtent> newTabletIter(Text startRow) throws Exception;
   }
 
+  private static boolean equals(Function<KeyExtent,Text> extractor, KeyExtent ke1, KeyExtent ke2) {
+    return Objects.equals(extractor.apply(ke1), extractor.apply(ke2));
+  }
+
   @VisibleForTesting
   static void checkForMerge(String tableId, Iterator<KeyExtent> lmi,
       TabletIterFactory tabletIterFactory) throws Exception {
-    KeyExtent currentRange = lmi.next();
+    KeyExtent currRange = lmi.next();
 
-    Text startRow = currentRange.getPrevEndRow();
+    Text startRow = currRange.getPrevEndRow();
 
     Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);
 
-    KeyExtent currentTablet = tabletIter.next();
+    KeyExtent currTablet = tabletIter.next();
 
-    if (!tabletIter.hasNext() && equals(currentTablet.getPrevEndRow(), currentRange.getPrevEndRow())
-        && equals(currentTablet.getEndRow(), currentRange.getEndRow()))
-      currentRange = null;
+    if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange)
+        && equals(KeyExtent::getEndRow, currTablet, currRange))
+      currRange = null;
 
     while (tabletIter.hasNext()) {
 
-      if (currentRange == null) {
+      if (currRange == null) {
         if (!lmi.hasNext()) {
           break;
         }
-        currentRange = lmi.next();
+        currRange = lmi.next();
       }
 
-      while (!equals(currentTablet.getPrevEndRow(), currentRange.getPrevEndRow())
-          && tabletIter.hasNext()) {
-        currentTablet = tabletIter.next();
+      while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+        currTablet = tabletIter.next();
       }
 
-      boolean matchedPrevRow = equals(currentTablet.getPrevEndRow(), currentRange.getPrevEndRow());
+      boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange);
 
-      while (!equals(currentTablet.getEndRow(), currentRange.getEndRow()) && tabletIter.hasNext()) {
-        currentTablet = tabletIter.next();
+      while (!equals(KeyExtent::getEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+        currTablet = tabletIter.next();
       }
 
-      if (!matchedPrevRow || !equals(currentTablet.getEndRow(), currentRange.getEndRow())) {
+      if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange)) {
         break;
       }
 
-      currentRange = null;
+      currRange = null;
     }
 
-    if (currentRange != null || lmi.hasNext()) {
+    if (currRange != null || lmi.hasNext()) {
       // a merge happened between the time the mapping was generated and the table lock was
       // acquired
       throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
@@ -181,6 +177,8 @@ private void checkForMerge(final Master master) throws Exception {
     // now that table lock is acquired check that all splits in load mapping exists in table
     checkForMerge(master);
 
+    bulkInfo.tableState = Tables.getTableState(master.getInstance(), bulkInfo.tableId);
+
     VolumeManager fs = master.getFileSystem();
     final UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
     Path sourceDir = new Path(bulkInfo.sourceDir);
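
The extractor-based equals helper introduced above is a null-safe comparison
of one field of two extents; the two forms below are equivalent (ke1 and ke2
stand for any two KeyExtents):

    // Long form, previously repeated for each field:
    boolean same = Objects.equals(ke1.getPrevEndRow(), ke2.getPrevEndRow());
    // With the helper, the field to compare is passed as a method reference:
    boolean same2 = equals(KeyExtent::getPrevEndRow, ke1, ke2);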
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index d97d1f8c1d..7aa0b31633 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -20,20 +20,35 @@
 import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.metadata.schema.MetadataScanner;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.MemoryUnit;
@@ -46,8 +61,13 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
 /**
  * Tests new bulk import technique. If the old technique ever gets removed this will replace
  * {@link BulkFileIT}
@@ -69,82 +89,95 @@ protected int defaultTimeoutSeconds() {
     return 4 * 60;
   }
 
-  @Test
-  public void testSingleTabletSingleFile() throws Exception {
+  private String tableName;
+  private AccumuloConfiguration aconf;
+  private FileSystem fs;
+  private String rootPath;
+
+  @Before
+  public void setupBulkTest() throws Exception {
     Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
+    tableName = getUniqueNames(1)[0];
     c.tableOperations().create(tableName);
-    addSplits(tableName, "0333");
+    aconf = new ServerConfigurationFactory(c.getInstance()).getSystemConfiguration();
+    fs = getCluster().getFileSystem();
+    rootPath = cluster.getTemporaryPath().toString();
+  }
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-    String rootPath = cluster.getTemporaryPath().toString();
+  private String getDir(String testName) throws Exception {
+    String dir = rootPath + testName + getUniqueNames(1)[0];
+    fs.delete(new Path(dir), true);
+    return dir;
+  }
 
-    String dir = rootPath + "/testSingleTabletSingleFileNoSplits-" + getUniqueNames(1)[0];
-    Path bulkDir = new Path(dir);
+  private void testSingleTabletSingleFile(boolean offline) throws Exception {
+    Connector c = getConnector();
+    addSplits(tableName, "0333");
+
+    if (offline)
+      c.tableOperations().offline(tableName);
 
-    fs.delete(bulkDir, true);
+    String dir = getDir("/testSingleTabletSingleFileNoSplits-");
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 332);
-    writer1.close();
+    String h1 = writeData(dir + "/f1.", aconf, 0, 332);
 
     c.tableOperations().addFilesTo(tableName).from(dir).load();
+
+    if (offline)
+      c.tableOperations().online(tableName);
+
+    verifyData(tableName, 0, 332);
+    verifyMetadata(tableName,
+        ImmutableMap.of("0333", ImmutableSet.of(h1), "null", ImmutableSet.of()));
   }
 
   @Test
-  public void testSingleTabletSingleFileNoSplits() throws Exception {
-    Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
+  public void testSingleTabletSingleFile() throws Exception {
+    testSingleTabletSingleFile(false);
+  }
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-    String rootPath = cluster.getTemporaryPath().toString();
+  @Test
+  public void testSingleTabletSingleFileOffline() throws Exception {
+    testSingleTabletSingleFile(true);
+  }
 
-    String dir = rootPath + "/testSingleTabletSingleFileNoSplits-" + getUniqueNames(1)[0];
-    Path bulkDir = new Path(dir);
+  private void testSingleTabletSingleFileNoSplits(boolean offline) throws Exception {
+    Connector c = getConnector();
 
-    fs.delete(bulkDir, true);
+    if (offline)
+      c.tableOperations().offline(tableName);
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
+    String dir = getDir("/testSingleTabletSingleFileNoSplits-");
+
+    String h1 = writeData(dir + "/f1.", aconf, 0, 333);
 
     c.tableOperations().addFilesTo(tableName).from(dir).load();
+
+    if (offline)
+      c.tableOperations().online(tableName);
+
+    verifyData(tableName, 0, 333);
+    verifyMetadata(tableName, ImmutableMap.of("null", ImmutableSet.of(h1)));
+  }
+
+  @Test
+  public void testSingleTabletSingleFileNoSplits() throws Exception {
+    testSingleTabletSingleFileNoSplits(false);
+  }
+
+  @Test
+  public void testSingleTabletSingleFileNoSplitsOffline() throws Exception {
+    testSingleTabletSingleFileNoSplits(true);
   }
 
   @Test
   public void testBadPermissions() throws Exception {
     Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
     addSplits(tableName, "0333");
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-    String rootPath = cluster.getTemporaryPath().toString();
-
-    String dir = rootPath + "/testBadPermissions-" + getUniqueNames(1)[0];
-    Path bulkDir = new Path(dir);
-
-    fs.delete(bulkDir, true);
+    String dir = getDir("/testBadPermissions-");
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
+    writeData(dir + "/f1.", aconf, 0, 333);
 
     Path rFilePath = new Path(dir, "f1." + RFile.EXTENSION);
     FsPermission originalPerms = fs.getFileStatus(rFilePath).getPermission();
@@ -160,70 +193,68 @@ public void testBadPermissions() throws Exception {
       fs.setPermission(rFilePath, originalPerms);
     }
 
-    originalPerms = fs.getFileStatus(bulkDir).getPermission();
-    fs.setPermission(bulkDir, FsPermission.valueOf("dr--r--r--"));
+    originalPerms = fs.getFileStatus(new Path(dir)).getPermission();
+    fs.setPermission(new Path(dir), FsPermission.valueOf("dr--r--r--"));
     try {
       c.tableOperations().addFilesTo(tableName).from(dir).load();
     } catch (AccumuloException ae) {
       if (!(ae.getCause() instanceof FileNotFoundException))
         fail("Expected FileNotFoundException but threw " + ae.getCause());
     } finally {
-      fs.setPermission(bulkDir, originalPerms);
+      fs.setPermission(new Path(dir), originalPerms);
     }
   }
 
-  @Test
-  public void testBulkFile() throws Exception {
+  private void testBulkFile(boolean offline) throws Exception {
     Connector c = getConnector();
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
     addSplits(tableName, "0333 0666 0999 1333 1666");
 
-    Configuration conf = new Configuration();
-    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance())
-        .getSystemConfiguration();
-    FileSystem fs = getCluster().getFileSystem();
-
-    String rootPath = cluster.getTemporaryPath().toString();
+    if (offline)
+      c.tableOperations().offline(tableName);
 
-    String dir = rootPath + "/testBulkFile-" + getUniqueNames(1)[0];
+    String dir = getDir("/testBulkFile-");
 
-    fs.delete(new Path(dir), true);
+    Map<String,Set<String>> hashes = new HashMap<>();
+    for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) {
+      hashes.put(endRow, new HashSet<>());
+    }
 
-    // TODO verify that data gets loaded in appropriate Tablets
     // 1 Tablet 0333-null
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
+    String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+    hashes.get("0333").add(h1);
 
     // 2 Tablets 0666-0334, 0999-0667
-    FileSKVWriter writer2 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f2." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer2.startDefaultLocalityGroup();
-    writeData(writer2, 334, 999);
-    writer2.close();
+    String h2 = writeData(dir + "/f2.", aconf, 334, 999);
+    hashes.get("0666").add(h2);
+    hashes.get("0999").add(h2);
 
     // 2 Tablets 1333-1000, 1666-1334
-    FileSKVWriter writer3 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f3." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer3.startDefaultLocalityGroup();
-    // writeData(writer3, 1000, 1999);
-    writeData(writer3, 1000, 1499);
-    writer3.close();
+    String h3 = writeData(dir + "/f3.", aconf, 1000, 1499);
+    hashes.get("1333").add(h3);
+    hashes.get("1666").add(h3);
 
     // 2 Tablets 1666-1334, >1666
-    FileSKVWriter writer4 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f4." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer4.startDefaultLocalityGroup();
-    writeData(writer4, 1500, 1999);
-    writer4.close();
+    String h4 = writeData(dir + "/f4.", aconf, 1500, 1999);
+    hashes.get("1666").add(h4);
+    hashes.get("null").add(h4);
 
-    // TODO test c.tableOperations().offline(tableName, true);
     c.tableOperations().addFilesTo(tableName).from(dir).load();
 
+    if (offline)
+      c.tableOperations().online(tableName);
+
     verifyData(tableName, 0, 1999);
+    verifyMetadata(tableName, hashes);
+  }
+
+  @Test
+  public void testBulkFile() throws Exception {
+    testBulkFile(false);
+  }
+
+  @Test
+  public void testBulkFileOffline() throws Exception {
+    testBulkFile(true);
   }
 
   private void addSplits(String tableName, String splitString) throws Exception {
@@ -258,11 +289,55 @@ private void verifyData(String table, int s, int e) throws Exception {
     }
   }
 
-  private void writeData(FileSKVWriter w, int s, int e) throws Exception {
-    for (int i = s; i <= e; i++) {
-      w.append(new Key(new Text(String.format("%04d", i))),
-          new Value(Integer.toString(i).getBytes(UTF_8)));
+  private void verifyMetadata(String tableName, Map<String,Set<String>> expectedHashes)
+      throws Exception {
+
+    Set<String> endRowsSeen = new HashSet<>();
+
+    String id = getConnector().tableOperations().tableIdMap().get(tableName);
+    try (
+        MetadataScanner scanner = MetadataScanner.builder().from(getConnector()).scanMetadataTable()
+            .overRange(Table.ID.of(id)).fetchFiles().fetchLoaded().fetchPrev().build()) {
+      for (TabletMetadata tablet : scanner) {
+        Assert.assertTrue(tablet.getLoaded().isEmpty());
+
+        Set<String> fileHashes = tablet.getFiles().stream().map(f -> hash(f))
+            .collect(Collectors.toSet());
+
+        String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString();
+
+        Assert.assertEquals(expectedHashes.get(endRow), fileHashes);
+
+        endRowsSeen.add(endRow);
+      }
+
+      Assert.assertEquals(expectedHashes.keySet(), endRowsSeen);
     }
   }
 
+  private String hash(String filename) {
+    try {
+      byte data[] = Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", "")));
+      byte hash[] = MessageDigest.getInstance("SHA1").digest(data);
+      return new BigInteger(1, hash).toString(16);
+    } catch (IOException | NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
+      throws Exception {
+    FileSystem fs = getCluster().getFileSystem();
+    String filename = file + RFile.EXTENSION;
+    try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(filename, fs, fs.getConf()).withTableConfiguration(aconf).build()) {
+      writer.startDefaultLocalityGroup();
+      for (int i = s; i <= e; i++) {
+        writer.append(new Key(new Text(String.format("%04d", i))),
+            new Value(Integer.toString(i).getBytes(UTF_8)));
+      }
+    }
+
+    return hash(filename);
+  }
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services