You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/08/24 05:59:32 UTC
[06/12] accumulo git commit: Merge branch '1.5' into 1.6
Merge branch '1.5' into 1.6
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79245e1e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79245e1e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79245e1e
Branch: refs/heads/1.6
Commit: 79245e1e8824f7e7e41b7be121a0d9e47ec3a401
Parents: dcd27de 5cd2063
Author: Josh Elser <el...@apache.org>
Authored: Sun Aug 23 17:54:52 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun Aug 23 17:54:52 2015 -0400
----------------------------------------------------------------------
.../accumulo/server/client/BulkImporter.java | 14 ++++++++++---
.../server/client/BulkImporterTest.java | 21 ++++++++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79245e1e/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 59bc09d,0000000..0c77367
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@@ -1,783 -1,0 +1,791 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.StopWatch;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.trace.instrument.TraceRunnable;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TServiceClient;
+
+public class BulkImporter {
+
+ private static final Logger log = Logger.getLogger(BulkImporter.class);
+
+ public static List<String> bulkLoad(AccumuloConfiguration conf, Instance instance, Credentials creds, long tid, String tableId, List<String> files,
+ String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException {
+ AssignmentStats stats = new BulkImporter(conf, instance, creds, tid, tableId, setTime).importFiles(files, new Path(errorDir));
+ List<String> result = new ArrayList<String>();
+ for (Path p : stats.completeFailures.keySet()) {
+ result.add(p.toString());
+ }
+ return result;
+ }
+
+ private StopWatch<Timers> timer;
+
+ private static enum Timers {
+ EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
+ }
+
+ private Instance instance;
+ private Credentials credentials;
+ private String tableId;
+ private long tid;
+ private AccumuloConfiguration acuConf;
+ private boolean setTime;
+
+ public BulkImporter(AccumuloConfiguration conf, Instance instance, Credentials credentials, long tid, String tableId, boolean setTime) {
+ this.instance = instance;
+ this.credentials = credentials;
+ this.tid = tid;
+ this.tableId = tableId;
+ this.acuConf = conf;
+ this.setTime = setTime;
+ }
+
+ public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
+ ThriftTableOperationException {
+
+ int numThreads = acuConf.getCount(Property.TSERV_BULK_PROCESS_THREADS);
+ int numAssignThreads = acuConf.getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
+
+ timer = new StopWatch<Timers>(Timers.class);
+ timer.start(Timers.TOTAL);
+
+ Configuration conf = CachedConfiguration.getInstance();
+ VolumeManagerImpl.get(acuConf);
+ final VolumeManager fs = VolumeManagerImpl.get(acuConf);
+
+ Set<Path> paths = new HashSet<Path>();
+ for (String file : files) {
+ paths.add(new Path(file));
+ }
+ AssignmentStats assignmentStats = new AssignmentStats(paths.size());
+
+ final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ ClientService.Client client = null;
+ final TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId));
+
+ try {
+ final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
+
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
+
+ for (Path path : paths) {
+ final Path mapFile = path;
+ Runnable getAssignments = new Runnable() {
+ @Override
+ public void run() {
+ List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
+ try {
+ tabletsToAssignMapFileTo = findOverlappingTablets(ServerConfiguration.getSystemConfiguration(instance), fs, locator, mapFile, credentials);
+ } catch (Exception ex) {
+ log.warn("Unable to find tablets that overlap file " + mapFile.toString(), ex);
+ }
+ log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
+ if (tabletsToAssignMapFileTo.size() == 0) {
+ List<KeyExtent> empty = Collections.emptyList();
+ completeFailures.put(mapFile, empty);
+ } else
+ assignments.put(mapFile, tabletsToAssignMapFileTo);
+ }
+ };
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
+ }
+ threadPool.shutdown();
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures);
+
+ Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
+
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
+ failureCount.put(entry.getKey(), 1);
+
+ long sleepTime = 2 * 1000;
+ while (assignmentFailures.size() > 0) {
+ sleepTime = Math.min(sleepTime * 2, 60 * 1000);
+ locator.invalidateCache();
+ // assumption about assignment failures is that it caused by a split
+ // happening or a missing location
+ //
+ // for splits we need to find children key extents that cover the
+ // same key range and are contiguous (no holes, no overlap)
+
+ timer.start(Timers.SLEEP);
+ UtilWaitThread.sleep(sleepTime);
+ timer.stop(Timers.SLEEP);
+
+ log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
+ assignments.clear();
+
+ // for failed key extents, try to find children key extents to
+ // assign to
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ Iterator<KeyExtent> keListIter = entry.getValue().iterator();
+
+ List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
+
+ while (keListIter.hasNext()) {
+ KeyExtent ke = keListIter.next();
+
+ try {
+ timer.start(Timers.QUERY_METADATA);
+ tabletsToAssignMapFileTo.addAll(findOverlappingTablets(ServerConfiguration.getSystemConfiguration(instance), fs, locator, entry.getKey(), ke,
+ credentials));
+ timer.stop(Timers.QUERY_METADATA);
+ keListIter.remove();
+ } catch (Exception ex) {
+ log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
+ }
+ }
+
+ if (tabletsToAssignMapFileTo.size() > 0)
+ assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
+ }
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures2);
+
+ // merge assignmentFailures2 into assignmentFailures
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
+ assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
+
+ Integer fc = failureCount.get(entry.getKey());
+ if (fc == null)
+ fc = 0;
+
+ failureCount.put(entry.getKey(), fc + 1);
+ }
+
+ // remove map files that have no more key extents to assign
+ Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
+ while (afIter.hasNext()) {
+ Entry<Path,List<KeyExtent>> entry = afIter.next();
+ if (entry.getValue().size() == 0)
+ afIter.remove();
+ }
+
+ Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
+ for (Entry<Path,Integer> entry : failureIter) {
+ int retries = acuConf.getCount(Property.TSERV_BULK_RETRY);
+ if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
+ log.error("Map file " + entry.getKey() + " failed more than " + retries + " times, giving up.");
+ completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
+ assignmentFailures.remove(entry.getKey());
+ }
+ }
+ }
+ assignmentStats.assignmentsAbandoned(completeFailures);
+ Set<Path> failedFailures = processFailures(completeFailures);
+ assignmentStats.unrecoveredMapFiles(failedFailures);
+
+ timer.stop(Timers.TOTAL);
+ printReport(paths);
+ return assignmentStats;
+ } finally {
+ if (client != null) {
+ ServerClient.close(client);
+ }
+ }
+ }
+
+ private void printReport(Set<Path> paths) {
+ long totalTime = 0;
+ for (Timers t : Timers.values()) {
+ if (t == Timers.TOTAL)
+ continue;
+
+ totalTime += timer.get(t);
+ }
+ List<String> files = new ArrayList<String>();
+ for (Path path : paths) {
+ files.add(path.getName());
+ }
+ Collections.sort(files);
+
+ log.debug("BULK IMPORT TIMING STATISTICS");
+ log.debug("Files: " + files);
+ log.debug(String.format("Examine map files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES), 100.0 * timer.get(Timers.EXAMINE_MAP_FILES)
+ / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", MetadataTable.NAME, timer.getSecs(Timers.QUERY_METADATA),
+ 100.0 * timer.get(Timers.QUERY_METADATA) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Import Map Files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.IMPORT_MAP_FILES), 100.0 * timer.get(Timers.IMPORT_MAP_FILES)
+ / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Sleep : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP),
+ 100.0 * timer.get(Timers.SLEEP) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Misc : %,10.2f secs %6.2f%s", (timer.get(Timers.TOTAL) - totalTime) / 1000.0, 100.0
+ * (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
+ }
+
+ private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
+ // we should check if map file was not assigned to any tablets, then we
+ // should just move it; not currently being done?
+
+ Set<Entry<Path,List<KeyExtent>>> es = completeFailures.entrySet();
+
+ if (completeFailures.size() == 0)
+ return Collections.emptySet();
+
+ log.debug("The following map files failed ");
+
+ for (Entry<Path,List<KeyExtent>> entry : es) {
+ List<KeyExtent> extents = entry.getValue();
+
+ for (KeyExtent keyExtent : extents)
+ log.debug("\t" + entry.getKey() + " -> " + keyExtent);
+ }
+
+ return Collections.emptySet();
+ }
+
+ private class AssignmentInfo {
+ public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
+ this.ke = keyExtent;
+ this.estSize = estSize;
+ }
+
+ KeyExtent ke;
+ long estSize;
+ }
+
+ private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
+ List<KeyExtent> result = new ArrayList<KeyExtent>(locations.size());
+ for (TabletLocation tl : locations)
+ result.add(tl.tablet_extent);
+ return result;
+ }
+
+ private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm,
+ Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
+
+ long t1 = System.currentTimeMillis();
+ final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
+
+ try {
+ for (Path path : paths) {
+ FileSystem fs = vm.getVolumeByPath(path).getFileSystem();
+ mapFileSizes.put(path, fs.getContentSummary(path).getLength());
+ }
+ } catch (IOException e) {
+ log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+
+ final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
+
+ for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+ if (entry.getValue().size() == 1) {
+ TabletLocation tabletLocation = entry.getValue().get(0);
+
+ // if the tablet completely contains the map file, there is no
+ // need to estimate its
+ // size
+ ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
+ continue;
+ }
+
+ Runnable estimationTask = new Runnable() {
+ @Override
+ public void run() {
+ Map<KeyExtent,Long> estimatedSizes = null;
+
+ try {
+ estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, vm);
+ } catch (IOException e) {
+ log.warn("Failed to estimate map file sizes " + e.getMessage());
+ }
+
+ if (estimatedSizes == null) {
+ // estimation failed, do a simple estimation
+ estimatedSizes = new TreeMap<KeyExtent,Long>();
+ long estSize = (long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
+ for (TabletLocation tl : entry.getValue())
+ estimatedSizes.put(tl.tablet_extent, estSize);
+ }
+
+ List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
+
+ for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
+ assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
+
+ ais.put(entry.getKey(), assignmentInfoList);
+ }
+ };
+
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
+
+ return ais;
+ }
+
+ private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
+ Map<KeyExtent,String> result = new HashMap<KeyExtent,String>();
+ for (List<TabletLocation> entry : assignments.values()) {
+ for (TabletLocation tl : entry) {
+ result.put(tl.tablet_extent, tl.tablet_location);
+ }
+ }
+ return result;
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, Credentials credentials,
+ VolumeManager fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ Map<Path,List<AssignmentInfo>> assignInfo = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads);
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ Map<Path,List<KeyExtent>> ret;
+
+ timer.start(Timers.IMPORT_MAP_FILES);
+ ret = assignMapFiles(credentials, tableId, assignInfo, locationsOf(assignments), numThreads);
+ timer.stop(Timers.IMPORT_MAP_FILES);
+
+ return ret;
+ }
+
+ private class AssignmentTask implements Runnable {
+ final Map<Path,List<KeyExtent>> assignmentFailures;
+ String location;
+ Credentials credentials;
+ private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
+
+ public AssignmentTask(Credentials credentials, Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location,
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
+ this.assignmentFailures = assignmentFailures;
+ this.location = location;
+ this.assignmentsPerTablet = assignmentsPerTablet;
+ this.credentials = credentials;
+ }
+
+ private void handleFailures(Collection<KeyExtent> failures, String message) {
+ for (KeyExtent ke : failures) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
+ synchronized (assignmentFailures) {
+ for (PathSize pathSize : mapFiles) {
+ List<KeyExtent> existingFailures = assignmentFailures.get(pathSize.path);
+ if (existingFailures == null) {
+ existingFailures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, existingFailures);
+ }
+
+ existingFailures.add(ke);
+ }
+ }
+
+ log.info("Could not assign " + mapFiles.size() + " map files to tablet " + ke + " because : " + message + ". Will retry ...");
+ }
+ }
+
+ @Override
+ public void run() {
+ HashSet<Path> uniqMapFiles = new HashSet<Path>();
+ for (List<PathSize> mapFiles : assignmentsPerTablet.values())
+ for (PathSize ps : mapFiles)
+ uniqMapFiles.add(ps.path);
+
+ log.debug("Assigning " + uniqMapFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);
+
+ try {
+ List<KeyExtent> failures = assignMapFiles(credentials, location, assignmentsPerTablet);
+ handleFailures(failures, "Not Serving Tablet");
+ } catch (AccumuloException e) {
+ handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+ } catch (AccumuloSecurityException e) {
+ handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+ }
+ }
+
+ }
+
+ private class PathSize {
+ public PathSize(Path mapFile, long estSize) {
+ this.path = mapFile;
+ this.estSize = estSize;
+ }
+
+ Path path;
+ long estSize;
+
+ @Override
+ public String toString() {
+ return path + " " + estSize;
+ }
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(Credentials credentials, String tableName, Map<Path,List<AssignmentInfo>> assignments,
+ Map<KeyExtent,String> locations, int numThreads) {
+
+ // group assignments by tablet
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
+ for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
+ Path mapFile = entry.getKey();
+ List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
+
+ for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
+ if (mapFiles == null) {
+ mapFiles = new ArrayList<PathSize>();
+ assignmentsPerTablet.put(ai.ke, mapFiles);
+ }
+
+ mapFiles.add(new PathSize(mapFile, ai.estSize));
+ }
+ }
+
+ // group assignments by tabletserver
+
+ Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
+
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ KeyExtent ke = entry.getKey();
+ String location = locations.get(ke);
+
+ if (location == null) {
+ for (PathSize pathSize : entry.getValue()) {
+ synchronized (assignmentFailures) {
+ List<KeyExtent> failures = assignmentFailures.get(pathSize.path);
+ if (failures == null) {
+ failures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, failures);
+ }
+
+ failures.add(ke);
+ }
+ }
+
+ log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
+
+ continue;
+ }
+
+ Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
+ if (apt == null) {
+ apt = new TreeMap<KeyExtent,List<PathSize>>();
+ assignmentsPerTabletServer.put(location, apt);
+ }
+
+ apt.put(entry.getKey(), entry.getValue());
+ }
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
+
+ for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
+ String location = entry.getKey();
+ threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ return assignmentFailures;
+ }
+
+ private List<KeyExtent> assignMapFiles(Credentials credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet)
+ throws AccumuloException, AccumuloSecurityException {
+ try {
+ long timeInMillis = ServerConfiguration.getSystemConfiguration(instance).getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+ TabletClientService.Iface client = ThriftUtil.getTServerClient(location, ServerConfiguration.getSystemConfiguration(instance), timeInMillis);
+ try {
+ HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
+ files.put(entry.getKey(), tabletFiles);
+
+ for (PathSize pathSize : entry.getValue()) {
+ org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
+ tabletFiles.put(pathSize.path.toString(), mfi);
+ }
+ }
+
+ log.debug("Asking " + location + " to bulk load " + files);
+ List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials.toThrift(instance), tid, Translator.translate(files, Translators.KET),
+ setTime);
+
+ return Translator.translate(failures, Translators.TKET);
+ } finally {
+ ThriftUtil.returnClient((TServiceClient) client);
+ }
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ throw new AccumuloException(t);
+ }
+ }
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+ Credentials credentials) throws Exception {
+ return findOverlappingTablets(acuConf, fs, locator, file, null, null, credentials);
+ }
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+ KeyExtent failed, Credentials credentials) throws Exception {
+ locator.invalidateCache(failed);
- Text start = failed.getPrevEndRow();
- if (start != null)
- start = Range.followingPrefix(start);
++ Text start = getStartRowForExtent(failed);
+ return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow(), credentials);
+ }
+
++ protected static Text getStartRowForExtent(KeyExtent extent) {
++ Text start = extent.getPrevEndRow();
++ if (start != null) {
++ start = new Text(start);
++ // ACCUMULO-3967 We want the first possible key in this tablet, not the following row from the previous tablet
++ start.append(byte0, 0, 1);
++ }
++ return start;
++ }
++
+ final static byte[] byte0 = {0};
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager vm, TabletLocator locator, Path file, Text startRow,
+ Text endRow, Credentials credentials) throws Exception {
+ List<TabletLocation> result = new ArrayList<TabletLocation>();
+ Collection<ByteSequence> columnFamilies = Collections.emptyList();
+ String filename = file.toString();
+ // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
+ FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
+ FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
+ try {
+ Text row = startRow;
+ if (row == null)
+ row = new Text();
+ while (true) {
+ // log.debug(filename + " Seeking to row " + row);
+ reader.seek(new Range(row, null), columnFamilies, false);
+ if (!reader.hasTop()) {
+ // log.debug(filename + " not found");
+ break;
+ }
+ row = reader.getTopKey().getRow();
+ TabletLocation tabletLocation = locator.locateTablet(credentials, row, false, true);
+ // log.debug(filename + " found row " + row + " at location " + tabletLocation);
+ result.add(tabletLocation);
+ row = tabletLocation.tablet_extent.getEndRow();
+ if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
+ row = new Text(row);
+ row.append(byte0, 0, byte0.length);
+ } else
+ break;
+ }
+ } finally {
+ reader.close();
+ }
+ // log.debug(filename + " to be sent to " + result);
+ return result;
+ }
+
+ public static class AssignmentStats {
+ private Map<KeyExtent,Integer> counts;
+ private int numUniqueMapFiles;
+ private Map<Path,List<KeyExtent>> completeFailures = null;
+ private Set<Path> failedFailures = null;
+
+ AssignmentStats(int fileCount) {
+ counts = new HashMap<KeyExtent,Integer>();
+ numUniqueMapFiles = fileCount;
+ }
+
+ void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
+ for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+ for (TabletLocation tl : entry.getValue()) {
+
+ Integer count = getCount(tl.tablet_extent);
+
+ counts.put(tl.tablet_extent, count + 1);
+ }
+ }
+ }
+
+ void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ for (KeyExtent ke : entry.getValue()) {
+
+ Integer count = getCount(ke);
+
+ counts.put(ke, count - 1);
+ }
+ }
+ }
+
+ void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
+ this.completeFailures = completeFailures;
+ }
+
+ void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
+ Integer count = getCount(parent);
+
+ counts.remove(parent);
+
+ for (KeyExtent keyExtent : children)
+ counts.put(keyExtent, count);
+ }
+
+ private Integer getCount(KeyExtent parent) {
+ Integer count = counts.get(parent);
+
+ if (count == null) {
+ count = 0;
+ }
+ return count;
+ }
+
+ void unrecoveredMapFiles(Set<Path> failedFailures) {
+ this.failedFailures = failedFailures;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ int totalAssignments = 0;
+ int tabletsImportedTo = 0;
+
+ int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
+
+ for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
+ totalAssignments += entry.getValue();
+ if (entry.getValue() > 0)
+ tabletsImportedTo++;
+
+ if (entry.getValue() < min)
+ min = entry.getValue();
+
+ if (entry.getValue() > max)
+ max = entry.getValue();
+ }
+
+ double stddev = 0;
+
+ for (Entry<KeyExtent,Integer> entry : counts.entrySet())
+ stddev += Math.pow(entry.getValue() - totalAssignments / (double) counts.size(), 2);
+
+ stddev = stddev / counts.size();
+ stddev = Math.sqrt(stddev);
+
+ Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
+ for (List<KeyExtent> ft : completeFailures.values())
+ failedTablets.addAll(ft);
+
+ sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
+ sb.append(String.format("# of map files : %,10d%n", numUniqueMapFiles));
+ sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", completeFailures.size(), completeFailures.size() * 100.0 / numUniqueMapFiles, "%"));
+ sb.append(String.format("# failed failed map files : %,10d %s%n", failedFailures.size(), failedFailures.size() > 0 ? " <-- THIS IS BAD" : ""));
+ sb.append(String.format("# of tablets : %,10d%n", counts.size()));
+ sb.append(String.format("# tablets imported to : %,10d %6.2f%s%n", tabletsImportedTo, tabletsImportedTo * 100.0 / counts.size(), "%"));
+ sb.append(String.format("# tablets with failures : %,10d %6.2f%s%n", failedTablets.size(), failedTablets.size() * 100.0 / counts.size(), "%"));
+ sb.append(String.format("min map files per tablet : %,10d%n", min));
+ sb.append(String.format("max map files per tablet : %,10d%n", max));
+ sb.append(String.format("avg map files per tablet : %,10.2f (std dev = %.2f)%n", totalAssignments / (double) counts.size(), stddev));
+ return sb.toString();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79245e1e/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 8f52845,0000000..d1290ac
mode 100644,000000..100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@@ -1,155 -1,0 +1,176 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BulkImporter}'s overlapping-tablet search and start-row computation, run against a fixed in-memory tablet
+ * layout served by {@link MockTabletLocator}.
+ */
+public class BulkImporterTest {
+
+  /**
+   * Fake tablet layout for table {@code tableId}: one extent per end row "a", "b", "bm", "c", "cm", "d", "dm", "e", "em", "f",
+   * "g", "h", "i", "j", "k", "l" (each using the previously added extent's end row as its prev end row), followed by a final
+   * extent with a null end row.
+   */
+  static final SortedSet<KeyExtent> fakeMetaData = new TreeSet<KeyExtent>();
+  static final Text tableId = new Text("1");
+  static {
+    fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null));
+    for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) {
+      fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow()));
+    }
+    fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow()));
+  }
+
+  /**
+   * TabletLocator stub that resolves rows against {@code fakeMetaData} and counts calls to
+   * {@link #invalidateCache(KeyExtent)}. Every other operation throws {@link NotImplementedException}. Declared static
+   * because it reads only static fixtures, never state of the enclosing test instance.
+   */
+  static class MockTabletLocator extends TabletLocator {
+    /** Number of times {@link #invalidateCache(KeyExtent)} has been called. */
+    int invalidated = 0;
+
+    /**
+     * Returns the first extent of {@code fakeMetaData} that sorts at or after a probe extent ending at {@code row}, reported
+     * at location "localhost". The credentials, skipRow and retry arguments are ignored.
+     */
+    @Override
+    public TabletLocation locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
+        TableNotFoundException {
+      return new TabletLocation(fakeMetaData.tailSet(new KeyExtent(tableId, row, null)).first(), "localhost", "1");
+    }
+
+    @Override
+    public <T extends Mutation> void binMutations(Credentials credentials, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations,
+        List<T> failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public List<Range> binRanges(Credentials credentials, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
+        AccumuloSecurityException, TableNotFoundException {
+      throw new NotImplementedException();
+    }
+
+    /** Records the call; the tests assert on the resulting count. */
+    @Override
+    public void invalidateCache(KeyExtent failedExtent) {
+      invalidated++;
+    }
+
+    @Override
+    public void invalidateCache(Collection<KeyExtent> keySet) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public void invalidateCache() {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public void invalidateCache(String server) {
+      throw new NotImplementedException();
+    }
+  }
+
+  /**
+   * Writes a small RFile whose rows fall into a handful of the fake tablets, then checks which tablets
+   * {@link BulkImporter#findOverlappingTablets} reports, both without and with a bounding extent.
+   */
+  @Test
+  public void testFindOverlappingTablets() throws Exception {
+    // No credentials are supplied; the mock locator never reads them.
+    Credentials credentials = null;
+    MockTabletLocator locator = new MockTabletLocator();
+    FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+    AccumuloConfiguration acuConf = AccumuloConfiguration.getDefaultConfiguration();
+    String file = "target/testFile.rf";
+    // Remove any file left over from a previous run before writing a fresh one.
+    fs.delete(new Path(file), true);
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), acuConf);
+    // Close the writer even if an append fails, so the file handle is not leaked. Keys below are listed in sorted order.
+    try {
+      writer.startDefaultLocalityGroup();
+      Value empty = new Value(new byte[] {});
+      writer.append(new Key("a", "cf", "cq"), empty);
+      writer.append(new Key("a", "cf", "cq1"), empty);
+      writer.append(new Key("a", "cf", "cq2"), empty);
+      writer.append(new Key("a", "cf", "cq3"), empty);
+      writer.append(new Key("a", "cf", "cq4"), empty);
+      writer.append(new Key("a", "cf", "cq5"), empty);
+      writer.append(new Key("d", "cf", "cq"), empty);
+      writer.append(new Key("d", "cf", "cq1"), empty);
+      writer.append(new Key("d", "cf", "cq2"), empty);
+      writer.append(new Key("d", "cf", "cq3"), empty);
+      writer.append(new Key("d", "cf", "cq4"), empty);
+      writer.append(new Key("d", "cf", "cq5"), empty);
+      writer.append(new Key("dd", "cf", "cq1"), empty);
+      writer.append(new Key("ichabod", "cf", "cq"), empty);
+      writer.append(new Key("icky", "cf", "cq1"), empty);
+      writer.append(new Key("iffy", "cf", "cq2"), empty);
+      writer.append(new Key("internal", "cf", "cq3"), empty);
+      writer.append(new Key("is", "cf", "cq4"), empty);
+      writer.append(new Key("iterator", "cf", "cq5"), empty);
+      writer.append(new Key("xyzzy", "cf", "cq"), empty);
+    } finally {
+      writer.close();
+    }
+    VolumeManager vm = VolumeManagerImpl.get(acuConf);
+
+    // Unbounded search over the whole file.
+    List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, vm, locator, new Path(file), credentials);
+    Assert.assertEquals(5, overlaps.size());
+    Collections.sort(overlaps);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent);
+
+    // Search again, passing the extent with end row "h" and prev end row "b".
+    List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, vm, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text(
+        "b")), credentials);
+    Assert.assertEquals(3, overlaps2.size());
+    // Sort like the first result so the index-based assertions do not depend on the returned order.
+    Collections.sort(overlaps2);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent);
+    Assert.assertEquals(1, locator.invalidated);
+  }
+
++  /**
++   * Checks {@link BulkImporter#getStartRowForExtent}: an extent with a prev end row yields that row followed by a NUL byte,
++   * and an extent without a prev end row yields null.
++   */
++  @Test
++  public void testSequentialTablets() throws Exception {
++    // ACCUMULO-3967 make sure that the startRow we compute in BulkImporter is actually giving
++    // a correct startRow so that findOverlappingTablets works as intended.
++
++    // 1;2;1
++    KeyExtent extent = new KeyExtent(new Text("1"), new Text("2"), new Text("1"));
++    Assert.assertEquals(new Text("1\0"), BulkImporter.getStartRowForExtent(extent));
++
++    // 1;2<
++    extent = new KeyExtent(new Text("1"), new Text("2"), null);
++    Assert.assertNull(BulkImporter.getStartRowForExtent(extent));
++
++    // 1<<
++    extent = new KeyExtent(new Text("1"), null, null);
++    Assert.assertNull(BulkImporter.getStartRowForExtent(extent));
++
++    // 1;8;7777777
++    extent = new KeyExtent(new Text("1"), new Text("8"), new Text("7777777"));
++    Assert.assertEquals(new Text("7777777\0"), BulkImporter.getStartRowForExtent(extent));
++  }
+}