You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/09 03:44:25 UTC
[21/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master
branch (1.7.0-SNAPSHOT)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index c424f1a..b55cfd7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -107,12 +107,13 @@ public class Accumulo {
}
/**
- * Finds the best log4j configuration file. A generic file is used only if an
- * application-specific file is not available. An XML file is preferred over
- * a properties file, if possible.
+ * Finds the best log4j configuration file. A generic file is used only if an application-specific file is not available. An XML file is preferred over a
+ * properties file, if possible.
*
- * @param confDir directory where configuration files should reside
- * @param application application name for configuration file name
+ * @param confDir
+ * directory where configuration files should reside
+ * @param application
+ * application name for configuration file name
* @return configuration file name
*/
static String locateLogConfig(String confDir, String application) {
@@ -120,13 +121,9 @@ public class Accumulo {
if (explicitConfigFile != null) {
return explicitConfigFile;
}
- String[] configFiles = {
- String.format("%s/%s_logger.xml", confDir, application),
- String.format("%s/%s_logger.properties", confDir, application),
- String.format("%s/generic_logger.xml", confDir),
- String.format("%s/generic_logger.properties", confDir)
- };
- String defaultConfigFile = configFiles[2]; // generic_logger.xml
+ String[] configFiles = {String.format("%s/%s_logger.xml", confDir, application), String.format("%s/%s_logger.properties", confDir, application),
+ String.format("%s/generic_logger.xml", confDir), String.format("%s/generic_logger.properties", confDir)};
+ String defaultConfigFile = configFiles[2]; // generic_logger.xml
for (String f : configFiles) {
if (new File(f).exists()) {
return f;
@@ -196,7 +193,8 @@ public class Accumulo {
// Encourage users to configure TLS
final String SSL = "SSL";
- for (Property sslProtocolProperty : Arrays.asList(Property.RPC_SSL_CLIENT_PROTOCOL, Property.RPC_SSL_ENABLED_PROTOCOLS, Property.MONITOR_SSL_INCLUDE_PROTOCOLS)) {
+ for (Property sslProtocolProperty : Arrays.asList(Property.RPC_SSL_CLIENT_PROTOCOL, Property.RPC_SSL_ENABLED_PROTOCOLS,
+ Property.MONITOR_SSL_INCLUDE_PROTOCOLS)) {
String value = conf.get(sslProtocolProperty);
if (value.contains(SSL)) {
log.warn("It is recommended that " + sslProtocolProperty + " only allow TLS");
@@ -206,7 +204,9 @@ public class Accumulo {
/**
* Sanity check that the current persistent version is allowed to upgrade to the version of Accumulo running.
- * @param dataVersion the version that is persisted in the backing Volumes
+ *
+ * @param dataVersion
+ * the version that is persisted in the backing Volumes
*/
public static boolean canUpgradeFromDataVersion(final int dataVersion) {
return ServerConstants.CAN_UPGRADE.get(dataVersion);
@@ -280,7 +280,7 @@ public class Accumulo {
if (unknownHostTries > 0) {
log.warn("Unable to connect to HDFS, will retry. cause: " + exception.getCause());
/* We need to make sure our sleep period is long enough to avoid getting a cached failure of the host lookup. */
- sleep = Math.max(sleep, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException)(exception.getCause()))+1)*1000);
+ sleep = Math.max(sleep, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) (exception.getCause())) + 1) * 1000);
} else {
log.error("Unable to connect to HDFS and have exceeded max number of retries.", exception);
throw exception;
@@ -299,10 +299,9 @@ public class Accumulo {
}
/**
- * Exit loudly if there are outstanding Fate operations.
- * Since Fate serializes class names, we need to make sure there are no queued
- * transactions from a previous version before continuing an upgrade. The status of the operations is
- * irrelevant; those in SUCCESSFUL status cause the same problem as those just queued.
+ * Exit loudly if there are outstanding Fate operations. Since Fate serializes class names, we need to make sure there are no queued transactions from a
+ * previous version before continuing an upgrade. The status of the operations is irrelevant; those in SUCCESSFUL status cause the same problem as those just
+ * queued.
*
* Note that the Master should not allow write access to Fate until after all upgrade steps are complete.
*
@@ -312,10 +311,11 @@ public class Accumulo {
*/
public static void abortIfFateTransactions() {
try {
- final ReadOnlyTStore<Accumulo> fate = new ReadOnlyStore<Accumulo>(new ZooStore<Accumulo>(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZFATE,
- ZooReaderWriter.getInstance()));
+ final ReadOnlyTStore<Accumulo> fate = new ReadOnlyStore<Accumulo>(new ZooStore<Accumulo>(
+ ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZFATE, ZooReaderWriter.getInstance()));
if (!(fate.list().isEmpty())) {
- throw new AccumuloException("Aborting upgrade because there are outstanding FATE transactions from a previous Accumulo version. Please see the README document for instructions on what to do under your previous version.");
+ throw new AccumuloException(
+ "Aborting upgrade because there are outstanding FATE transactions from a previous Accumulo version. Please see the README document for instructions on what to do under your previous version.");
}
} catch (Exception exception) {
log.fatal("Problem verifying Fate readiness", exception);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java b/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
index 95fee8f..bbe0dd2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerOpts.java
@@ -21,12 +21,12 @@ import org.apache.accumulo.core.cli.Help;
import com.beust.jcommander.Parameter;
public class ServerOpts extends Help {
- @Parameter(names={"-a", "--address"}, description = "address to bind to")
+ @Parameter(names = {"-a", "--address"}, description = "address to bind to")
String address = null;
-
+
public String getAddress() {
if (address != null)
return address;
return "0.0.0.0";
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
index 588c35c..c347994 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnDefaultTable.java
@@ -30,7 +30,7 @@ public class ClientOnDefaultTable extends org.apache.accumulo.core.cli.ClientOnD
synchronized public Instance getInstance() {
if (cachedInstance != null)
return cachedInstance;
-
+
if (mock)
return cachedInstance = new MockInstance(instance);
if (instance == null) {
@@ -38,6 +38,7 @@ public class ClientOnDefaultTable extends org.apache.accumulo.core.cli.ClientOnD
}
return cachedInstance = new ZooKeeperInstance(this.getClientConfiguration());
}
+
public ClientOnDefaultTable(String table) {
super(table);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
index f2e04e4..38926be 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOnRequiredTable.java
@@ -25,12 +25,12 @@ public class ClientOnRequiredTable extends org.apache.accumulo.core.cli.ClientOn
{
principal = "root";
}
-
+
@Override
synchronized public Instance getInstance() {
if (cachedInstance != null)
return cachedInstance;
-
+
if (mock)
return cachedInstance = new MockInstance(instance);
if (instance == null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
index c19b7b0..0a7714d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/cli/ClientOpts.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.server.client.HdfsZooInstance;
public class ClientOpts extends org.apache.accumulo.core.cli.ClientOpts {
-
+
{
principal = "root";
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 8171555..01d03ed 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -87,34 +87,34 @@ public class BulkImporter {
}
return result;
}
-
+
private StopWatch<Timers> timer;
-
+
private static enum Timers {
EXAMINE_MAP_FILES, QUERY_METADATA, IMPORT_MAP_FILES, SLEEP, TOTAL
}
-
+
private final ClientContext context;
private String tableId;
private long tid;
private boolean setTime;
-
+
public BulkImporter(ClientContext context, long tid, String tableId, boolean setTime) {
this.context = context;
this.tid = tid;
this.tableId = tableId;
this.setTime = setTime;
}
-
+
public AssignmentStats importFiles(List<String> files, Path failureDir) throws IOException, AccumuloException, AccumuloSecurityException,
ThriftTableOperationException {
-
+
int numThreads = context.getConfiguration().getCount(Property.TSERV_BULK_PROCESS_THREADS);
int numAssignThreads = context.getConfiguration().getCount(Property.TSERV_BULK_ASSIGNMENT_THREADS);
-
+
timer = new StopWatch<Timers>(Timers.class);
timer.start(Timers.TOTAL);
-
+
Configuration conf = CachedConfiguration.getInstance();
VolumeManagerImpl.get(context.getConfiguration());
final VolumeManager fs = VolumeManagerImpl.get(context.getConfiguration());
@@ -124,18 +124,18 @@ public class BulkImporter {
paths.add(new Path(file));
}
AssignmentStats assignmentStats = new AssignmentStats(paths.size());
-
+
final Map<Path,List<KeyExtent>> completeFailures = Collections.synchronizedSortedMap(new TreeMap<Path,List<KeyExtent>>());
-
+
ClientService.Client client = null;
final TabletLocator locator = TabletLocator.getLocator(context, new Text(tableId));
-
+
try {
final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
-
+
timer.start(Timers.EXAMINE_MAP_FILES);
ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
-
+
for (Path path : paths) {
final Path mapFile = path;
Runnable getAssignments = new Runnable() {
@@ -166,16 +166,16 @@ public class BulkImporter {
}
}
timer.stop(Timers.EXAMINE_MAP_FILES);
-
+
assignmentStats.attemptingAssignments(assignments);
Map<Path,List<KeyExtent>> assignmentFailures = assignMapFiles(context, conf, fs, tableId, assignments, paths, numAssignThreads, numThreads);
assignmentStats.assignmentsFailed(assignmentFailures);
-
+
Map<Path,Integer> failureCount = new TreeMap<Path,Integer>();
-
+
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet())
failureCount.put(entry.getKey(), 1);
-
+
long sleepTime = 2 * 1000;
while (assignmentFailures.size() > 0) {
sleepTime = Math.min(sleepTime * 2, 60 * 1000);
@@ -185,24 +185,24 @@ public class BulkImporter {
//
// for splits we need to find children key extents that cover the
// same key range and are contiguous (no holes, no overlap)
-
+
timer.start(Timers.SLEEP);
UtilWaitThread.sleep(sleepTime);
timer.stop(Timers.SLEEP);
-
+
log.debug("Trying to assign " + assignmentFailures.size() + " map files that previously failed on some key extents");
assignments.clear();
-
+
// for failed key extents, try to find children key extents to
// assign to
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
Iterator<KeyExtent> keListIter = entry.getValue().iterator();
-
+
List<TabletLocation> tabletsToAssignMapFileTo = new ArrayList<TabletLocation>();
-
+
while (keListIter.hasNext()) {
KeyExtent ke = keListIter.next();
-
+
try {
timer.start(Timers.QUERY_METADATA);
tabletsToAssignMapFileTo.addAll(findOverlappingTablets(context, fs, locator, entry.getKey(), ke));
@@ -212,26 +212,26 @@ public class BulkImporter {
log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex);
}
}
-
+
if (tabletsToAssignMapFileTo.size() > 0)
assignments.put(entry.getKey(), tabletsToAssignMapFileTo);
}
-
+
assignmentStats.attemptingAssignments(assignments);
Map<Path,List<KeyExtent>> assignmentFailures2 = assignMapFiles(context, conf, fs, tableId, assignments, paths, numAssignThreads, numThreads);
assignmentStats.assignmentsFailed(assignmentFailures2);
-
+
// merge assignmentFailures2 into assignmentFailures
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures2.entrySet()) {
assignmentFailures.get(entry.getKey()).addAll(entry.getValue());
-
+
Integer fc = failureCount.get(entry.getKey());
if (fc == null)
fc = 0;
-
+
failureCount.put(entry.getKey(), fc + 1);
}
-
+
// remove map files that have no more key extents to assign
Iterator<Entry<Path,List<KeyExtent>>> afIter = assignmentFailures.entrySet().iterator();
while (afIter.hasNext()) {
@@ -239,7 +239,7 @@ public class BulkImporter {
if (entry.getValue().size() == 0)
afIter.remove();
}
-
+
Set<Entry<Path,Integer>> failureIter = failureCount.entrySet();
for (Entry<Path,Integer> entry : failureIter) {
int retries = context.getConfiguration().getCount(Property.TSERV_BULK_RETRY);
@@ -253,7 +253,7 @@ public class BulkImporter {
assignmentStats.assignmentsAbandoned(completeFailures);
Set<Path> failedFailures = processFailures(completeFailures);
assignmentStats.unrecoveredMapFiles(failedFailures);
-
+
timer.stop(Timers.TOTAL);
printReport(paths);
return assignmentStats;
@@ -263,13 +263,13 @@ public class BulkImporter {
locator.invalidateCache();
}
}
-
+
private void printReport(Set<Path> paths) {
long totalTime = 0;
for (Timers t : Timers.values()) {
if (t == Timers.TOTAL)
continue;
-
+
totalTime += timer.get(t);
}
List<String> files = new ArrayList<String>();
@@ -277,7 +277,7 @@ public class BulkImporter {
files.add(path.getName());
}
Collections.sort(files);
-
+
log.debug("BULK IMPORT TIMING STATISTICS");
log.debug("Files: " + files);
log.debug(String.format("Examine map files : %,10.2f secs %6.2f%s", timer.getSecs(Timers.EXAMINE_MAP_FILES), 100.0 * timer.get(Timers.EXAMINE_MAP_FILES)
@@ -292,51 +292,51 @@ public class BulkImporter {
* (timer.get(Timers.TOTAL) - totalTime) / timer.get(Timers.TOTAL), "%"));
log.debug(String.format("Total : %,10.2f secs", timer.getSecs(Timers.TOTAL)));
}
-
+
private Set<Path> processFailures(Map<Path,List<KeyExtent>> completeFailures) {
// we should check if map file was not assigned to any tablets, then we
// should just move it; not currently being done?
-
+
Set<Entry<Path,List<KeyExtent>>> es = completeFailures.entrySet();
-
+
if (completeFailures.size() == 0)
return Collections.emptySet();
-
+
log.debug("The following map files failed ");
-
+
for (Entry<Path,List<KeyExtent>> entry : es) {
List<KeyExtent> extents = entry.getValue();
-
+
for (KeyExtent keyExtent : extents)
log.debug("\t" + entry.getKey() + " -> " + keyExtent);
}
-
+
return Collections.emptySet();
}
-
+
private class AssignmentInfo {
public AssignmentInfo(KeyExtent keyExtent, Long estSize) {
this.ke = keyExtent;
this.estSize = estSize;
}
-
+
KeyExtent ke;
long estSize;
}
-
+
private static List<KeyExtent> extentsOf(List<TabletLocation> locations) {
List<KeyExtent> result = new ArrayList<KeyExtent>(locations.size());
for (TabletLocation tl : locations)
result.add(tl.tablet_extent);
return result;
}
-
+
private Map<Path,List<AssignmentInfo>> estimateSizes(final AccumuloConfiguration acuConf, final Configuration conf, final VolumeManager vm,
Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads) {
-
+
long t1 = System.currentTimeMillis();
final Map<Path,Long> mapFileSizes = new TreeMap<Path,Long>();
-
+
try {
for (Path path : paths) {
FileSystem fs = vm.getVolumeByPath(path).getFileSystem();
@@ -346,33 +346,33 @@ public class BulkImporter {
log.error("Failed to get map files in for " + paths + ": " + e.getMessage(), e);
throw new RuntimeException(e);
}
-
+
final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
-
+
ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
-
+
for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
if (entry.getValue().size() == 1) {
TabletLocation tabletLocation = entry.getValue().get(0);
-
+
// if the tablet completely contains the map file, there is no
// need to estimate its
// size
ais.put(entry.getKey(), Collections.singletonList(new AssignmentInfo(tabletLocation.tablet_extent, mapFileSizes.get(entry.getKey()))));
continue;
}
-
+
Runnable estimationTask = new Runnable() {
@Override
public void run() {
Map<KeyExtent,Long> estimatedSizes = null;
-
+
try {
estimatedSizes = FileUtil.estimateSizes(acuConf, entry.getKey(), mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), conf, vm);
} catch (IOException e) {
log.warn("Failed to estimate map file sizes " + e.getMessage());
}
-
+
if (estimatedSizes == null) {
// estimation failed, do a simple estimation
estimatedSizes = new TreeMap<KeyExtent,Long>();
@@ -380,21 +380,21 @@ public class BulkImporter {
for (TabletLocation tl : entry.getValue())
estimatedSizes.put(tl.tablet_extent, estSize);
}
-
+
List<AssignmentInfo> assignmentInfoList = new ArrayList<AssignmentInfo>(estimatedSizes.size());
-
+
for (Entry<KeyExtent,Long> entry2 : estimatedSizes.entrySet())
assignmentInfoList.add(new AssignmentInfo(entry2.getKey(), entry2.getValue()));
-
+
ais.put(entry.getKey(), assignmentInfoList);
}
};
-
+
threadPool.submit(new TraceRunnable(new LoggingRunnable(log, estimationTask)));
}
-
+
threadPool.shutdown();
-
+
while (!threadPool.isTerminated()) {
try {
threadPool.awaitTermination(60, TimeUnit.SECONDS);
@@ -403,14 +403,14 @@ public class BulkImporter {
throw new RuntimeException(e);
}
}
-
+
long t2 = System.currentTimeMillis();
-
+
log.debug(String.format("Estimated map files sizes in %6.2f secs", (t2 - t1) / 1000.0));
-
+
return ais;
}
-
+
private static Map<KeyExtent,String> locationsOf(Map<Path,List<TabletLocation>> assignments) {
Map<KeyExtent,String> result = new HashMap<KeyExtent,String>();
for (List<TabletLocation> entry : assignments.values()) {
@@ -420,33 +420,33 @@ public class BulkImporter {
}
return result;
}
-
+
private Map<Path,List<KeyExtent>> assignMapFiles(ClientContext context, Configuration conf, VolumeManager fs, String tableId,
Map<Path,List<TabletLocation>> assignments, Collection<Path> paths, int numThreads, int numMapThreads) {
timer.start(Timers.EXAMINE_MAP_FILES);
Map<Path,List<AssignmentInfo>> assignInfo = estimateSizes(context.getConfiguration(), conf, fs, assignments, paths, numMapThreads);
timer.stop(Timers.EXAMINE_MAP_FILES);
-
+
Map<Path,List<KeyExtent>> ret;
-
+
timer.start(Timers.IMPORT_MAP_FILES);
ret = assignMapFiles(tableId, assignInfo, locationsOf(assignments), numThreads);
timer.stop(Timers.IMPORT_MAP_FILES);
-
+
return ret;
}
-
+
private class AssignmentTask implements Runnable {
final Map<Path,List<KeyExtent>> assignmentFailures;
HostAndPort location;
private Map<KeyExtent,List<PathSize>> assignmentsPerTablet;
-
+
public AssignmentTask(Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) {
this.assignmentFailures = assignmentFailures;
this.location = HostAndPort.fromString(location);
this.assignmentsPerTablet = assignmentsPerTablet;
}
-
+
private void handleFailures(Collection<KeyExtent> failures, String message) {
for (KeyExtent ke : failures) {
List<PathSize> mapFiles = assignmentsPerTablet.get(ke);
@@ -457,24 +457,24 @@ public class BulkImporter {
existingFailures = new ArrayList<KeyExtent>();
assignmentFailures.put(pathSize.path, existingFailures);
}
-
+
existingFailures.add(ke);
}
}
-
+
log.info("Could not assign " + mapFiles.size() + " map files to tablet " + ke + " because : " + message + ". Will retry ...");
}
}
-
+
@Override
public void run() {
HashSet<Path> uniqMapFiles = new HashSet<Path>();
for (List<PathSize> mapFiles : assignmentsPerTablet.values())
for (PathSize ps : mapFiles)
uniqMapFiles.add(ps.path);
-
+
log.debug("Assigning " + uniqMapFiles.size() + " map files to " + assignmentsPerTablet.size() + " tablets at " + location);
-
+
try {
List<KeyExtent> failures = assignMapFiles(context, location, assignmentsPerTablet);
handleFailures(failures, "Not Serving Tablet");
@@ -484,53 +484,53 @@ public class BulkImporter {
handleFailures(assignmentsPerTablet.keySet(), e.getMessage());
}
}
-
+
}
-
+
private class PathSize {
public PathSize(Path mapFile, long estSize) {
this.path = mapFile;
this.estSize = estSize;
}
-
+
Path path;
long estSize;
-
+
@Override
public String toString() {
return path + " " + estSize;
}
}
-
+
private Map<Path,List<KeyExtent>> assignMapFiles(String tableName, Map<Path,List<AssignmentInfo>> assignments, Map<KeyExtent,String> locations, int numThreads) {
-
+
// group assignments by tablet
Map<KeyExtent,List<PathSize>> assignmentsPerTablet = new TreeMap<KeyExtent,List<PathSize>>();
for (Entry<Path,List<AssignmentInfo>> entry : assignments.entrySet()) {
Path mapFile = entry.getKey();
List<AssignmentInfo> tabletsToAssignMapFileTo = entry.getValue();
-
+
for (AssignmentInfo ai : tabletsToAssignMapFileTo) {
List<PathSize> mapFiles = assignmentsPerTablet.get(ai.ke);
if (mapFiles == null) {
mapFiles = new ArrayList<PathSize>();
assignmentsPerTablet.put(ai.ke, mapFiles);
}
-
+
mapFiles.add(new PathSize(mapFile, ai.estSize));
}
}
-
+
// group assignments by tabletserver
-
+
Map<Path,List<KeyExtent>> assignmentFailures = Collections.synchronizedMap(new TreeMap<Path,List<KeyExtent>>());
-
+
TreeMap<String,Map<KeyExtent,List<PathSize>>> assignmentsPerTabletServer = new TreeMap<String,Map<KeyExtent,List<PathSize>>>();
-
+
for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
KeyExtent ke = entry.getKey();
String location = locations.get(ke);
-
+
if (location == null) {
for (PathSize pathSize : entry.getValue()) {
synchronized (assignmentFailures) {
@@ -539,34 +539,34 @@ public class BulkImporter {
failures = new ArrayList<KeyExtent>();
assignmentFailures.put(pathSize.path, failures);
}
-
+
failures.add(ke);
}
}
-
+
log.warn("Could not assign " + entry.getValue().size() + " map files to tablet " + ke + " because it had no location, will retry ...");
-
+
continue;
}
-
+
Map<KeyExtent,List<PathSize>> apt = assignmentsPerTabletServer.get(location);
if (apt == null) {
apt = new TreeMap<KeyExtent,List<PathSize>>();
assignmentsPerTabletServer.put(location, apt);
}
-
+
apt.put(entry.getKey(), entry.getValue());
}
-
+
ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
-
+
for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
String location = entry.getKey();
threadPool.submit(new AssignmentTask(assignmentFailures, tableName, location, entry.getValue()));
}
-
+
threadPool.shutdown();
-
+
while (!threadPool.isTerminated()) {
try {
threadPool.awaitTermination(60, TimeUnit.SECONDS);
@@ -575,7 +575,7 @@ public class BulkImporter {
throw new RuntimeException(e);
}
}
-
+
return assignmentFailures;
}
@@ -589,16 +589,16 @@ public class BulkImporter {
for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet()) {
HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo> tabletFiles = new HashMap<String,org.apache.accumulo.core.data.thrift.MapFileInfo>();
files.put(entry.getKey(), tabletFiles);
-
+
for (PathSize pathSize : entry.getValue()) {
org.apache.accumulo.core.data.thrift.MapFileInfo mfi = new org.apache.accumulo.core.data.thrift.MapFileInfo(pathSize.estSize);
tabletFiles.put(pathSize.path.toString(), mfi);
}
}
-
+
log.debug("Asking " + location + " to bulk load " + files);
List<TKeyExtent> failures = client.bulkImport(Tracer.traceInfo(), context.rpcCreds(), tid, Translator.translate(files, Translators.KET), setTime);
-
+
return Translator.translate(failures, Translators.TKET);
} finally {
ThriftUtil.returnClient((TServiceClient) client);
@@ -610,11 +610,11 @@ public class BulkImporter {
throw new AccumuloException(t);
}
}
-
+
public static List<TabletLocation> findOverlappingTablets(ClientContext context, VolumeManager fs, TabletLocator locator, Path file) throws Exception {
return findOverlappingTablets(context, fs, locator, file, null, null);
}
-
+
public static List<TabletLocation> findOverlappingTablets(ClientContext context, VolumeManager fs, TabletLocator locator, Path file, KeyExtent failed)
throws Exception {
locator.invalidateCache(failed);
@@ -623,9 +623,9 @@ public class BulkImporter {
start = Range.followingPrefix(start);
return findOverlappingTablets(context, fs, locator, file, start, failed.getEndRow());
}
-
+
final static byte[] byte0 = {0};
-
+
public static List<TabletLocation> findOverlappingTablets(ClientContext context, VolumeManager vm, TabletLocator locator, Path file, Text startRow,
Text endRow) throws Exception {
List<TabletLocation> result = new ArrayList<TabletLocation>();
@@ -662,98 +662,98 @@ public class BulkImporter {
// log.debug(filename + " to be sent to " + result);
return result;
}
-
+
public static class AssignmentStats {
private Map<KeyExtent,Integer> counts;
private int numUniqueMapFiles;
private Map<Path,List<KeyExtent>> completeFailures = null;
private Set<Path> failedFailures = null;
-
+
AssignmentStats(int fileCount) {
counts = new HashMap<KeyExtent,Integer>();
numUniqueMapFiles = fileCount;
}
-
+
void attemptingAssignments(Map<Path,List<TabletLocation>> assignments) {
for (Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
for (TabletLocation tl : entry.getValue()) {
-
+
Integer count = getCount(tl.tablet_extent);
-
+
counts.put(tl.tablet_extent, count + 1);
}
}
}
-
+
void assignmentsFailed(Map<Path,List<KeyExtent>> assignmentFailures) {
for (Entry<Path,List<KeyExtent>> entry : assignmentFailures.entrySet()) {
for (KeyExtent ke : entry.getValue()) {
-
+
Integer count = getCount(ke);
-
+
counts.put(ke, count - 1);
}
}
}
-
+
void assignmentsAbandoned(Map<Path,List<KeyExtent>> completeFailures) {
this.completeFailures = completeFailures;
}
-
+
void tabletSplit(KeyExtent parent, Collection<KeyExtent> children) {
Integer count = getCount(parent);
-
+
counts.remove(parent);
-
+
for (KeyExtent keyExtent : children)
counts.put(keyExtent, count);
}
-
+
private Integer getCount(KeyExtent parent) {
Integer count = counts.get(parent);
-
+
if (count == null) {
count = 0;
}
return count;
}
-
+
void unrecoveredMapFiles(Set<Path> failedFailures) {
this.failedFailures = failedFailures;
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
int totalAssignments = 0;
int tabletsImportedTo = 0;
-
+
int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
-
+
for (Entry<KeyExtent,Integer> entry : counts.entrySet()) {
totalAssignments += entry.getValue();
if (entry.getValue() > 0)
tabletsImportedTo++;
-
+
if (entry.getValue() < min)
min = entry.getValue();
-
+
if (entry.getValue() > max)
max = entry.getValue();
}
-
+
double stddev = 0;
-
+
for (Entry<KeyExtent,Integer> entry : counts.entrySet())
stddev += Math.pow(entry.getValue() - totalAssignments / (double) counts.size(), 2);
-
+
stddev = stddev / counts.size();
stddev = Math.sqrt(stddev);
-
+
Set<KeyExtent> failedTablets = new HashSet<KeyExtent>();
for (List<KeyExtent> ft : completeFailures.values())
failedTablets.addAll(ft);
-
+
sb.append("BULK IMPORT ASSIGNMENT STATISTICS\n");
sb.append(String.format("# of map files : %,10d%n", numUniqueMapFiles));
sb.append(String.format("# map files with failures : %,10d %6.2f%s%n", completeFailures.size(), completeFailures.size() * 100.0 / numUniqueMapFiles, "%"));
@@ -767,5 +767,5 @@ public class BulkImporter {
return sb.toString();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index 4ab9f90..3175fff 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -58,7 +58,7 @@ import com.google.common.base.Joiner;
/**
* An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location.
- *
+ *
*/
public class HdfsZooInstance implements Instance {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
index b90051f..658d249 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.server.conf;
import org.apache.accumulo.server.client.HdfsZooInstance;
public class ConfigSanityCheck {
-
+
public static void main(String[] args) {
new ServerConfigurationFactory(HdfsZooInstance.getInstance()).getConfiguration();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
index 65d9388..945e904 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
@@ -48,9 +48,8 @@ class NamespaceConfWatcher implements Watcher {
}
static String toString(WatchedEvent event) {
- return new StringBuilder("{path=").append(event.getPath()).append(",state=")
- .append(event.getState()).append(",type=").append(event.getType())
- .append("}").toString();
+ return new StringBuilder("{path=").append(event.getPath()).append(",state=").append(event.getState()).append(",type=").append(event.getType()).append("}")
+ .toString();
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
index 342aebe..f2b2042 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -21,7 +21,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.KeyExtent;
public abstract class ServerConfiguration {
-
+
abstract public TableConfiguration getTableConfiguration(String tableId);
abstract public TableConfiguration getTableConfiguration(KeyExtent extent);
@@ -31,5 +31,5 @@ public abstract class ServerConfiguration {
abstract public AccumuloConfiguration getConfiguration();
abstract public Instance getInstance();
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 128f74e..2ec9ba1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -154,14 +154,14 @@ public class ServerConfigurationFactory extends ServerConfiguration {
synchronized (tableConfigs) {
conf = tableConfigs.get(instanceID).get(tableId);
}
- // can't hold the lock during the construction and validation of the config,
+ // can't hold the lock during the construction and validation of the config,
// which may result in creating multiple objects for the same id, but that's ok.
if (conf == null && Tables.exists(instance, tableId)) {
- conf = new TableConfiguration(instance, tableId, getNamespaceConfigurationForTable(tableId));
- ConfigSanityCheck.validate(conf);
- synchronized (tableConfigs) {
- tableConfigs.get(instanceID).put(tableId, conf);
- }
+ conf = new TableConfiguration(instance, tableId, getNamespaceConfigurationForTable(tableId));
+ ConfigSanityCheck.validate(conf);
+ synchronized (tableConfigs) {
+ tableConfigs.get(instanceID).put(tableId, conf);
+ }
}
return conf;
}
@@ -177,7 +177,7 @@ public class ServerConfigurationFactory extends ServerConfiguration {
synchronized (tableParentConfigs) {
conf = tableParentConfigs.get(instanceID).get(tableId);
}
- // can't hold the lock during the construction and validation of the config,
+ // can't hold the lock during the construction and validation of the config,
// which may result in creating multiple objects for the same id, but that's ok.
if (conf == null) {
// changed - include instance in constructor call
@@ -194,7 +194,7 @@ public class ServerConfigurationFactory extends ServerConfiguration {
public NamespaceConfiguration getNamespaceConfiguration(String namespaceId) {
checkPermissions();
NamespaceConfiguration conf;
- // can't hold the lock during the construction and validation of the config,
+ // can't hold the lock during the construction and validation of the config,
// which may result in creating multiple objects for the same id, but that's ok.
synchronized (namespaceConfigs) {
conf = namespaceConfigs.get(instanceID).get(namespaceId);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
index b657056..3c8d45d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
@@ -29,7 +29,7 @@ class TableConfWatcher implements Watcher {
Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
Logger.getLogger("org.apache.hadoop.io.compress").setLevel(Level.WARN);
}
-
+
private static final Logger log = Logger.getLogger(TableConfWatcher.class);
private final Instance instance;
private final String tablesPrefix;
@@ -46,9 +46,8 @@ class TableConfWatcher implements Watcher {
}
static String toString(WatchedEvent event) {
- return new StringBuilder("{path=").append(event.getPath()).append(",state=")
- .append(event.getState()).append(",type=").append(event.getType())
- .append("}").toString();
+ return new StringBuilder("{path=").append(event.getPath()).append(",state=").append(event.getState()).append(",type=").append(event.getType()).append("}")
+ .toString();
}
@Override
@@ -56,10 +55,10 @@ class TableConfWatcher implements Watcher {
String path = event.getPath();
if (log.isTraceEnabled())
log.trace("WatchedEvent : " + toString(event));
-
+
String tableId = null;
String key = null;
-
+
if (path != null) {
if (path.startsWith(tablesPrefix)) {
tableId = path.substring(tablesPrefix.length());
@@ -69,13 +68,13 @@ class TableConfWatcher implements Watcher {
key = path.substring((tablesPrefix + tableId + Constants.ZTABLE_CONF + "/").length());
}
}
-
+
if (tableId == null) {
log.warn("Zookeeper told me about a path I was not watching: " + path + ", event " + toString(event));
return;
}
}
-
+
switch (event.getType()) {
case NodeDataChanged:
if (log.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/conf/TableParentConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableParentConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableParentConfiguration.java
index 2a2bfce..bd2e5ab 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableParentConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableParentConfiguration.java
@@ -32,6 +32,7 @@ public class TableParentConfiguration extends NamespaceConfiguration {
this.tableId = tableId;
this.namespaceId = getNamespaceId();
}
+
public TableParentConfiguration(String tableId, Instance inst, AccumuloConfiguration parent) {
super(null, inst, parent);
this.tableId = tableId;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
index af992a6..f891065 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.server.data;
import org.apache.accumulo.core.data.ColumnUpdate;
public class ServerColumnUpdate extends ColumnUpdate {
-
+
ServerMutation parent;
public ServerColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val, ServerMutation serverMutation) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
index 389cc33..cb4fa97 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.WritableUtils;
*/
public class ServerMutation extends Mutation {
private long systemTime = 0l;
-
+
public ServerMutation(TMutation tmutation) {
super(tmutation);
}
@@ -40,8 +40,7 @@ public class ServerMutation extends Mutation {
super(key);
}
- public ServerMutation() {
- }
+ public ServerMutation() {}
protected void droppingOldTimestamp(long ts) {
this.systemTime = ts;
@@ -54,7 +53,7 @@ public class ServerMutation extends Mutation {
if (getSerializedFormat() == SERIALIZED_FORMAT.VERSION2)
systemTime = WritableUtils.readVLong(in);
}
-
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@@ -64,7 +63,7 @@ public class ServerMutation extends Mutation {
public void setSystemTimestamp(long v) {
this.systemTime = v;
}
-
+
public long getSystemTimestamp() {
return this.systemTime;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
index c0bb275..eb42a11 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
@@ -21,40 +21,37 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-
/**
- * This is a glue object, to convert short file references to long references.
- * The metadata may contain old relative file references. This class keeps
- * track of the short file reference, so it can be removed properly from the
- * metadata tables.
+ * This is a glue object, to convert short file references to long references. The metadata may contain old relative file references. This class keeps track of
+ * the short file reference, so it can be removed properly from the metadata tables.
*/
public class FileRef implements Comparable<FileRef> {
private String metaReference; // something like ../2/d-00000/A00001.rf
private Path fullReference; // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
private Path suffix;
-
+
public FileRef(VolumeManager fs, Key key) {
this(key.getColumnQualifier().toString(), fs.getFullPath(key));
}
-
+
public FileRef(String metaReference, Path fullReference) {
this.metaReference = metaReference;
this.fullReference = fullReference;
this.suffix = extractSuffix(fullReference);
}
-
+
public FileRef(String path) {
this(path, new Path(path));
}
-
+
public String toString() {
return fullReference.toString();
}
-
+
public Path path() {
return fullReference;
}
-
+
public Text meta() {
return new Text(metaReference);
}
@@ -89,10 +86,9 @@ public class FileRef implements Comparable<FileRef> {
@Override
public boolean equals(Object obj) {
if (obj instanceof FileRef) {
- return compareTo((FileRef)obj) == 0;
+ return compareTo((FileRef) obj) == 0;
}
return false;
}
-
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java
index a579cc8..e51df03 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/PerTableVolumeChooser.java
@@ -24,15 +24,15 @@ import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.conf.TableConfiguration;
/**
- * A {@link VolumeChooser} that delegates to another volume chooser based on the presence of an experimental table
- * property, {@link Property#TABLE_VOLUME_CHOOSER}. If it isn't found, defaults back to {@link RandomVolumeChooser}.
+ * A {@link VolumeChooser} that delegates to another volume chooser based on the presence of an experimental table property,
+ * {@link Property#TABLE_VOLUME_CHOOSER}. If it isn't found, defaults back to {@link RandomVolumeChooser}.
*/
public class PerTableVolumeChooser implements VolumeChooser {
private final VolumeChooser fallbackVolumeChooser = new RandomVolumeChooser();
// TODO Add hint of expected size to construction, see ACCUMULO-3410
/* Track VolumeChooser instances so they can keep state. */
- private final ConcurrentHashMap<String, VolumeChooser> tableSpecificChooser = new ConcurrentHashMap<String, VolumeChooser>();
+ private final ConcurrentHashMap<String,VolumeChooser> tableSpecificChooser = new ConcurrentHashMap<String,VolumeChooser>();
// TODO has to be lazily initialized currently because of the reliance on HdfsZooInstance. see ACCUMULO-3411
private volatile ServerConfigurationFactory serverConfs;
@@ -60,7 +60,7 @@ public class PerTableVolumeChooser implements VolumeChooser {
// the configuration for this table's chooser has been updated. In the case of failure to instantiate we'll repeat here next call.
// TODO stricter definition of when the updated property is used, ref ACCUMULO-3412
VolumeChooser temp = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_VOLUME_CHOOSER, VolumeChooser.class, fallbackVolumeChooser);
- VolumeChooser last = tableSpecificChooser.replace(env.getTableId(), temp);
+ VolumeChooser last = tableSpecificChooser.replace(env.getTableId(), temp);
if (chooser.equals(last)) {
chooser = temp;
} else {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java
index 4ddf9bb..68621fb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/PreferredVolumeChooser.java
@@ -37,10 +37,9 @@ import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
/**
- * A {@link RandomVolumeChooser} that limits its choices from a given set of options to the subset of those options preferred for a
- * particular table. Defaults to selecting from all of the options presented. Can be customized via the table property
- * {@value #PREFERRED_VOLUMES_CUSTOM_KEY}, which should contain a comma separated list of {@link Volume} URIs. Note that both the property
- * name and the format of its value are specific to this particular implementation.
+ * A {@link RandomVolumeChooser} that limits its choices from a given set of options to the subset of those options preferred for a particular table. Defaults
+ * to selecting from all of the options presented. Can be customized via the table property {@value #PREFERRED_VOLUMES_CUSTOM_KEY}, which should contain a comma
+ * separated list of {@link Volume} URIs. Note that both the property name and the format of its value are specific to this particular implementation.
*/
public class PreferredVolumeChooser extends RandomVolumeChooser implements VolumeChooser {
private static final Logger log = Logger.getLogger(PreferredVolumeChooser.class);
@@ -55,7 +54,7 @@ public class PreferredVolumeChooser extends RandomVolumeChooser implements Volum
};
@SuppressWarnings("unchecked")
- private final Map<String, Set<String>> parsedPreferredVolumes = Collections.synchronizedMap(new LRUMap(1000));
+ private final Map<String,Set<String>> parsedPreferredVolumes = Collections.synchronizedMap(new LRUMap(1000));
// TODO has to be lazily initialized currently because of the reliance on HdfsZooInstance. see ACCUMULO-3411
private volatile ServerConfigurationFactory serverConfs;
@@ -73,7 +72,7 @@ public class PreferredVolumeChooser extends RandomVolumeChooser implements Volum
serverConfs = localConf;
}
TableConfiguration tableConf = localConf.getTableConfiguration(env.getTableId());
- final Map<String,String> props = new HashMap<String, String>();
+ final Map<String,String> props = new HashMap<String,String>();
tableConf.getProperties(props, PREFERRED_VOLUMES_FILTER);
if (props.isEmpty()) {
log.warn("No preferred volumes specified. Defaulting to randomly choosing from instance volumes");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
index 34912f3..73535d9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/ViewFSUtils.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
- *
+ *
*/
public class ViewFSUtils {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
index 9865512..8b70721 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.server.fs;
import org.apache.accumulo.core.volume.Volume;
/**
- * Helper used by {@link VolumeManager}s to select from a set of {@link Volume} URIs.
- * N.B. implemenations must be threadsafe. VolumeChooser.equals will be used for internal caching.
+ * Helper used by {@link VolumeManager}s to select from a set of {@link Volume} URIs. N.B. implemenations must be threadsafe. VolumeChooser.equals will be used
+ * for internal caching.
*/
public interface VolumeChooser {
String choose(VolumeChooserEnvironment env, String[] options);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
index e2353d4..e761e4f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -32,8 +32,7 @@ import com.google.common.base.Optional;
/**
* A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes. This also concentrates a bunch of meta-operations like
- * waiting for SAFE_MODE, and closing WALs.
- * N.B. implementations must be thread safe.
+ * waiting for SAFE_MODE, and closing WALs. N.B. implementations must be thread safe.
*/
public interface VolumeManager {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 8202d27..4423495 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -582,8 +582,8 @@ public class VolumeManagerImpl implements VolumeManager {
final VolumeChooserEnvironment env = new VolumeChooserEnvironment(tableId);
final String choice = chooser.choose(env, options);
if (!(ArrayUtils.contains(options, choice))) {
- log.error("The configured volume chooser, '" + chooser.getClass() + "', or one of its delegates returned a volume not in the set of options provided; " +
- "will continue by relying on a RandomVolumeChooser. You should investigate and correct the named chooser.");
+ log.error("The configured volume chooser, '" + chooser.getClass() + "', or one of its delegates returned a volume not in the set of options provided; "
+ + "will continue by relying on a RandomVolumeChooser. You should investigate and correct the named chooser.");
return failsafeChooser.choose(env, options);
}
return choice;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index e229209..d40106d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -265,8 +265,8 @@ public class VolumeUtil {
throw new IllegalArgumentException("Unexpected table dir " + dir);
}
- Path newDir = new Path(vm.choose(Optional.of(extent.getTableId().toString()), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR +
- Path.SEPARATOR + dir.getParent().getName() + Path.SEPARATOR + dir.getName());
+ Path newDir = new Path(vm.choose(Optional.of(extent.getTableId().toString()), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.TABLE_DIR
+ + Path.SEPARATOR + dir.getParent().getName() + Path.SEPARATOR + dir.getName());
log.info("Updating directory for " + extent + " from " + dir + " to " + newDir);
if (extent.isRootTablet()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 046cfb5..e14ef72 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -89,11 +89,11 @@ import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
import org.apache.accumulo.server.replication.StatusCombiner;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.server.util.TablePropUtil;
-import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
index 536c617..c689fd3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
@@ -34,23 +34,23 @@ import org.apache.log4j.Logger;
/**
* A special iterator for the metadata table that removes inactive bulk load flags
- *
+ *
*/
public class MetadataBulkLoadFilter extends Filter {
private static final Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
-
+
enum Status {
ACTIVE, INACTIVE
}
-
+
Map<Long,Status> bulkTxStatusCache;
Arbitrator arbitrator;
-
+
@Override
public boolean accept(Key k, Value v) {
if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
long txid = Long.valueOf(v.toString());
-
+
Status status = bulkTxStatusCache.get(txid);
if (status == null) {
try {
@@ -63,28 +63,28 @@ public class MetadataBulkLoadFilter extends Filter {
status = Status.ACTIVE;
log.error(e, e);
}
-
+
bulkTxStatusCache.put(txid, status);
}
-
+
return status == Status.ACTIVE;
}
-
+
return true;
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
super.init(source, options, env);
-
+
if (env.getIteratorScope() == IteratorScope.scan) {
throw new IOException("This iterator not intended for use at scan time");
}
-
+
bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
arbitrator = getArbitrator();
}
-
+
protected Arbitrator getArbitrator() {
return new ZooArbitrator();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
index 49d744e..c0580ac 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/SortedLogState.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.server.log;
import org.apache.hadoop.fs.Path;
/**
- * A file is written in the destination directory for the sorting of write-ahead logs that need recovering. The value of {@link #getMarker()} is the name of the file
- * that will exist in the sorted output directory.
+ * A file is written in the destination directory for the sorting of write-ahead logs that need recovering. The value of {@link #getMarker()} is the name of the
+ * file that will exist in the sorted output directory.
*/
public enum SortedLogState {
FINISHED("finished"), FAILED("failed");
@@ -54,7 +54,7 @@ public enum SortedLogState {
public static Path getFailedMarkerPath(String rootPath) {
return new Path(rootPath, FAILED.getMarker());
}
-
+
public static Path getFailedMarkerPath(Path rootPath) {
return new Path(rootPath, FAILED.getMarker());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
index 1820d8e..7d11066 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@ -45,9 +45,9 @@ import org.apache.thrift.TException;
*/
public class ChaoticLoadBalancer extends TabletBalancer {
private static final Logger log = Logger.getLogger(ChaoticLoadBalancer.class);
-
+
Random r = new Random();
-
+
@Override
public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
Map<KeyExtent,TServerInstance> assignments) {
@@ -65,7 +65,7 @@ public class ChaoticLoadBalancer extends TabletBalancer {
toAssign.put(e.getKey(), avg - numTablets);
}
}
-
+
for (KeyExtent ke : unassigned.keySet()) {
int index = r.nextInt(tServerArray.size());
TServerInstance dest = tServerArray.get(index);
@@ -79,7 +79,7 @@ public class ChaoticLoadBalancer extends TabletBalancer {
}
}
}
-
+
protected final OutstandingMigrations outstandingMigrations = new OutstandingMigrations(log);
/**
@@ -111,7 +111,7 @@ public class ChaoticLoadBalancer extends TabletBalancer {
// totalTablets is fuzzy due to asynchronicity of the stats
// *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios
long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
-
+
for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
for (String table : e.getValue().getTableMap().keySet()) {
if (!moveMetadata && MetadataTable.NAME.equals(table))
@@ -128,7 +128,7 @@ public class ChaoticLoadBalancer extends TabletBalancer {
underCapacityTServer.remove(index);
if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey()))
underCapacityTServer.add(e.getKey());
-
+
// We can get some craziness with only 1 tserver, so lets make sure there's always an option!
if (underCapacityTServer.isEmpty())
underCapacityTServer.addAll(numTablets.keySet());
@@ -142,17 +142,16 @@ public class ChaoticLoadBalancer extends TabletBalancer {
}
}
}
-
+
return 100;
}
-
+
@Override
public void init(ServerConfiguration conf) {
throw new NotImplementedException();
}
@Override
- public void init(ServerConfigurationFactory conf) {
- }
-
+ public void init(ServerConfigurationFactory conf) {}
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index 9822d0f..ecf59b3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -43,11 +43,11 @@ import org.apache.thrift.transport.TTransportException;
import com.google.common.collect.Iterables;
public abstract class TabletBalancer {
-
+
private static final Logger log = Logger.getLogger(TabletBalancer.class);
-
+
protected ServerConfigurationFactory configuration;
-
+
protected AccumuloServerContext context;
/**
@@ -64,7 +64,7 @@ public abstract class TabletBalancer {
/**
* Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
- *
+ *
* @param current
* The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet
* server has not yet responded to a recent request for status.
@@ -75,14 +75,14 @@ public abstract class TabletBalancer {
*/
abstract public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
Map<KeyExtent,TServerInstance> assignments);
-
+
/**
* Ask the balancer if any migrations are necessary.
- *
+ *
* If the balancer is going to self-abort due to some environmental constraint (e.g. it requires some minimum number of tservers, or a maximum number of
* outstanding migrations), it should issue a log message to alert operators. The message should be at WARN normally and at ERROR if the balancer knows that
* the problem can not self correct. It should not issue these messages more than once a minute.
- *
+ *
* @param current
* The current table-summary state of all the online tablet servers. Read-only.
* @param migrations
@@ -90,7 +90,7 @@ public abstract class TabletBalancer {
* @param migrationsOut
* new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
* @return the time, in milliseconds, to wait before re-balancing.
- *
+ *
* This method will not be called when there are unassigned tablets.
*/
public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
@@ -102,25 +102,24 @@ public abstract class TabletBalancer {
protected static final long TIME_BETWEEN_BALANCER_WARNINGS = 60 * ONE_SECOND;
/**
- * A deferred call descendent TabletBalancers use to log why they can't continue.
- * The call is deferred so that TabletBalancer can limit how often messages happen.
+ * A deferred call descendent TabletBalancers use to log why they can't continue. The call is deferred so that TabletBalancer can limit how often messages
+ * happen.
*
* Implementations should be reused as much as possible.
*
- * Be sure to pass in a properly scoped Logger instance so that messages indicate
- * what part of the system is having trouble.
+ * Be sure to pass in a properly scoped Logger instance so that messages indicate what part of the system is having trouble.
*/
protected static abstract class BalancerProblem implements Runnable {
protected final Logger balancerLog;
+
public BalancerProblem(Logger logger) {
balancerLog = logger;
}
}
/**
- * If a TabletBalancer requires active tservers, it should use this problem to indicate when there are none.
- * NoTservers is safe to share with anyone who uses the same Logger. TabletBalancers should have a single
- * static instance.
+ * If a TabletBalancer requires active tservers, it should use this problem to indicate when there are none. NoTservers is safe to share with anyone who uses
+ * the same Logger. TabletBalancers should have a single static instance.
*/
protected static class NoTservers extends BalancerProblem {
public NoTservers(Logger logger) {
@@ -134,14 +133,12 @@ public abstract class TabletBalancer {
}
/**
- * If a TabletBalancer only balances when there are no outstanding migrations, it should use this problem
- * to indicate when they exist.
+ * If a TabletBalancer only balances when there are no outstanding migrations, it should use this problem to indicate when they exist.
*
- * Iff a TabletBalancer makes use of the migrations member to provide samples, then OutstandingMigrations
- * is not thread safe.
+ * Iff a TabletBalancer makes use of the migrations member to provide samples, then OutstandingMigrations is not thread safe.
*/
protected static class OutstandingMigrations extends BalancerProblem {
- public Set<KeyExtent> migrations = Collections.<KeyExtent>emptySet();
+ public Set<KeyExtent> migrations = Collections.<KeyExtent> emptySet();
public OutstandingMigrations(Logger logger) {
super(logger);
@@ -156,8 +153,8 @@ public abstract class TabletBalancer {
}
/**
- * Warn that a Balancer can't work because of some external restriction.
- * Will not call the provided logging handler more often than TIME_BETWEEN_BALANCER_WARNINGS
+ * Warn that a Balancer can't work because of some external restriction. Will not call the provided logging handler more often than
+ * TIME_BETWEEN_BALANCER_WARNINGS
*/
protected void constraintNotMet(BalancerProblem cause) {
if (!stuck) {
@@ -177,11 +174,11 @@ public abstract class TabletBalancer {
protected void resetBalancerErrors() {
stuck = false;
}
-
+
/**
* Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets
* to move.
- *
+ *
* @param tserver
* The tablet server to ask.
* @param tableId
@@ -204,14 +201,14 @@ public abstract class TabletBalancer {
}
return null;
}
-
+
/**
* Utility to ensure that the migrations from balance() are consistent:
* <ul>
* <li>Tablet objects are not null
* <li>Source and destination tablet servers are not null and current
* </ul>
- *
+ *
* @return A list of TabletMigration object that passed sanity checks.
*/
public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
@@ -241,5 +238,5 @@ public abstract class TabletBalancer {
}
return result;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
index 4a6638a..cc9ac3e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
@@ -23,7 +23,7 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
/**
- *
+ *
*/
public class RecoveryPath {
@@ -39,21 +39,21 @@ public class RecoveryPath {
// drop server
walPath = walPath.getParent();
}
-
+
if (!walPath.getName().equals(FileType.WAL.getDirectory()))
throw new IllegalArgumentException("Bad path " + walPath);
-
+
// drop wal
walPath = walPath.getParent();
-
+
walPath = new Path(walPath, FileType.RECOVERY.getDirectory());
walPath = new Path(walPath, uuid);
return walPath;
}
-
+
throw new IllegalArgumentException("Bad path " + walPath);
-
+
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
index 40b7a93..f1a9b3f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/Assignment.java
@@ -21,7 +21,7 @@ import org.apache.accumulo.core.data.KeyExtent;
public class Assignment {
public KeyExtent tablet;
public TServerInstance server;
-
+
public Assignment(KeyExtent tablet, TServerInstance server) {
this.tablet = tablet;
this.server = server;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/ClosableIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ClosableIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ClosableIterator.java
index 18af9ed..3e77d93 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ClosableIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ClosableIterator.java
@@ -19,5 +19,4 @@ package org.apache.accumulo.server.master.state;
import java.io.Closeable;
import java.util.Iterator;
-public interface ClosableIterator<T> extends Iterator<T>, Closeable {
-}
+public interface ClosableIterator<T> extends Iterator<T>, Closeable {}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
index f4d98bf..501d66a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
@@ -20,11 +20,11 @@ import java.util.Collection;
import java.util.Set;
public interface CurrentState {
-
+
Set<String> onlineTables();
-
+
Set<TServerInstance> onlineTabletServers();
-
+
Collection<MergeInfo> merges();
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
index 3f7f167..9a2441b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java
@@ -27,13 +27,13 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
public class DeadServerList {
private static final Logger log = Logger.getLogger(DeadServerList.class);
private final String path;
-
+
public DeadServerList(String path) {
this.path = path;
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
@@ -43,7 +43,7 @@ public class DeadServerList {
log.error("Unable to make parent directories of " + path, ex);
}
}
-
+
public List<DeadServer> getList() {
List<DeadServer> result = new ArrayList<DeadServer>();
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
@@ -70,7 +70,7 @@ public class DeadServerList {
}
return result;
}
-
+
public void delete(String server) {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
@@ -79,7 +79,7 @@ public class DeadServerList {
log.error(ex, ex);
}
}
-
+
public void post(String server, String cause) {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
index 10a1311..3276945 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStore.java
@@ -22,13 +22,13 @@ import java.util.List;
* An abstract version of ZooKeeper that we can write tests against.
*/
public interface DistributedStore {
-
+
List<String> getChildren(String path) throws DistributedStoreException;
-
+
byte[] get(String path) throws DistributedStoreException;
-
+
void put(String path, byte[] bs) throws DistributedStoreException;
-
+
void remove(String path) throws DistributedStoreException;
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
index 3d3a725..3290075 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java
@@ -17,17 +17,17 @@
package org.apache.accumulo.server.master.state;
public class DistributedStoreException extends Exception {
-
+
private static final long serialVersionUID = 1L;
-
+
public DistributedStoreException(String why) {
super(why);
}
-
+
public DistributedStoreException(Exception cause) {
super(cause);
}
-
+
public DistributedStoreException(String why, Exception cause) {
super(why, cause);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
index 708b1b7..388da05 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
@@ -25,21 +25,21 @@ import org.apache.hadoop.io.Writable;
/**
* Information about the current merge/rangeDelete.
- *
+ *
* Writable to serialize for zookeeper and the Tablet
*/
public class MergeInfo implements Writable {
-
+
public enum Operation {
MERGE, DELETE,
}
-
+
MergeState state = MergeState.NONE;
KeyExtent extent;
Operation operation = Operation.MERGE;
-
+
public MergeInfo() {}
-
+
@Override
public void readFields(DataInput in) throws IOException {
extent = new KeyExtent();
@@ -47,39 +47,39 @@ public class MergeInfo implements Writable {
state = MergeState.values()[in.readInt()];
operation = Operation.values()[in.readInt()];
}
-
+
@Override
public void write(DataOutput out) throws IOException {
extent.write(out);
out.writeInt(state.ordinal());
out.writeInt(operation.ordinal());
}
-
+
public MergeInfo(KeyExtent range, Operation op) {
this.extent = range;
this.operation = op;
}
-
+
public MergeState getState() {
return state;
}
-
+
public KeyExtent getExtent() {
return extent;
}
-
+
public Operation getOperation() {
return operation;
}
-
+
public void setState(MergeState state) {
this.state = state;
}
-
+
public boolean isDelete() {
return this.operation.equals(Operation.DELETE);
}
-
+
public boolean needsToBeChopped(KeyExtent otherExtent) {
// During a delete, the block after the merge will be stretched to cover the deleted area.
// Therefore, it needs to be chopped
@@ -90,14 +90,14 @@ public class MergeInfo implements Writable {
else
return this.extent.overlaps(otherExtent);
}
-
+
public boolean overlaps(KeyExtent otherExtent) {
boolean result = this.extent.overlaps(otherExtent);
if (!result && needsToBeChopped(otherExtent))
return true;
return result;
}
-
+
@Override
public String toString() {
if (!state.equals(MergeState.NONE))
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
index 29b6ae3..47bfd95 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MergeState.java
@@ -45,5 +45,5 @@ public enum MergeState {
* merge is complete, the resulting tablet can be brought online, remove the marker in zookeeper
*/
COMPLETE;
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index d57a3ef..bf56a7a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -31,38 +31,38 @@ import org.apache.accumulo.server.AccumuloServerContext;
public class MetaDataStateStore extends TabletStateStore {
// private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
-
+
private static final int THREADS = 4;
private static final int LATENCY = 1000;
private static final int MAX_MEMORY = 200 * 1024 * 1024;
-
+
final protected ClientContext context;
final protected CurrentState state;
final private String targetTableName;
-
+
protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) {
this.context = context;
this.state = state;
this.targetTableName = targetTableName;
}
-
+
public MetaDataStateStore(ClientContext context, CurrentState state) {
this(context, state, MetadataTable.NAME);
}
-
+
protected MetaDataStateStore(AccumuloServerContext context, String tableName) {
this(context, null, tableName);
}
-
+
public MetaDataStateStore(AccumuloServerContext context) {
this(context, MetadataTable.NAME);
}
-
+
@Override
public ClosableIterator<TabletLocationState> iterator() {
return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state);
}
-
+
@Override
public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
BatchWriter writer = createBatchWriter();
@@ -83,7 +83,7 @@ public class MetaDataStateStore extends TabletStateStore {
}
}
}
-
+
BatchWriter createBatchWriter() {
try {
return context.getConnector().createBatchWriter(targetTableName,
@@ -95,7 +95,7 @@ public class MetaDataStateStore extends TabletStateStore {
throw new RuntimeException(e);
}
}
-
+
@Override
public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
BatchWriter writer = createBatchWriter();
@@ -115,10 +115,10 @@ public class MetaDataStateStore extends TabletStateStore {
}
}
}
-
+
@Override
public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
-
+
BatchWriter writer = createBatchWriter();
try {
for (TabletLocationState tls : tablets) {
@@ -141,7 +141,7 @@ public class MetaDataStateStore extends TabletStateStore {
}
}
}
-
+
@Override
public String name() {
return "Normal Tablets";