You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/06/07 23:03:35 UTC

[GitHub] ctubbsii commented on a change in pull request #506: fixes #472 Enabled bulk imports into offline table

ctubbsii commented on a change in pull request #506: fixes #472 Enabled bulk imports into offline table
URL: https://github.com/apache/accumulo/pull/506#discussion_r193911306
 
 

 ##########
 File path: server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
 ##########
 @@ -190,20 +183,142 @@ private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi,
           }
         }
       }
+
     }
-    long t2 = System.currentTimeMillis();
 
-    long sleepTime = 0;
-    if (tabletsPerServer.size() > 0) {
-      // find which tablet server had the most load messages sent to it and sleep 13ms for each load
-      // message
-      sleepTime = Collections.max(tabletsPerServer.values()) * 13;
+    /**
+     * Computes how long the caller should wait before checking again.
+     *
+     * <p>Returns 13 ms multiplied by the largest per-server count in {@code loadMsgs}. It returns
+     * 0 when no load messages were recorded. When {@code locationLess > 0}, the result is raised
+     * to at least {@code max(100, locationLess)} ms. NOTE(review): this presumes
+     * {@code locationLess} counts tablets that had no location during the scan; confirm this
+     * against the field's declaration, which is outside this hunk.
+     */
+    @Override
+    long finish() {
+      long sleepTime = 0;
+      if (loadMsgs.size() > 0) {
+        // find which tablet server had the most load messages sent to it and sleep 13ms for each
+        // load message
+        sleepTime = Collections.max(loadMsgs.values()) * 13;
+      }
+
+      // Tablets without a location could not be sent a load message yet, so enforce a floor of
+      // at least 100 ms (or locationLess ms, if larger).
+      if (locationLess > 0) {
+        sleepTime = Math.max(Math.max(100L, locationLess), sleepTime);
+      }
+
+      return sleepTime;
     }
 
-    if (locationLess > 0) {
-      sleepTime = Math.max(100, Math.max(2 * (t2 - t1), sleepTime));
+  }
+
+  /**
+   * Loader that records bulk files for tablets by writing entries directly to the metadata table
+   * through a {@link BatchWriter}. It does not send load messages to tablet servers.
+   *
+   * <p>Tablets that still report a location are skipped and counted per tablet server.
+   * {@link #finish()} then returns a non-zero sleep time, presumably so the caller retries once
+   * those tablets no longer have a location.
+   */
+  private static class OfflineLoader extends Loader {
+
+    // Writer for metadata-table mutations. Created in start() and closed in finish().
+    BatchWriter bw;
+
+    // Per tablet server: how many tablets still had a location (were still hosted) and were
+    // therefore skipped by load() instead of having file entries written for them.
+    MapCounter<HostAndPort> unloadingTablets;
+
+    /**
+     * Initializes the loader. {@code setTime} must be false; offline loading rejects it via
+     * {@code Preconditions.checkArgument}.
+     */
+    @Override
+    void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
+      Preconditions.checkArgument(!setTime);
+      super.start(bulkDir, master, tid, setTime);
+      bw = master.getConnector().createBatchWriter(MetadataTable.NAME);
+      unloadingTablets = new MapCounter<>();
     }
 
+    /**
+     * Writes one metadata mutation for each tablet that has no location. The mutation adds every
+     * file in {@code files} to the tablet's data-file column family. Tablets that still have a
+     * location are not modified; they are only counted in {@code unloadingTablets}.
+     *
+     * <p>NOTE(review): {@code bulkDir} is presumably the field set by {@code super.start()}. The
+     * encoded {@code DataFileValue} depends only on {@code fileInfo}, so it could be computed once
+     * per file instead of once per tablet.
+     */
+    @Override
+    void load(List<TabletMetadata> tablets, Files files) throws Exception {
+      byte[] fam = TextUtil.getBytes(DataFileColumnFamily.NAME);
+
+      for (TabletMetadata tablet : tablets) {
+        // A tablet that is still hosted cannot have its metadata edited directly; count it so
+        // finish() reports a wait time.
+        if (tablet.getLocation() != null) {
+          unloadingTablets.increment(tablet.getLocation().getHostAndPort(), 1L);
+          continue;
+        }
+
+        Mutation mutation = new Mutation(tablet.getExtent().getMetadataEntry());
+
+        // Column qualifier = full path of the bulk file; value = estimated size and entry count.
+        for (final Bulk.FileInfo fileInfo : files) {
+          String fullPath = new Path(bulkDir, fileInfo.getFileName()).toString();
+          byte[] val = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries())
+              .encode();
+          mutation.put(fam, fullPath.getBytes(UTF_8), val);
+        }
+
+        bw.addMutation(mutation);
+      }
+    }
+
+    /**
+     * Closes the metadata writer, then returns 13 ms multiplied by the largest per-server count
+     * of skipped (still-located) tablets, or 0 if no tablet was skipped.
+     *
+     * <p>NOTE(review): {@code bw} is closed only here. If {@code load()} throws, the writer is not
+     * closed by this class; confirm that the caller handles cleanup.
+     */
+    @Override
+    long finish() throws Exception {
+
+      bw.close();
+
+      long sleepTime = 0;
+      if (unloadingTablets.size() > 0) {
+        // find which tablet server had the most tablets to unload and sleep 13ms for each tablet
+        sleepTime = Collections.max(unloadingTablets.values()) * 13;
+      }
+
+      return sleepTime;
+    }
+  }
+
+  /**
+   * Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Return a sleep
+   * time to isReady based on a factor of the TabletServer with the most Tablets. This method will
+   * scan the metadata table getting Tablet range and location information. It will return 0 when
+   * all files have been loaded.
+   */
+  private long loadFiles(Table.ID tableId, Path bulkDir, LoadMappingIterator lmi, Master master,
+      long tid) throws Exception {
+
+    Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.next();
+
+    Text startRow = loadMapEntry.getKey().getPrevEndRow();
+
+    Iterator<TabletMetadata> tabletIter = MetadataScanner.builder().from(master).scanMetadataTable()
+        .overRange(tableId, startRow, null).checkConsistency().fetchPrev().fetchLocation()
+        .fetchLoaded().build().iterator();
+
+    List<TabletMetadata> tablets = new ArrayList<>();
+    TabletMetadata currentTablet = tabletIter.next();
+
+    Loader loader;
+    if (bulkInfo.tableState == TableState.ONLINE) {
+      loader = new OnlineLoader();
+    } else {
+      loader = new OfflineLoader();
+    }
+
+    loader.start(bulkDir, master, tid, bulkInfo.setTime);
+
+    long t1 = System.currentTimeMillis();
+    while (true) {
+      if (loadMapEntry == null) {
+        if (!lmi.hasNext()) {
+          break;
+        }
+        loadMapEntry = lmi.next();
+      }
+      KeyExtent fke = loadMapEntry.getKey();
+      Files files = loadMapEntry.getValue();
+      loadMapEntry = null;
+
+      tablets.clear();
+
+      while (!Objects.equals(currentTablet.getPrevEndRow(), fke.getPrevEndRow())) {
+        currentTablet = tabletIter.next();
+      }
+      tablets.add(currentTablet);
+
+      while (!Objects.equals(currentTablet.getEndRow(), fke.getEndRow())) {
+        currentTablet = tabletIter.next();
+        tablets.add(currentTablet);
+      }
+
+      loader.load(tablets, files);
+    }
+    long t2 = System.currentTimeMillis();
+
+    long sleepTime = loader.finish();
+    if (sleepTime > 0) {
+      long scanTime = Math.min(t2 - t1, 30000);
+      sleepTime = Math.max(sleepTime, scanTime * 2);
 
 Review comment:
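  scanTime can be negative because `System.currentTimeMillis` is used instead of `System.nanoTime`. Wall-clock time can be adjusted backwards (e.g., by NTP), whereas `nanoTime` is monotonic and intended for measuring elapsed time. That could cause some unexpected behavior here. It's probably unlikely, though... and the worst case seems to be just waiting longer than necessary.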

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services