You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/09/02 05:28:39 UTC
[01/19] accumulo git commit: ACCUMULO-3979 make sure timers get
stopped in the presence of exceptions
Repository: accumulo
Updated Branches:
refs/heads/1.5 08b805bd9 -> c6aae57a4
refs/heads/1.6 c767e1d9f -> 07ac4c85f
refs/heads/1.7 9b3d7c1df -> 203300558
refs/heads/master 9d25381b7 -> 5350dbc11
ACCUMULO-3979 make sure timers get stopped in the presence of exceptions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/08b805bd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/08b805bd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/08b805bd
Branch: refs/heads/1.7
Commit: 08b805bd9eca422a1f0423dd5e1734b4321c4b0e
Parents: 5cd2063
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Aug 28 15:14:07 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Aug 28 15:14:07 2015 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/server/client/BulkImporter.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/08b805bd/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 0f89e73..a68b9ec 100644
--- a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -143,6 +143,7 @@ public class BulkImporter {
for (Path path : paths) {
final Path mapFile = path;
Runnable getAssignments = new Runnable() {
+ @Override
public void run() {
List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
try {
@@ -207,14 +208,14 @@ public class BulkImporter {
while (keListIter.hasNext()) {
KeyExtent ke = keListIter.next();
+ timer.start(Timers.QUERY_METADATA);
try {
- timer.start(Timers.QUERY_METADATA);
tabletsToAssignMapFileTo.addAll(findOverlappingTablets(instance.getConfiguration(), fs, locator, entry.getKey(), ke, credentials));
- timer.stop(Timers.QUERY_METADATA);
keListIter.remove();
} catch (Exception ex) {
log.warn("Exception finding overlapping tablets, will retry tablet " + ke);
}
+ timer.stop(Timers.QUERY_METADATA);
}
if (tabletsToAssignMapFileTo.size() > 0)
@@ -361,6 +362,7 @@ public class BulkImporter {
}
Runnable estimationTask = new Runnable() {
+ @Override
public void run() {
Map<KeyExtent,Long> estimatedSizes = null;
@@ -466,6 +468,7 @@ public class BulkImporter {
}
}
+ @Override
public void run() {
HashSet<Path> uniqMapFiles = new HashSet<Path>();
for (List<PathSize> mapFiles : assignmentsPerTablet.values())
@@ -495,6 +498,7 @@ public class BulkImporter {
Path path;
long estSize;
+ @Override
public String toString() {
return path + " " + estSize;
}
@@ -729,6 +733,7 @@ public class BulkImporter {
this.failedFailures = failedFailures;
}
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
int totalAssignments = 0;
[16/19] accumulo git commit: Merge branch '1.5' into 1.6
Posted by el...@apache.org.
Merge branch '1.5' into 1.6
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/07ac4c85
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/07ac4c85
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/07ac4c85
Branch: refs/heads/master
Commit: 07ac4c85f30e57d094df8c1b481fe02b3237d2a7
Parents: c767e1d c6aae57
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:27:36 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:27:36 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[05/19] accumulo git commit: Merge branch '1.7' of
https://git-wip-us.apache.org/repos/asf/accumulo into 1.7
Posted by el...@apache.org.
Merge branch '1.7' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9b3d7c1d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9b3d7c1d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9b3d7c1d
Branch: refs/heads/master
Commit: 9b3d7c1df5341920b914f2d0d37204ebe934a9b6
Parents: 32da99a 1761200
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Aug 28 15:30:12 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Aug 28 15:30:12 2015 -0400
----------------------------------------------------------------------
.../main/asciidoc/chapters/administration.txt | 126 ++++++++++++++++++-
1 file changed, 124 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[03/19] accumulo git commit: ACCUMULO-3979 make sure timers get
stopped in the presence of exceptions
Posted by el...@apache.org.
ACCUMULO-3979 make sure timers get stopped in the presence of exceptions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c767e1d9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c767e1d9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c767e1d9
Branch: refs/heads/1.7
Commit: c767e1d9f2b9702aecafdf4b1ea30970da712b88
Parents: a1497d1 08b805b
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Aug 28 15:16:25 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Aug 28 15:16:25 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/accumulo/server/client/BulkImporter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c767e1d9/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 0c77367,0000000..32186ea
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@@ -1,791 -1,0 +1,791 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.StopWatch;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.trace.instrument.TraceRunnable;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TServiceClient;
+
+public class BulkImporter {
+
+ private static final Logger log = Logger.getLogger(BulkImporter.class);
+
+ public static List<String> bulkLoad(AccumuloConfiguration conf, Instance instance, Credentials creds, long tid, String tableId, List<String> files,
+ String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException {
+ AssignmentStats stats = new BulkImporter(conf, instance, creds, tid, tableId, setTime).importFiles(files, new Path(errorDir));
+ List<String> result = new ArrayList<String>();
+ for (Path p : stats.completeFailures.keySet()) {
+ result.add(p.toString());
+ }
+ return result;
+ }
+
+ private StopWatch<Timers> timer;
+
+ private static enum Timers {
+ EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
+ }
+
+ private Instance instance;
+ private Credentials credentials;
+ private String tableId;
+ private long tid;
+ private AccumuloConfiguration acuConf;
+ private boolean setTime;
+
+ public BulkImporter(AccumuloConfiguration conf, Instance instance, Credentials credentials, long tid, String tableId, boolean setTime) {
+ this.instance = instance;
+ this.credentials = credentials;
+ this.tid = tid;
+ this.tableId = tableId;
+ this.acuConf = conf;
+ this.setTime = setTime;
+ }
+
+ public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
+ ThriftTableOperationException {
+
+ int numThreads = acuConf.getCount(Property.TSERV_BULK_PROCESS_THREADS);
+ int numAssignThreads = acuConf.getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
+
+ timer = new StopWatch<Timers>(Timers.class);
+ timer.start(Timers.TOTAL);
+
+ Configuration conf = CachedConfiguration.getInstance();
+ VolumeManagerImpl.get(acuConf);
+ final VolumeManager fs = VolumeManagerImpl.get(acuConf);
+
+ Set<Path> paths = new HashSet<Path>();
+ for (String file : files) {
+ paths.add(new Path(file));
+ }
+ AssignmentStats assignmentStats = new AssignmentStats(paths.size());
+
+ final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ ClientService.Client client = null;
+ final TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId));
+
+ try {
+ final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
+
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
+
+ for (Path path : paths) {
+ final Path mapFile = path;
+ Runnable getAssignments = new Runnable() {
+ @Override
+ public void run() {
+ List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
+ try {
+ tabletsToAssignMapFileTo = findOverlappingTablets(ServerConfiguration.getSystemConfiguration(instance), fs, locator, mapFile, credentials);
+ } catch (Exception ex) {
+ log.warn("Unable to find tablets that overlap file " + mapFile.toString(), ex);
+ }
+ log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
+ if (tabletsToAssignMapFileTo.size() == 0) {
+ List<KeyExtent> empty = Collections.emptyList();
+ completeFailures.put(mapFile, empty);
+ } else
+ assignments.put(mapFile, tabletsToAssignMapFileTo);
+ }
+ };
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
+ }
+ threadPool.shutdown();
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures);
+
+ Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
+
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
+ failureCount.put(entry.getKey(), 1);
+
+ long sleepTime = 2 * 1000;
+ while (assignmentFailures.size() > 0) {
+ sleepTime = Math.min(sleepTime * 2, 60 * 1000);
+ locator.invalidateCache();
+ // assumption about assignment failures is that it caused by a split
+ // happening or a missing location
+ //
+ // for splits we need to find children key extents that cover the
+ // same key range and are contiguous (no holes, no overlap)
+
+ timer.start(Timers.SLEEP);
+ UtilWaitThread.sleep(sleepTime);
+ timer.stop(Timers.SLEEP);
+
+ log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
+ assignments.clear();
+
+ // for failed key extents, try to find children key extents to
+ // assign to
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ Iterator<KeyExtent> keListIter = entry.getValue().iterator();
+
+ List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
+
+ while (keListIter.hasNext()) {
+ KeyExtent ke = keListIter.next();
+
++ timer.start(Timers.QUERY_METADATA);
+ try {
- timer.start(Timers.QUERY_METADATA);
+ tabletsToAssignMapFileTo.addAll(findOverlappingTablets(ServerConfiguration.getSystemConfiguration(instance), fs, locator, entry.getKey(), ke,
+ credentials));
- timer.stop(Timers.QUERY_METADATA);
+ keListIter.remove();
+ } catch (Exception ex) {
+ log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
+ }
++ timer.stop(Timers.QUERY_METADATA);
+ }
+
+ if (tabletsToAssignMapFileTo.size() > 0)
+ assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
+ }
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures2);
+
+ // merge assignmentFailures2 into assignmentFailures
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
+ assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
+
+ Integer fc = failureCount.get(entry.getKey());
+ if (fc == null)
+ fc = 0;
+
+ failureCount.put(entry.getKey(), fc + 1);
+ }
+
+ // remove map files that have no more key extents to assign
+ Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
+ while (afIter.hasNext()) {
+ Entry<Path,List<KeyExtent>> entry = afIter.next();
+ if (entry.getValue().size() == 0)
+ afIter.remove();
+ }
+
+ Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
+ for (Entry<Path,Integer> entry : failureIter) {
+ int retries = acuConf.getCount(Property.TSERV_BULK_RETRY);
+ if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
+ log.error("Map file " + entry.getKey() + " failed more than " + retries + " times, giving up.");
+ completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
+ assignmentFailures.remove(entry.getKey());
+ }
+ }
+ }
+ assignmentStats.assignmentsAbandoned(completeFailures);
+ Set<Path> failedFailures = processFailures(completeFailures);
+ assignmentStats.unrecoveredMapFiles(failedFailures);
+
+ timer.stop(Timers.TOTAL);
+ printReport(paths);
+ return assignmentStats;
+ } finally {
+ if (client != null) {
+ ServerClient.close(client);
+ }
+ }
+ }
+
+ private void printReport(Set<Path> paths) {
+ long totalTime = 0;
+ for (Timers t : Timers.values()) {
+ if (t == Timers.TOTAL)
+ continue;
+
+ totalTime += timer.get(t);
+ }
+ List<String> files = new ArrayList<String>();
+ for (Path path : paths) {
+ files.add(path.getName());
+ }
+ Collections.sort(files);
+
+ log.debug("BULK IMPORT TIMING STATISTICS");
+ log.debug("Files: " + files);
+ log.debug(String.format("Examine map files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES), 100.0 * timer.get(Timers.EXAMINE_MAP_FILES)
+ / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", MetadataTable.NAME, timer.getSecs(Timers.QUERY_METADATA),
+ 100.0 * timer.get(Timers.QUERY_METADATA) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Import Map Files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.IMPORT_MAP_FILES), 100.0 * timer.get(Timers.IMPORT_MAP_FILES)
+ / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Sleep : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP),
+ 100.0 * timer.get(Timers.SLEEP) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Misc : %,10.2f secs %6.2f%s", (timer.get(Timers.TOTAL) - totalTime) / 1000.0, 100.0
+ * (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
+ }
+
+ private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
+ // we should check if map file was not assigned to any tablets, then we
+ // should just move it; not currently being done?
+
+ Set<Entry<Path,List<KeyExtent>>> es = completeFailures.entrySet();
+
+ if (completeFailures.size() == 0)
+ return Collections.emptySet();
+
+ log.debug("The following map files failed ");
+
+ for (Entry<Path,List<KeyExtent>> entry : es) {
+ List<KeyExtent> extents = entry.getValue();
+
+ for (KeyExtent keyExtent : extents)
+ log.debug("\t" + entry.getKey() + " -> " + keyExtent);
+ }
+
+ return Collections.emptySet();
+ }
+
+ private class AssignmentInfo {
+ public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
+ this.ke = keyExtent;
+ this.estSize = estSize;
+ }
+
+ KeyExtent ke;
+ long estSize;
+ }
+
+ private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
+ List<KeyExtent> result = new ArrayList<KeyExtent>(locations.size());
+ for (TabletLocation tl : locations)
+ result.add(tl.tablet_extent);
+ return result;
+ }
+
+ private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm,
+ Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
+
+ long t1 = System.currentTimeMillis();
+ final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
+
+ try {
+ for (Path path : paths) {
+ FileSystem fs = vm.getVolumeByPath(path).getFileSystem();
+ mapFileSizes.put(path, fs.getContentSummary(path).getLength());
+ }
+ } catch (IOException e) {
+ log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+
+ final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
+
+ for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+ if (entry.getValue().size() == 1) {
+ TabletLocation tabletLocation = entry.getValue().get(0);
+
+ // if the tablet completely contains the map file, there is no
+ // need to estimate its
+ // size
+ ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
+ continue;
+ }
+
+ Runnable estimationTask = new Runnable() {
+ @Override
+ public void run() {
+ Map<KeyExtent,Long> estimatedSizes = null;
+
+ try {
+ estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, vm);
+ } catch (IOException e) {
+ log.warn("Failed to estimate map file sizes " + e.getMessage());
+ }
+
+ if (estimatedSizes == null) {
+ // estimation failed, do a simple estimation
+ estimatedSizes = new TreeMap<KeyExtent,Long>();
+ long estSize = (long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
+ for (TabletLocation tl : entry.getValue())
+ estimatedSizes.put(tl.tablet_extent, estSize);
+ }
+
+ List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
+
+ for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
+ assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
+
+ ais.put(entry.getKey(), assignmentInfoList);
+ }
+ };
+
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
+
+ return ais;
+ }
+
+ private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
+ Map<KeyExtent,String> result = new HashMap<KeyExtent,String>();
+ for (List<TabletLocation> entry : assignments.values()) {
+ for (TabletLocation tl : entry) {
+ result.put(tl.tablet_extent, tl.tablet_location);
+ }
+ }
+ return result;
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, Credentials credentials,
+ VolumeManager fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ Map<Path,List<AssignmentInfo>> assignInfo = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads);
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ Map<Path,List<KeyExtent>> ret;
+
+ timer.start(Timers.IMPORT_MAP_FILES);
+ ret = assignMapFiles(credentials, tableId, assignInfo, locationsOf(assignments), numThreads);
+ timer.stop(Timers.IMPORT_MAP_FILES);
+
+ return ret;
+ }
+
+ private class AssignmentTask implements Runnable {
+ final Map<Path,List<KeyExtent>> assignmentFailures;
+ String location;
+ Credentials credentials;
+ private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
+
+ public AssignmentTask(Credentials credentials, Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location,
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
+ this.assignmentFailures = assignmentFailures;
+ this.location = location;
+ this.assignmentsPerTablet = assignmentsPerTablet;
+ this.credentials = credentials;
+ }
+
+ private void handleFailures(Collection<KeyExtent> failures, String message) {
+ for (KeyExtent ke : failures) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
+ synchronized (assignmentFailures) {
+ for (PathSize pathSize : mapFiles) {
+ List<KeyExtent> existingFailures = assignmentFailures.get(pathSize.path);
+ if (existingFailures == null) {
+ existingFailures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, existingFailures);
+ }
+
+ existingFailures.add(ke);
+ }
+ }
+
+ log.info("Could not assign " + mapFiles.size() + " map files to tablet " + ke + " because : " + message + ". Will retry ...");
+ }
+ }
+
+ @Override
+ public void run() {
+ HashSet<Path> uniqMapFiles = new HashSet<Path>();
+ for (List<PathSize> mapFiles : assignmentsPerTablet.values())
+ for (PathSize ps : mapFiles)
+ uniqMapFiles.add(ps.path);
+
+ log.debug("Assigning " + uniqMapFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);
+
+ try {
+ List<KeyExtent> failures = assignMapFiles(credentials, location, assignmentsPerTablet);
+ handleFailures(failures, "Not Serving Tablet");
+ } catch (AccumuloException e) {
+ handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+ } catch (AccumuloSecurityException e) {
+ handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
+ }
+ }
+
+ }
+
+ private class PathSize {
+ public PathSize(Path mapFile, long estSize) {
+ this.path = mapFile;
+ this.estSize = estSize;
+ }
+
+ Path path;
+ long estSize;
+
+ @Override
+ public String toString() {
+ return path + " " + estSize;
+ }
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(Credentials credentials, String tableName, Map<Path,List<AssignmentInfo>> assignments,
+ Map<KeyExtent,String> locations, int numThreads) {
+
+ // group assignments by tablet
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
+ for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
+ Path mapFile = entry.getKey();
+ List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
+
+ for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
+ if (mapFiles == null) {
+ mapFiles = new ArrayList<PathSize>();
+ assignmentsPerTablet.put(ai.ke, mapFiles);
+ }
+
+ mapFiles.add(new PathSize(mapFile, ai.estSize));
+ }
+ }
+
+ // group assignments by tabletserver
+
+ Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
+
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ KeyExtent ke = entry.getKey();
+ String location = locations.get(ke);
+
+ if (location == null) {
+ for (PathSize pathSize : entry.getValue()) {
+ synchronized (assignmentFailures) {
+ List<KeyExtent> failures = assignmentFailures.get(pathSize.path);
+ if (failures == null) {
+ failures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, failures);
+ }
+
+ failures.add(ke);
+ }
+ }
+
+ log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
+
+ continue;
+ }
+
+ Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
+ if (apt == null) {
+ apt = new TreeMap<KeyExtent,List<PathSize>>();
+ assignmentsPerTabletServer.put(location, apt);
+ }
+
+ apt.put(entry.getKey(), entry.getValue());
+ }
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
+
+ for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
+ String location = entry.getKey();
+ threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ return assignmentFailures;
+ }
+
+ private List<KeyExtent> assignMapFiles(Credentials credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet)
+ throws AccumuloException, AccumuloSecurityException {
+ try {
+ long timeInMillis = ServerConfiguration.getSystemConfiguration(instance).getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+ TabletClientService.Iface client = ThriftUtil.getTServerClient(location, ServerConfiguration.getSystemConfiguration(instance), timeInMillis);
+ try {
+ HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
+ files.put(entry.getKey(), tabletFiles);
+
+ for (PathSize pathSize : entry.getValue()) {
+ org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
+ tabletFiles.put(pathSize.path.toString(), mfi);
+ }
+ }
+
+ log.debug("Asking " + location + " to bulk load " + files);
+ List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials.toThrift(instance), tid, Translator.translate(files, Translators.KET),
+ setTime);
+
+ return Translator.translate(failures, Translators.TKET);
+ } finally {
+ ThriftUtil.returnClient((TServiceClient) client);
+ }
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ throw new AccumuloException(t);
+ }
+ }
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+ Credentials credentials) throws Exception {
+ return findOverlappingTablets(acuConf, fs, locator, file, null, null, credentials);
+ }
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+ KeyExtent failed, Credentials credentials) throws Exception {
+ locator.invalidateCache(failed);
+ Text start = getStartRowForExtent(failed);
+ return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow(), credentials);
+ }
+
+ protected static Text getStartRowForExtent(KeyExtent extent) {
+ Text start = extent.getPrevEndRow();
+ if (start != null) {
+ start = new Text(start);
+ // ACCUMULO-3967 We want the first possible key in this tablet, not the following row from the previous tablet
+ start.append(byte0, 0, 1);
+ }
+ return start;
+ }
+
+ final static byte[] byte0 = {0};
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager vm, TabletLocator locator, Path file, Text startRow,
+ Text endRow, Credentials credentials) throws Exception {
+ List<TabletLocation> result = new ArrayList<TabletLocation>();
+ Collection<ByteSequence> columnFamilies = Collections.emptyList();
+ String filename = file.toString();
+ // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
+ FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
+ FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
+ try {
+ Text row = startRow;
+ if (row == null)
+ row = new Text();
+ while (true) {
+ // log.debug(filename + " Seeking to row " + row);
+ reader.seek(new Range(row, null), columnFamilies, false);
+ if (!reader.hasTop()) {
+ // log.debug(filename + " not found");
+ break;
+ }
+ row = reader.getTopKey().getRow();
+ TabletLocation tabletLocation = locator.locateTablet(credentials, row, false, true);
+ // log.debug(filename + " found row " + row + " at location " + tabletLocation);
+ result.add(tabletLocation);
+ row = tabletLocation.tablet_extent.getEndRow();
+ if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
+ row = new Text(row);
+ row.append(byte0, 0, byte0.length);
+ } else
+ break;
+ }
+ } finally {
+ reader.close();
+ }
+ // log.debug(filename + " to be sent to " + result);
+ return result;
+ }
+
+ public static class AssignmentStats {
+ private Map<KeyExtent,Integer> counts;
+ private int numUniqueMapFiles;
+ private Map<Path,List<KeyExtent>> completeFailures = null;
+ private Set<Path> failedFailures = null;
+
+ AssignmentStats(int fileCount) {
+ counts = new HashMap<KeyExtent,Integer>();
+ numUniqueMapFiles = fileCount;
+ }
+
+ void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
+ for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+ for (TabletLocation tl : entry.getValue()) {
+
+ Integer count = getCount(tl.tablet_extent);
+
+ counts.put(tl.tablet_extent, count + 1);
+ }
+ }
+ }
+
+ void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ for (KeyExtent ke : entry.getValue()) {
+
+ Integer count = getCount(ke);
+
+ counts.put(ke, count - 1);
+ }
+ }
+ }
+
+ void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
+ this.completeFailures = completeFailures;
+ }
+
+ void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
+ Integer count = getCount(parent);
+
+ counts.remove(parent);
+
+ for (KeyExtent keyExtent : children)
+ counts.put(keyExtent, count);
+ }
+
+ private Integer getCount(KeyExtent parent) {
+ Integer count = counts.get(parent);
+
+ if (count == null) {
+ count = 0;
+ }
+ return count;
+ }
+
+ void unrecoveredMapFiles(Set<Path> failedFailures) {
+ this.failedFailures = failedFailures;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ int totalAssignments = 0;
+ int tabletsImportedTo = 0;
+
+ int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
+
+ for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
+ totalAssignments += entry.getValue();
+ if (entry.getValue() > 0)
+ tabletsImportedTo++;
+
+ if (entry.getValue() < min)
+ min = entry.getValue();
+
+ if (entry.getValue() > max)
+ max = entry.getValue();
+ }
+
+ double stddev = 0;
+
+ for (Entry<KeyExtent,Integer> entry : counts.entrySet())
+ stddev += Math.pow(entry.getValue() - totalAssignments / (double) counts.size(), 2);
+
+ stddev = stddev / counts.size();
+ stddev = Math.sqrt(stddev);
+
+ Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
+ for (List<KeyExtent> ft : completeFailures.values())
+ failedTablets.addAll(ft);
+
+ sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
+ sb.append(String.format("# of map files : %,10d%n", numUniqueMapFiles));
+ sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", completeFailures.size(), completeFailures.size() * 100.0 / numUniqueMapFiles, "%"));
+ sb.append(String.format("# failed failed map files : %,10d %s%n", failedFailures.size(), failedFailures.size() > 0 ? " <-- THIS IS BAD" : ""));
+ sb.append(String.format("# of tablets : %,10d%n", counts.size()));
+ sb.append(String.format("# tablets imported to : %,10d %6.2f%s%n", tabletsImportedTo, tabletsImportedTo * 100.0 / counts.size(), "%"));
+ sb.append(String.format("# tablets with failures : %,10d %6.2f%s%n", failedTablets.size(), failedTablets.size() * 100.0 / counts.size(), "%"));
+ sb.append(String.format("min map files per tablet : %,10d%n", min));
+ sb.append(String.format("max map files per tablet : %,10d%n", max));
+ sb.append(String.format("avg map files per tablet : %,10.2f (std dev = %.2f)%n", totalAssignments / (double) counts.size(), stddev));
+ return sb.toString();
+ }
+ }
+
+}
[18/19] accumulo git commit: Merge branch '1.6' into 1.7
Posted by el...@apache.org.
Merge branch '1.6' into 1.7
Conflicts:
server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/20330055
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/20330055
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/20330055
Branch: refs/heads/master
Commit: 20330055886fd3f093b06b8c5218220b575aaecf
Parents: 9b3d7c1 07ac4c8
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:28:35 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:28:35 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[11/19] accumulo git commit: ACCUMULO-3986 Eat some common warning
messages on simple.recoverWithEmpty.RecoverWithEmptyTest
Posted by el...@apache.org.
ACCUMULO-3986 Eat some common warning messages on simple.recoverWithEmpty.RecoverWithEmptyTest
This should make the tests a little easier to run successfully.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c6aae57a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c6aae57a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c6aae57a
Branch: refs/heads/1.7
Commit: c6aae57a44eefeba05a3569d2bb08fadecd11275
Parents: c18fe9e
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:24:26 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:24:26 2015 -0400
----------------------------------------------------------------------
test/system/auto/simple/recoverWithEmpty.py | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c6aae57a/test/system/auto/simple/recoverWithEmpty.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py
index 18ac055..b5b1253 100755
--- a/test/system/auto/simple/recoverWithEmpty.py
+++ b/test/system/auto/simple/recoverWithEmpty.py
@@ -95,7 +95,13 @@ class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin):
out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n')
self.processResult(out,err,code);
out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime)
- self.failUnless(len(out) == 0)
+ lines = out.strip().split('\n')
+ # Poor man's grep -v
+ if len(lines) > 0:
+ # Avoid failing the test if we have some log4j warnings
+ if 'util.NativeCodeLoader' in lines[0]:
+ lines = lines[1:]
+ self.failUnless(len(lines) == 0, "Expected no output, saw '" + out + "'")
self.shutdown_accumulo()
def suite():
[19/19] accumulo git commit: Merge branch '1.7'
Posted by el...@apache.org.
Merge branch '1.7'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5350dbc1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5350dbc1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5350dbc1
Branch: refs/heads/master
Commit: 5350dbc11803eb353fdbdd7e5361030e9cdf642b
Parents: 9d25381 2033005
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:28:45 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:28:45 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[17/19] accumulo git commit: Merge branch '1.6' into 1.7
Posted by el...@apache.org.
Merge branch '1.6' into 1.7
Conflicts:
server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/20330055
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/20330055
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/20330055
Branch: refs/heads/1.7
Commit: 20330055886fd3f093b06b8c5218220b575aaecf
Parents: 9b3d7c1 07ac4c8
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:28:35 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:28:35 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[06/19] accumulo git commit: ACCUMULO-3985 fix MapReduceTest on newer
Hadoop 2's
Posted by el...@apache.org.
ACCUMULO-3985 fix MapReduceTest on newer Hadoop 2's
To submit a job in newer versions of Hadoop, you need
some Jersey libs on the classpath.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c18fe9e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c18fe9e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c18fe9e9
Branch: refs/heads/1.5
Commit: c18fe9e9aef96e49fcd39e9d5e91458916d08ef4
Parents: 08b805b
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:02:29 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:02:29 2015 -0400
----------------------------------------------------------------------
test/system/auto/TestUtils.py | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c18fe9e9/test/system/auto/TestUtils.py
----------------------------------------------------------------------
diff --git a/test/system/auto/TestUtils.py b/test/system/auto/TestUtils.py
index 9bb139f..a5da1f4 100644
--- a/test/system/auto/TestUtils.py
+++ b/test/system/auto/TestUtils.py
@@ -86,6 +86,7 @@ $ACCUMULO_HOME/server/target/classes/,
$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,
$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,
$HADOOP_PREFIX/share/hadoop/yarn/.*.jar,
+ $HADOOP_PREFIX/share/hadoop/yarn/lib/jersey.*.jar,
"""
[12/19] accumulo git commit: ACCUMULO-3986 Eat some common warning
messages on simple.recoverWithEmpty.RecoverWithEmptyTest
Posted by el...@apache.org.
ACCUMULO-3986 Eat some common warning messages on simple.recoverWithEmpty.RecoverWithEmptyTest
This should make the tests a little easier to run successfully.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c6aae57a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c6aae57a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c6aae57a
Branch: refs/heads/1.5
Commit: c6aae57a44eefeba05a3569d2bb08fadecd11275
Parents: c18fe9e
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:24:26 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:24:26 2015 -0400
----------------------------------------------------------------------
test/system/auto/simple/recoverWithEmpty.py | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c6aae57a/test/system/auto/simple/recoverWithEmpty.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py
index 18ac055..b5b1253 100755
--- a/test/system/auto/simple/recoverWithEmpty.py
+++ b/test/system/auto/simple/recoverWithEmpty.py
@@ -95,7 +95,13 @@ class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin):
out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n')
self.processResult(out,err,code);
out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime)
- self.failUnless(len(out) == 0)
+ lines = out.strip().split('\n')
+ # Poor man's grep -v
+ if len(lines) > 0:
+ # Avoid failing the test if we have some log4j warnings
+ if 'util.NativeCodeLoader' in lines[0]:
+ lines = lines[1:]
+ self.failUnless(len(lines) == 0, "Expected no output, saw '" + out + "'")
self.shutdown_accumulo()
def suite():
[07/19] accumulo git commit: ACCUMULO-3985 fix MapReduceTest on newer
Hadoop 2's
Posted by el...@apache.org.
ACCUMULO-3985 fix MapReduceTest on newer Hadoop 2's
To submit a job in newer versions of Hadoop, you need
some Jersey libs on the classpath.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c18fe9e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c18fe9e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c18fe9e9
Branch: refs/heads/1.6
Commit: c18fe9e9aef96e49fcd39e9d5e91458916d08ef4
Parents: 08b805b
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:02:29 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:02:29 2015 -0400
----------------------------------------------------------------------
test/system/auto/TestUtils.py | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c18fe9e9/test/system/auto/TestUtils.py
----------------------------------------------------------------------
diff --git a/test/system/auto/TestUtils.py b/test/system/auto/TestUtils.py
index 9bb139f..a5da1f4 100644
--- a/test/system/auto/TestUtils.py
+++ b/test/system/auto/TestUtils.py
@@ -86,6 +86,7 @@ $ACCUMULO_HOME/server/target/classes/,
$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,
$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,
$HADOOP_PREFIX/share/hadoop/yarn/.*.jar,
+ $HADOOP_PREFIX/share/hadoop/yarn/lib/jersey.*.jar,
"""
[04/19] accumulo git commit: ACCUMULO-3979 make sure timers get
stopped in the presence of exceptions
Posted by el...@apache.org.
ACCUMULO-3979 make sure timers get stopped in the presence of exceptions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c767e1d9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c767e1d9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c767e1d9
Branch: refs/heads/master
Commit: c767e1d9f2b9702aecafdf4b1ea30970da712b88
Parents: a1497d1 08b805b
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Aug 28 15:16:25 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Aug 28 15:16:25 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/accumulo/server/client/BulkImporter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c767e1d9/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 0c77367,0000000..32186ea
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@@ -1,791 -1,0 +1,791 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ServerClient;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.StopWatch;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.trace.instrument.TraceRunnable;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TServiceClient;
+
+public class BulkImporter {
+
+ private static final Logger log = Logger.getLogger(BulkImporter.class);
+
+ public static List<String> bulkLoad(AccumuloConfiguration conf, Instance instance, Credentials creds, long tid, String tableId, List<String> files,
+ String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException {
+ AssignmentStats stats = new BulkImporter(conf, instance, creds, tid, tableId, setTime).importFiles(files, new Path(errorDir));
+ List<String> result = new ArrayList<String>();
+ for (Path p : stats.completeFailures.keySet()) {
+ result.add(p.toString());
+ }
+ return result;
+ }
+
+ private StopWatch<Timers> timer;
+
+ private static enum Timers {
+ EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
+ }
+
+ private Instance instance;
+ private Credentials credentials;
+ private String tableId;
+ private long tid;
+ private AccumuloConfiguration acuConf;
+ private boolean setTime;
+
+ public BulkImporter(AccumuloConfiguration conf, Instance instance, Credentials credentials, long tid, String tableId, boolean setTime) {
+ this.instance = instance;
+ this.credentials = credentials;
+ this.tid = tid;
+ this.tableId = tableId;
+ this.acuConf = conf;
+ this.setTime = setTime;
+ }
+
+ public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
+ ThriftTableOperationException {
+
+ int numThreads = acuConf.getCount(Property.TSERV_BULK_PROCESS_THREADS);
+ int numAssignThreads = acuConf.getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
+
+ timer = new StopWatch<Timers>(Timers.class);
+ timer.start(Timers.TOTAL);
+
+ Configuration conf = CachedConfiguration.getInstance();
+ VolumeManagerImpl.get(acuConf);
+ final VolumeManager fs = VolumeManagerImpl.get(acuConf);
+
+ Set<Path> paths = new HashSet<Path>();
+ for (String file : files) {
+ paths.add(new Path(file));
+ }
+ AssignmentStats assignmentStats = new AssignmentStats(paths.size());
+
+ final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ ClientService.Client client = null;
+ final TabletLocator locator = TabletLocator.getLocator(instance, new Text(tableId));
+
+ try {
+ final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
+
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
+
+ for (Path path : paths) {
+ final Path mapFile = path;
+ Runnable getAssignments = new Runnable() {
+ @Override
+ public void run() {
+ List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
+ try {
+ tabletsToAssignMapFileTo = findOverlappingTablets(ServerConfiguration.getSystemConfiguration(instance), fs, locator, mapFile, credentials);
+ } catch (Exception ex) {
+ log.warn("Unable to find tablets that overlap file " + mapFile.toString(), ex);
+ }
+ log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
+ if (tabletsToAssignMapFileTo.size() == 0) {
+ List<KeyExtent> empty = Collections.emptyList();
+ completeFailures.put(mapFile, empty);
+ } else
+ assignments.put(mapFile, tabletsToAssignMapFileTo);
+ }
+ };
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, getAssignments)));
+ }
+ threadPool.shutdown();
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures);
+
+ Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
+
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
+ failureCount.put(entry.getKey(), 1);
+
+ long sleepTime = 2 * 1000;
+ while (assignmentFailures.size() > 0) {
+ sleepTime = Math.min(sleepTime * 2, 60 * 1000);
+ locator.invalidateCache();
+ // assumption about assignment failures is that it caused by a split
+ // happening or a missing location
+ //
+ // for splits we need to find children key extents that cover the
+ // same key range and are contiguous (no holes, no overlap)
+
+ timer.start(Timers.SLEEP);
+ UtilWaitThread.sleep(sleepTime);
+ timer.stop(Timers.SLEEP);
+
+ log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
+ assignments.clear();
+
+ // for failed key extents, try to find children key extents to
+ // assign to
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ Iterator<KeyExtent> keListIter = entry.getValue().iterator();
+
+ List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
+
+ while (keListIter.hasNext()) {
+ KeyExtent ke = keListIter.next();
+
++ timer.start(Timers.QUERY_METADATA);
+ try {
- timer.start(Timers.QUERY_METADATA);
+ tabletsToAssignMapFileTo.addAll(findOverlappingTablets(ServerConfiguration.getSystemConfiguration(instance), fs, locator, entry.getKey(), ke,
+ credentials));
- timer.stop(Timers.QUERY_METADATA);
+ keListIter.remove();
+ } catch (Exception ex) {
+ log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
+ }
++ timer.stop(Timers.QUERY_METADATA);
+ }
+
+ if (tabletsToAssignMapFileTo.size() > 0)
+ assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
+ }
+
+ assignmentStats.attemptingAssignments(assignments);
+ Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(acuConf, instance, conf, credentials, fs, tableId, assignments, paths, numAssignThreads,
+ numThreads);
+ assignmentStats.assignmentsFailed(assignmentFailures2);
+
+ // merge assignmentFailures2 into assignmentFailures
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
+ assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
+
+ Integer fc = failureCount.get(entry.getKey());
+ if (fc == null)
+ fc = 0;
+
+ failureCount.put(entry.getKey(), fc + 1);
+ }
+
+ // remove map files that have no more key extents to assign
+ Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
+ while (afIter.hasNext()) {
+ Entry<Path,List<KeyExtent>> entry = afIter.next();
+ if (entry.getValue().size() == 0)
+ afIter.remove();
+ }
+
+ Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
+ for (Entry<Path,Integer> entry : failureIter) {
+ int retries = acuConf.getCount(Property.TSERV_BULK_RETRY);
+ if (entry.getValue() > retries && assignmentFailures.get(entry.getKey()) != null) {
+ log.error("Map file " + entry.getKey() + " failed more than " + retries + " times, giving up.");
+ completeFailures.put(entry.getKey(), assignmentFailures.get(entry.getKey()));
+ assignmentFailures.remove(entry.getKey());
+ }
+ }
+ }
+ assignmentStats.assignmentsAbandoned(completeFailures);
+ Set<Path> failedFailures = processFailures(completeFailures);
+ assignmentStats.unrecoveredMapFiles(failedFailures);
+
+ timer.stop(Timers.TOTAL);
+ printReport(paths);
+ return assignmentStats;
+ } finally {
+ if (client != null) {
+ ServerClient.close(client);
+ }
+ }
+ }
+
+ private void printReport(Set<Path> paths) {
+ long totalTime = 0;
+ for (Timers t : Timers.values()) {
+ if (t == Timers.TOTAL)
+ continue;
+
+ totalTime += timer.get(t);
+ }
+ List<String> files = new ArrayList<String>();
+ for (Path path : paths) {
+ files.add(path.getName());
+ }
+ Collections.sort(files);
+
+ log.debug("BULK IMPORT TIMING STATISTICS");
+ log.debug("Files: " + files);
+ log.debug(String.format("Examine map files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES), 100.0 * timer.get(Timers.EXAMINE_MAP_FILES)
+ / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Query %-14s : %,10.2f secs %6.2f%s", MetadataTable.NAME, timer.getSecs(Timers.QUERY_METADATA),
+ 100.0 * timer.get(Timers.QUERY_METADATA) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Import Map Files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.IMPORT_MAP_FILES), 100.0 * timer.get(Timers.IMPORT_MAP_FILES)
+ / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Sleep : %,10.2f secs %6.2f%s", timer.getSecs(Timers.SLEEP),
+ 100.0 * timer.get(Timers.SLEEP) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Misc : %,10.2f secs %6.2f%s", (timer.get(Timers.TOTAL) - totalTime) / 1000.0, 100.0
+ * (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
+ log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
+ }
+
+ private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
+ // we should check if map file was not assigned to any tablets, then we
+ // should just move it; not currently being done?
+
+ Set<Entry<Path,List<KeyExtent>>> es = completeFailures.entrySet();
+
+ if (completeFailures.size() == 0)
+ return Collections.emptySet();
+
+ log.debug("The following map files failed ");
+
+ for (Entry<Path,List<KeyExtent>> entry : es) {
+ List<KeyExtent> extents = entry.getValue();
+
+ for (KeyExtent keyExtent : extents)
+ log.debug("\t" + entry.getKey() + " -> " + keyExtent);
+ }
+
+ return Collections.emptySet();
+ }
+
+ private class AssignmentInfo {
+ public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
+ this.ke = keyExtent;
+ this.estSize = estSize;
+ }
+
+ KeyExtent ke;
+ long estSize;
+ }
+
+ private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
+ List<KeyExtent> result = new ArrayList<KeyExtent>(locations.size());
+ for (TabletLocation tl : locations)
+ result.add(tl.tablet_extent);
+ return result;
+ }
+
+ private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm,
+ Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
+
+ long t1 = System.currentTimeMillis();
+ final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
+
+ try {
+ for (Path path : paths) {
+ FileSystem fs = vm.getVolumeByPath(path).getFileSystem();
+ mapFileSizes.put(path, fs.getContentSummary(path).getLength());
+ }
+ } catch (IOException e) {
+ log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+
+ final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
+
+ for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+ if (entry.getValue().size() == 1) {
+ TabletLocation tabletLocation = entry.getValue().get(0);
+
+ // if the tablet completely contains the map file, there is no
+ // need to estimate its
+ // size
+ ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
+ continue;
+ }
+
+ Runnable estimationTask = new Runnable() {
+ @Override
+ public void run() {
+ Map<KeyExtent,Long> estimatedSizes = null;
+
+ try {
+ estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, vm);
+ } catch (IOException e) {
+ log.warn("Failed to estimate map file sizes " + e.getMessage());
+ }
+
+ if (estimatedSizes == null) {
+ // estimation failed, do a simple estimation
+ estimatedSizes = new TreeMap<KeyExtent,Long>();
+ long estSize = (long) (mapFileSizes.get(entry.getKey()) / (double) entry.getValue().size());
+ for (TabletLocation tl : entry.getValue())
+ estimatedSizes.put(tl.tablet_extent, estSize);
+ }
+
+ List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
+
+ for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
+ assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
+
+ ais.put(entry.getKey(), assignmentInfoList);
+ }
+ };
+
+ threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+
+ log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
+
+ return ais;
+ }
+
+ private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
+ Map<KeyExtent,String> result = new HashMap<KeyExtent,String>();
+ for (List<TabletLocation> entry : assignments.values()) {
+ for (TabletLocation tl : entry) {
+ result.put(tl.tablet_extent, tl.tablet_location);
+ }
+ }
+ return result;
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(AccumuloConfiguration acuConf, Instance instance, Configuration conf, Credentials credentials,
+ VolumeManager fs, String tableId, Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
+ timer.start(Timers.EXAMINE_MAP_FILES);
+ Map<Path,List<AssignmentInfo>> assignInfo = estimateSizes(acuConf, conf, fs, assignments, paths, numMapThreads);
+ timer.stop(Timers.EXAMINE_MAP_FILES);
+
+ Map<Path,List<KeyExtent>> ret;
+
+ timer.start(Timers.IMPORT_MAP_FILES);
+ ret = assignMapFiles(credentials, tableId, assignInfo, locationsOf(assignments), numThreads);
+ timer.stop(Timers.IMPORT_MAP_FILES);
+
+ return ret;
+ }
+
/**
 * Runnable that asks one tablet server to bulk load a batch of map files, recording any
 * per-tablet failures into a shared, synchronized failure map so the coordinating thread can
 * retry them. Instances are submitted to a fixed thread pool, one per tablet server.
 */
private class AssignmentTask implements Runnable {
  // shared across all tasks; a synchronized map, but compound get/put updates still
  // require holding its monitor (see handleFailures)
  final Map<Path,List<KeyExtent>> assignmentFailures;
  String location;
  Credentials credentials;
  private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;

  // NOTE(review): the tableName parameter is accepted but never stored or used — presumably
  // retained for signature compatibility; confirm before removing
  public AssignmentTask(Credentials credentials, Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location,
      Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
    this.assignmentFailures = assignmentFailures;
    this.location = location;
    this.assignmentsPerTablet = assignmentsPerTablet;
    this.credentials = credentials;
  }

  /**
   * Records every map file destined for each failed extent into the shared failure map.
   *
   * @param failures extents that could not be loaded
   * @param message reason to include in the log line
   */
  private void handleFailures(Collection<KeyExtent> failures, String message) {
    for (KeyExtent ke : failures) {
      List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
      // hold the map's monitor across the get/create/put sequence so two tasks
      // don't clobber each other's failure lists
      synchronized (assignmentFailures) {
        for (PathSize pathSize : mapFiles) {
          List<KeyExtent> existingFailures = assignmentFailures.get(pathSize.path);
          if (existingFailures == null) {
            existingFailures = new ArrayList<KeyExtent>();
            assignmentFailures.put(pathSize.path, existingFailures);
          }

          existingFailures.add(ke);
        }
      }

      log.info("Could not assign " + mapFiles.size() + " map files to tablet " + ke + " because : " + message + ". Will retry ...");
    }
  }

  @Override
  public void run() {
    // count distinct files only for the log message below
    HashSet<Path> uniqMapFiles = new HashSet<Path>();
    for (List<PathSize> mapFiles : assignmentsPerTablet.values())
      for (PathSize ps : mapFiles)
        uniqMapFiles.add(ps.path);

    log.debug("Assigning " + uniqMapFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);

    try {
      // server returns the extents it was NOT able to load
      List<KeyExtent> failures = assignMapFiles(credentials, location, assignmentsPerTablet);
      handleFailures(failures, "Not Serving Tablet");
    } catch (AccumuloException e) {
      // RPC-level failure: treat every tablet in this batch as failed so it is retried
      handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
    } catch (AccumuloSecurityException e) {
      handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
    }
  }

}
+
+ private class PathSize {
+ public PathSize(Path mapFile, long estSize) {
+ this.path = mapFile;
+ this.estSize = estSize;
+ }
+
+ Path path;
+ long estSize;
+
+ @Override
+ public String toString() {
+ return path + " " + estSize;
+ }
+ }
+
+ private Map<Path,List<KeyExtent>> assignMapFiles(Credentials credentials, String tableName, Map<Path,List<AssignmentInfo>> assignments,
+ Map<KeyExtent,String> locations, int numThreads) {
+
+ // group assignments by tablet
+ Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
+ for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
+ Path mapFile = entry.getKey();
+ List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
+
+ for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
+ List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
+ if (mapFiles == null) {
+ mapFiles = new ArrayList<PathSize>();
+ assignmentsPerTablet.put(ai.ke, mapFiles);
+ }
+
+ mapFiles.add(new PathSize(mapFile, ai.estSize));
+ }
+ }
+
+ // group assignments by tabletserver
+
+ Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
+
+ TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
+
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ KeyExtent ke = entry.getKey();
+ String location = locations.get(ke);
+
+ if (location == null) {
+ for (PathSize pathSize : entry.getValue()) {
+ synchronized (assignmentFailures) {
+ List<KeyExtent> failures = assignmentFailures.get(pathSize.path);
+ if (failures == null) {
+ failures = new ArrayList<KeyExtent>();
+ assignmentFailures.put(pathSize.path, failures);
+ }
+
+ failures.add(ke);
+ }
+ }
+
+ log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
+
+ continue;
+ }
+
+ Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
+ if (apt == null) {
+ apt = new TreeMap<KeyExtent,List<PathSize>>();
+ assignmentsPerTabletServer.put(location, apt);
+ }
+
+ apt.put(entry.getKey(), entry.getValue());
+ }
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
+
+ for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
+ String location = entry.getKey();
+ threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
+ }
+
+ threadPool.shutdown();
+
+ while (!threadPool.isTerminated()) {
+ try {
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ return assignmentFailures;
+ }
+
+ private List<KeyExtent> assignMapFiles(Credentials credentials, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet)
+ throws AccumuloException, AccumuloSecurityException {
+ try {
+ long timeInMillis = ServerConfiguration.getSystemConfiguration(instance).getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
+ TabletClientService.Iface client = ThriftUtil.getTServerClient(location, ServerConfiguration.getSystemConfiguration(instance), timeInMillis);
+ try {
+ HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>> files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
+ for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
+ HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
+ files.put(entry.getKey(), tabletFiles);
+
+ for (PathSize pathSize : entry.getValue()) {
+ org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
+ tabletFiles.put(pathSize.path.toString(), mfi);
+ }
+ }
+
+ log.debug("Asking " + location + " to bulk load " + files);
+ List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), credentials.toThrift(instance), tid, Translator.translate(files, Translators.KET),
+ setTime);
+
+ return Translator.translate(failures, Translators.TKET);
+ } finally {
+ ThriftUtil.returnClient((TServiceClient) client);
+ }
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ throw new AccumuloException(t);
+ }
+ }
+
/**
 * Convenience overload that examines the whole file: no failed extent and no row-range bounds.
 */
public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
    Credentials credentials) throws Exception {
  return findOverlappingTablets(acuConf, fs, locator, file, null, null, credentials);
}
+
+ public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager fs, TabletLocator locator, Path file,
+ KeyExtent failed, Credentials credentials) throws Exception {
+ locator.invalidateCache(failed);
+ Text start = getStartRowForExtent(failed);
+ return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow(), credentials);
+ }
+
+ protected static Text getStartRowForExtent(KeyExtent extent) {
+ Text start = extent.getPrevEndRow();
+ if (start != null) {
+ start = new Text(start);
+ // ACCUMULO-3967 We want the first possible key in this tablet, not the following row from the previous tablet
+ start.append(byte0, 0, 1);
+ }
+ return start;
+ }
+
// a single zero byte; appending it to a row yields the smallest row that sorts strictly after it
final static byte[] byte0 = {0};
+
/**
 * Determines which tablets the given map file overlaps by alternately seeking in the file and
 * consulting the tablet locator: seek to a row, locate the tablet containing that row, then
 * skip to just past that tablet's end row and repeat until the file or the requested range is
 * exhausted.
 *
 * @param startRow first row of interest, or null to start at the beginning of the file
 * @param endRow last row of interest, or null for no upper bound
 * @return locations of all tablets containing at least one key of the file within the range
 */
public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, VolumeManager vm, TabletLocator locator, Path file, Text startRow,
    Text endRow, Credentials credentials) throws Exception {
  List<TabletLocation> result = new ArrayList<TabletLocation>();
  Collection<ByteSequence> columnFamilies = Collections.emptyList();
  String filename = file.toString();
  // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
  FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
  FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
  try {
    Text row = startRow;
    if (row == null)
      row = new Text();
    while (true) {
      // log.debug(filename + " Seeking to row " + row);
      reader.seek(new Range(row, null), columnFamilies, false);
      if (!reader.hasTop()) {
        // no keys at or after this row: we have walked off the end of the file
        // log.debug(filename + " not found");
        break;
      }
      row = reader.getTopKey().getRow();
      TabletLocation tabletLocation = locator.locateTablet(credentials, row, false, true);
      // log.debug(filename + " found row " + row + " at location " + tabletLocation);
      result.add(tabletLocation);
      // jump past this tablet: its end row plus a zero byte is the first possible next row
      row = tabletLocation.tablet_extent.getEndRow();
      if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
        row = new Text(row);
        row.append(byte0, 0, byte0.length);
      } else
        break;
    }
  } finally {
    reader.close();
  }
  // log.debug(filename + " to be sent to " + result);
  return result;
}
+
+ public static class AssignmentStats {
+ private Map<KeyExtent,Integer> counts;
+ private int numUniqueMapFiles;
+ private Map<Path,List<KeyExtent>> completeFailures = null;
+ private Set<Path> failedFailures = null;
+
+ AssignmentStats(int fileCount) {
+ counts = new HashMap<KeyExtent,Integer>();
+ numUniqueMapFiles = fileCount;
+ }
+
+ void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
+ for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
+ for (TabletLocation tl : entry.getValue()) {
+
+ Integer count = getCount(tl.tablet_extent);
+
+ counts.put(tl.tablet_extent, count + 1);
+ }
+ }
+ }
+
+ void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
+ for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
+ for (KeyExtent ke : entry.getValue()) {
+
+ Integer count = getCount(ke);
+
+ counts.put(ke, count - 1);
+ }
+ }
+ }
+
+ void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
+ this.completeFailures = completeFailures;
+ }
+
+ void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
+ Integer count = getCount(parent);
+
+ counts.remove(parent);
+
+ for (KeyExtent keyExtent : children)
+ counts.put(keyExtent, count);
+ }
+
+ private Integer getCount(KeyExtent parent) {
+ Integer count = counts.get(parent);
+
+ if (count == null) {
+ count = 0;
+ }
+ return count;
+ }
+
+ void unrecoveredMapFiles(Set<Path> failedFailures) {
+ this.failedFailures = failedFailures;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ int totalAssignments = 0;
+ int tabletsImportedTo = 0;
+
+ int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
+
+ for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
+ totalAssignments += entry.getValue();
+ if (entry.getValue() > 0)
+ tabletsImportedTo++;
+
+ if (entry.getValue() < min)
+ min = entry.getValue();
+
+ if (entry.getValue() > max)
+ max = entry.getValue();
+ }
+
+ double stddev = 0;
+
+ for (Entry<KeyExtent,Integer> entry : counts.entrySet())
+ stddev += Math.pow(entry.getValue() - totalAssignments / (double) counts.size(), 2);
+
+ stddev = stddev / counts.size();
+ stddev = Math.sqrt(stddev);
+
+ Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
+ for (List<KeyExtent> ft : completeFailures.values())
+ failedTablets.addAll(ft);
+
+ sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
+ sb.append(String.format("# of map files : %,10d%n", numUniqueMapFiles));
+ sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", completeFailures.size(), completeFailures.size() * 100.0 / numUniqueMapFiles, "%"));
+ sb.append(String.format("# failed failed map files : %,10d %s%n", failedFailures.size(), failedFailures.size() > 0 ? " <-- THIS IS BAD" : ""));
+ sb.append(String.format("# of tablets : %,10d%n", counts.size()));
+ sb.append(String.format("# tablets imported to : %,10d %6.2f%s%n", tabletsImportedTo, tabletsImportedTo * 100.0 / counts.size(), "%"));
+ sb.append(String.format("# tablets with failures : %,10d %6.2f%s%n", failedTablets.size(), failedTablets.size() * 100.0 / counts.size(), "%"));
+ sb.append(String.format("min map files per tablet : %,10d%n", min));
+ sb.append(String.format("max map files per tablet : %,10d%n", max));
+ sb.append(String.format("avg map files per tablet : %,10.2f (std dev = %.2f)%n", totalAssignments / (double) counts.size(), stddev));
+ return sb.toString();
+ }
+ }
+
+}
[08/19] accumulo git commit: ACCUMULO-3985 fix MapReduceTest on newer
Hadoop 2's
Posted by el...@apache.org.
ACCUMULO-3985 fix MapReduceTest on newer Hadoop 2's
To submit a job in newer versions of Hadoop, you need
some Jersey libs on the classpath.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c18fe9e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c18fe9e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c18fe9e9
Branch: refs/heads/1.7
Commit: c18fe9e9aef96e49fcd39e9d5e91458916d08ef4
Parents: 08b805b
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:02:29 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:02:29 2015 -0400
----------------------------------------------------------------------
test/system/auto/TestUtils.py | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c18fe9e9/test/system/auto/TestUtils.py
----------------------------------------------------------------------
diff --git a/test/system/auto/TestUtils.py b/test/system/auto/TestUtils.py
index 9bb139f..a5da1f4 100644
--- a/test/system/auto/TestUtils.py
+++ b/test/system/auto/TestUtils.py
@@ -86,6 +86,7 @@ $ACCUMULO_HOME/server/target/classes/,
$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,
$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,
$HADOOP_PREFIX/share/hadoop/yarn/.*.jar,
+ $HADOOP_PREFIX/share/hadoop/yarn/lib/jersey.*.jar,
"""
[13/19] accumulo git commit: ACCUMULO-3986 Eat some common warning
messages on simple.recoverWithEmpty.RecoverWithEmptyTest
Posted by el...@apache.org.
ACCUMULO-3986 Eat some common warning messages on simple.recoverWithEmpty.RecoverWithEmptyTest
This should make the tests a little easier to run successfully.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c6aae57a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c6aae57a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c6aae57a
Branch: refs/heads/1.6
Commit: c6aae57a44eefeba05a3569d2bb08fadecd11275
Parents: c18fe9e
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:24:26 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:24:26 2015 -0400
----------------------------------------------------------------------
test/system/auto/simple/recoverWithEmpty.py | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c6aae57a/test/system/auto/simple/recoverWithEmpty.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py
index 18ac055..b5b1253 100755
--- a/test/system/auto/simple/recoverWithEmpty.py
+++ b/test/system/auto/simple/recoverWithEmpty.py
@@ -95,7 +95,13 @@ class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin):
out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n')
self.processResult(out,err,code);
out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime)
- self.failUnless(len(out) == 0)
+ lines = out.strip().split('\n')
+ # Poor man's grep -v
+ if len(lines) > 0:
+ # Avoid failing the test if we have some log4j warnings
+ if 'util.NativeCodeLoader' in lines[0]:
+ lines = lines[1:]
+ self.failUnless(len(lines) == 0, "Expected no output, saw '" + out + "'")
self.shutdown_accumulo()
def suite():
[10/19] accumulo git commit: ACCUMULO-3986 Eat some common warning
messages on simple.recoverWithEmpty.RecoverWithEmptyTest
Posted by el...@apache.org.
ACCUMULO-3986 Eat some common warning messages on simple.recoverWithEmpty.RecoverWithEmptyTest
This should make the tests a little easier to run successfully.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c6aae57a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c6aae57a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c6aae57a
Branch: refs/heads/master
Commit: c6aae57a44eefeba05a3569d2bb08fadecd11275
Parents: c18fe9e
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:24:26 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:24:26 2015 -0400
----------------------------------------------------------------------
test/system/auto/simple/recoverWithEmpty.py | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c6aae57a/test/system/auto/simple/recoverWithEmpty.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py
index 18ac055..b5b1253 100755
--- a/test/system/auto/simple/recoverWithEmpty.py
+++ b/test/system/auto/simple/recoverWithEmpty.py
@@ -95,7 +95,13 @@ class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin):
out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n')
self.processResult(out,err,code);
out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime)
- self.failUnless(len(out) == 0)
+ lines = out.strip().split('\n')
+ # Poor man's grep -v
+ if len(lines) > 0:
+ # Avoid failing the test if we have some log4j warnings
+ if 'util.NativeCodeLoader' in lines[0]:
+ lines = lines[1:]
+ self.failUnless(len(lines) == 0, "Expected no output, saw '" + out + "'")
self.shutdown_accumulo()
def suite():
[14/19] accumulo git commit: Merge branch '1.5' into 1.6
Posted by el...@apache.org.
Merge branch '1.5' into 1.6
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/07ac4c85
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/07ac4c85
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/07ac4c85
Branch: refs/heads/1.6
Commit: 07ac4c85f30e57d094df8c1b481fe02b3237d2a7
Parents: c767e1d c6aae57
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:27:36 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:27:36 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[15/19] accumulo git commit: Merge branch '1.5' into 1.6
Posted by el...@apache.org.
Merge branch '1.5' into 1.6
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/07ac4c85
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/07ac4c85
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/07ac4c85
Branch: refs/heads/1.7
Commit: 07ac4c85f30e57d094df8c1b481fe02b3237d2a7
Parents: c767e1d c6aae57
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:27:36 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:27:36 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[02/19] accumulo git commit: ACCUMULO-3979 make sure timers get
stopped in the presence of exceptions
Posted by el...@apache.org.
ACCUMULO-3979 make sure timers get stopped in the presence of exceptions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/08b805bd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/08b805bd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/08b805bd
Branch: refs/heads/master
Commit: 08b805bd9eca422a1f0423dd5e1734b4321c4b0e
Parents: 5cd2063
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Aug 28 15:14:07 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Aug 28 15:14:07 2015 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/server/client/BulkImporter.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/08b805bd/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 0f89e73..a68b9ec 100644
--- a/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -143,6 +143,7 @@ public class BulkImporter {
for (Path path : paths) {
final Path mapFile = path;
Runnable getAssignments = new Runnable() {
+ @Override
public void run() {
List<TabletLocation> tabletsToAssignMapFileTo = Collections.emptyList();
try {
@@ -207,14 +208,14 @@ public class BulkImporter {
while (keListIter.hasNext()) {
KeyExtent ke = keListIter.next();
+ timer.start(Timers.QUERY_METADATA);
try {
- timer.start(Timers.QUERY_METADATA);
tabletsToAssignMapFileTo.addAll(findOverlappingTablets(instance.getConfiguration(), fs, locator, entry.getKey(), ke, credentials));
- timer.stop(Timers.QUERY_METADATA);
keListIter.remove();
} catch (Exception ex) {
log.warn("Exception finding overlapping tablets, will retry tablet " + ke);
}
+ timer.stop(Timers.QUERY_METADATA);
}
if (tabletsToAssignMapFileTo.size() > 0)
@@ -361,6 +362,7 @@ public class BulkImporter {
}
Runnable estimationTask = new Runnable() {
+ @Override
public void run() {
Map<KeyExtent,Long> estimatedSizes = null;
@@ -466,6 +468,7 @@ public class BulkImporter {
}
}
+ @Override
public void run() {
HashSet<Path> uniqMapFiles = new HashSet<Path>();
for (List<PathSize> mapFiles : assignmentsPerTablet.values())
@@ -495,6 +498,7 @@ public class BulkImporter {
Path path;
long estSize;
+ @Override
public String toString() {
return path + " " + estSize;
}
@@ -729,6 +733,7 @@ public class BulkImporter {
this.failedFailures = failedFailures;
}
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
int totalAssignments = 0;
[09/19] accumulo git commit: ACCUMULO-3985 fix MapReduceTest on newer
Hadoop 2's
Posted by el...@apache.org.
ACCUMULO-3985 fix MapReduceTest on newer Hadoop 2's
To submit a job in newer versions of Hadoop, you need
some Jersey libs on the classpath.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c18fe9e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c18fe9e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c18fe9e9
Branch: refs/heads/master
Commit: c18fe9e9aef96e49fcd39e9d5e91458916d08ef4
Parents: 08b805b
Author: Josh Elser <el...@apache.org>
Authored: Tue Sep 1 23:02:29 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue Sep 1 23:02:29 2015 -0400
----------------------------------------------------------------------
test/system/auto/TestUtils.py | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c18fe9e9/test/system/auto/TestUtils.py
----------------------------------------------------------------------
diff --git a/test/system/auto/TestUtils.py b/test/system/auto/TestUtils.py
index 9bb139f..a5da1f4 100644
--- a/test/system/auto/TestUtils.py
+++ b/test/system/auto/TestUtils.py
@@ -86,6 +86,7 @@ $ACCUMULO_HOME/server/target/classes/,
$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,
$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,
$HADOOP_PREFIX/share/hadoop/yarn/.*.jar,
+ $HADOOP_PREFIX/share/hadoop/yarn/lib/jersey.*.jar,
"""