You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2013/06/24 23:34:25 UTC
svn commit: r1496226 [6/13] - in /accumulo/branches/ACCUMULO-CURATOR: ./
assemble/ conf/examples/1GB/native-standalone/
conf/examples/1GB/standalone/ conf/examples/2GB/native-standalone/
conf/examples/2GB/standalone/ conf/examples/3GB/native-standalone...
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Mon Jun 24 21:34:20 2013
@@ -34,11 +34,10 @@ import org.apache.accumulo.core.conf.Pro
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -65,42 +64,41 @@ public class RecoveryManager {
log.warn(e, e);
}
}
-
+
private class LogSortTask implements Runnable {
- private String filename;
- private String host;
+ private String source;
+ private String destination;
+ private String sortId;
private LogCloser closer;
- public LogSortTask(LogCloser closer, String host, String filename) {
+ public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
this.closer = closer;
- this.host = host;
- this.filename = filename;
+ this.source = source;
+ this.destination = destination;
+ this.sortId = sortId;
}
-
+
@Override
public void run() {
boolean rescheduled = false;
try {
- FileSystem localFs = master.getFileSystem();
- if (localFs instanceof TraceFileSystem)
- localFs = ((TraceFileSystem) localFs).getImplementation();
-
- long time = closer.close(master, localFs, getSource(host, filename));
-
+
+ long time = closer.close(master, master.getFileSystem(), new Path(source));
+
if (time > 0) {
executor.schedule(this, time, TimeUnit.MILLISECONDS);
rescheduled = true;
} else {
- initiateSort(host, filename);
+ initiateSort(sortId, source, destination);
}
} catch (FileNotFoundException e) {
- log.debug("Unable to initate log sort for " + filename + ": " + e);
+ log.debug("Unable to initate log sort for " + source + ": " + e);
} catch (Exception e) {
- log.warn("Failed to initiate log sort " + filename, e);
+ log.warn("Failed to initiate log sort " + source, e);
} finally {
if (!rescheduled) {
synchronized (RecoveryManager.this) {
- closeTasksQueued.remove(filename);
+ closeTasksQueued.remove(sortId);
}
}
}
@@ -108,73 +106,68 @@ public class RecoveryManager {
}
- private void initiateSort(String host, final String file) throws KeeperException, InterruptedException {
- String source = getSource(host, file).toString();
- new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+ private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
+ String work = source + "|" + destination;
+ new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
synchronized (this) {
- sortsQueued.add(file);
+ sortsQueued.add(sortId);
}
-
- final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
- log.info("Created zookeeper entry " + path + " with data " + source);
+
+ final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
+ log.info("Created zookeeper entry " + path + " with data " + work);
}
- private Path getSource(String server, String file) {
- String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file;
- if (server.contains(":")) {
- // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
- source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file;
- }
- return new Path(source);
- }
-
public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
boolean recoveryNeeded = false;
+ ;
for (Collection<String> logs : walogs) {
for (String walog : logs) {
- String parts[] = walog.split("/");
- String host = parts[0];
- String filename = parts[1];
+ String hostFilename[] = walog.split("/", 2);
+ String host = hostFilename[0];
+ String filename = hostFilename[1];
+ String parts[] = filename.split("/");
+ String sortId = parts[parts.length - 1];
+ String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
+ log.debug("Recovering " + filename + " to " + dest);
boolean sortQueued;
synchronized (this) {
- sortQueued = sortsQueued.contains(filename);
+ sortQueued = sortsQueued.contains(sortId);
}
- if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) {
+ if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
synchronized (this) {
- sortsQueued.remove(filename);
+ sortsQueued.remove(sortId);
}
}
- if (master.getFileSystem().exists(new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + filename + "/finished"))) {
+ if (master.getFileSystem().exists(new Path(dest, "finished"))) {
synchronized (this) {
- closeTasksQueued.remove(filename);
- recoveryDelay.remove(filename);
- sortsQueued.remove(filename);
+ closeTasksQueued.remove(sortId);
+ recoveryDelay.remove(sortId);
+ sortsQueued.remove(sortId);
}
continue;
}
recoveryNeeded = true;
synchronized (this) {
- if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) {
+ if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
- LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class,
- new HadoopLogCloser());
- Long delay = recoveryDelay.get(filename);
+ LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
+ Long delay = recoveryDelay.get(sortId);
if (delay == null) {
delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
} else {
delay = Math.min(2 * delay, 1000 * 60 * 5l);
}
-
+
log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
-
- executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS);
- closeTasksQueued.add(filename);
- recoveryDelay.put(filename, delay);
+
+ executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+ closeTasksQueued.add(sortId);
+ recoveryDelay.put(sortId, delay);
}
}
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeInfo.java Mon Jun 24 21:34:20 2013
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.Writable;
*
* Writable to serialize for zookeeper and the Tablet
*/
-
public class MergeInfo implements Writable {
public enum Operation {
@@ -36,28 +35,28 @@ public class MergeInfo implements Writab
}
MergeState state = MergeState.NONE;
- KeyExtent range;
+ KeyExtent extent;
Operation operation = Operation.MERGE;
public MergeInfo() {}
@Override
public void readFields(DataInput in) throws IOException {
- range = new KeyExtent();
- range.readFields(in);
+ extent = new KeyExtent();
+ extent.readFields(in);
state = MergeState.values()[in.readInt()];
operation = Operation.values()[in.readInt()];
}
@Override
public void write(DataOutput out) throws IOException {
- range.write(out);
+ extent.write(out);
out.writeInt(state.ordinal());
out.writeInt(operation.ordinal());
}
public MergeInfo(KeyExtent range, Operation op) {
- this.range = range;
+ this.extent = range;
this.operation = op;
}
@@ -65,8 +64,8 @@ public class MergeInfo implements Writab
return state;
}
- public KeyExtent getRange() {
- return range;
+ public KeyExtent getExtent() {
+ return extent;
}
public Operation getOperation() {
@@ -81,27 +80,28 @@ public class MergeInfo implements Writab
return this.operation.equals(Operation.DELETE);
}
- public boolean needsToBeChopped(KeyExtent extent) {
+ public boolean needsToBeChopped(KeyExtent otherExtent) {
// During a delete, the block after the merge will be stretched to cover the deleted area.
// Therefore, it needs to be chopped
- if (!extent.getTableId().equals(range.getTableId()))
+ if (!otherExtent.getTableId().equals(extent.getTableId()))
return false;
if (isDelete())
- return extent.getPrevEndRow() != null && extent.getPrevEndRow().equals(range.getEndRow());
+ return otherExtent.getPrevEndRow() != null && otherExtent.getPrevEndRow().equals(extent.getEndRow());
else
- return this.range.overlaps(extent);
+ return this.extent.overlaps(otherExtent);
}
- public boolean overlaps(KeyExtent extent) {
- boolean result = this.range.overlaps(extent);
- if (!result && needsToBeChopped(extent))
+ public boolean overlaps(KeyExtent otherExtent) {
+ boolean result = this.extent.overlaps(otherExtent);
+ if (!result && needsToBeChopped(otherExtent))
return true;
return result;
}
+ @Override
public String toString() {
if (!state.equals(MergeState.NONE))
- return "Merge " + operation.toString() + " of " + range + " State: " + state;
+ return "Merge " + operation.toString() + " of " + extent + " State: " + state;
return "No Merge in progress";
}
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java Mon Jun 24 21:34:20 2013
@@ -26,9 +26,11 @@ import org.apache.accumulo.core.client.S
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
@@ -53,9 +55,9 @@ public class MergeStats {
this.info = info;
if (info.getState().equals(MergeState.NONE))
return;
- if (info.getRange().getEndRow() == null)
+ if (info.getExtent().getEndRow() == null)
upperSplit = true;
- if (info.getRange().getPrevEndRow() == null)
+ if (info.getExtent().getPrevEndRow() == null)
lowerSplit = true;
}
@@ -68,11 +70,11 @@ public class MergeStats {
return;
if (info.getState().equals(MergeState.NONE))
return;
- if (!upperSplit && info.getRange().getEndRow().equals(ke.getPrevEndRow())) {
+ if (!upperSplit && info.getExtent().getEndRow().equals(ke.getPrevEndRow())) {
log.info("Upper split found");
upperSplit = true;
}
- if (!lowerSplit && info.getRange().getPrevEndRow().equals(ke.getEndRow())) {
+ if (!lowerSplit && info.getExtent().getPrevEndRow().equals(ke.getEndRow())) {
log.info("Lower split found");
lowerSplit = true;
}
@@ -100,79 +102,79 @@ public class MergeStats {
if (state == MergeState.NONE)
return state;
if (total == 0) {
- log.trace("failed to see any tablets for this range, ignoring " + info.getRange());
+ log.trace("failed to see any tablets for this range, ignoring " + info.getExtent());
return state;
}
- log.info("Computing next merge state for " + info.getRange() + " which is presently " + state + " isDelete : " + info.isDelete());
+ log.info("Computing next merge state for " + info.getExtent() + " which is presently " + state + " isDelete : " + info.isDelete());
if (state == MergeState.STARTED) {
state = MergeState.SPLITTING;
}
if (state == MergeState.SPLITTING) {
log.info(hosted + " are hosted, total " + total);
if (!info.isDelete() && total == 1) {
- log.info("Merge range is already contained in a single tablet " + info.getRange());
+ log.info("Merge range is already contained in a single tablet " + info.getExtent());
state = MergeState.COMPLETE;
} else if (hosted == total) {
if (info.isDelete()) {
if (!lowerSplit)
- log.info("Waiting for " + info + " lower split to occur " + info.getRange());
+ log.info("Waiting for " + info + " lower split to occur " + info.getExtent());
else if (!upperSplit)
- log.info("Waiting for " + info + " upper split to occur " + info.getRange());
+ log.info("Waiting for " + info + " upper split to occur " + info.getExtent());
else
state = MergeState.WAITING_FOR_CHOPPED;
} else {
state = MergeState.WAITING_FOR_CHOPPED;
}
} else {
- log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getRange());
+ log.info("Waiting for " + hosted + " hosted tablets to be " + total + " " + info.getExtent());
}
}
if (state == MergeState.WAITING_FOR_CHOPPED) {
- log.info(chopped + " tablets are chopped " + info.getRange());
+ log.info(chopped + " tablets are chopped " + info.getExtent());
if (chopped == needsToBeChopped) {
state = MergeState.WAITING_FOR_OFFLINE;
} else {
- log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getRange());
+ log.info("Waiting for " + chopped + " chopped tablets to be " + needsToBeChopped + " " + info.getExtent());
}
}
if (state == MergeState.WAITING_FOR_OFFLINE) {
if (chopped != needsToBeChopped) {
- log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getRange());
+ log.warn("Unexpected state: chopped tablets should be " + needsToBeChopped + " was " + chopped + " merge " + info.getExtent());
// Perhaps a split occurred after we chopped, but before we went offline: start over
state = MergeState.WAITING_FOR_CHOPPED;
} else {
- log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getRange());
+ log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getExtent());
if (unassigned == total && chopped == needsToBeChopped) {
if (verifyMergeConsistency(connector, master))
state = MergeState.MERGING;
else
- log.info("Merge consistency check failed " + info.getRange());
+ log.info("Merge consistency check failed " + info.getExtent());
} else {
- log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getRange());
+ log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getExtent());
}
}
}
if (state == MergeState.MERGING) {
if (hosted != 0) {
// Shouldn't happen
- log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getRange());
+ log.error("Unexpected state: hosted tablets should be zero " + hosted + " merge " + info.getExtent());
state = MergeState.WAITING_FOR_OFFLINE;
}
if (unassigned != total) {
// Shouldn't happen
- log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getRange());
+ log.error("Unexpected state: unassigned tablets should be " + total + " was " + unassigned + " merge " + info.getExtent());
state = MergeState.WAITING_FOR_CHOPPED;
}
- log.info(unassigned + " tablets are unassigned " + info.getRange());
+ log.info(unassigned + " tablets are unassigned " + info.getExtent());
}
return state;
}
private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException {
MergeStats verify = new MergeStats(info);
- Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ KeyExtent extent = info.getExtent();
+ Scanner scanner = connector.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
MetaDataTableScanner.configureScanner(scanner, master);
- KeyExtent extent = info.getRange();
Text start = extent.getPrevEndRow();
if (start == null) {
start = new Text();
@@ -180,10 +182,6 @@ public class MergeStats {
Text tableId = extent.getTableId();
Text first = KeyExtent.getMetadataEntry(tableId, start);
Range range = new Range(first, false, null, true);
- if (extent.isMeta()) {
- // don't go off the root tablet
- range = new Range(new Key(first).followingKey(PartialKey.ROW), false, Constants.METADATA_ROOT_TABLET_KEYSPACE.getEndKey(), false);
- }
scanner.setRange(range);
KeyExtent prevExtent = null;
@@ -219,7 +217,7 @@ public class MergeStats {
}
} else if (!tls.extent.isPreviousExtent(prevExtent)) {
- log.debug("hole in " + Constants.METADATA_TABLE_NAME);
+ log.debug("hole in " + MetadataTable.NAME);
return false;
}
@@ -252,7 +250,7 @@ public class MergeStats {
in.reset(data, data.length);
info.readFields(in);
}
- System.out.println(String.format("%25s %10s %10s %s", table, info.state, info.operation, info.range));
+ System.out.println(String.format("%25s %10s %10s %s", table, info.state, info.operation, info.extent));
}
}
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java Mon Jun 24 21:34:20 2013
@@ -20,7 +20,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Instance;
@@ -29,6 +28,8 @@ import org.apache.accumulo.core.client.T
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.hadoop.io.Text;
@@ -43,20 +44,30 @@ public class MetaDataStateStore extends
final protected Instance instance;
final protected CurrentState state;
final protected TCredentials auths;
+ final private String targetTableName;
- public MetaDataStateStore(Instance instance, TCredentials auths, CurrentState state) {
+ protected MetaDataStateStore(Instance instance, TCredentials auths, CurrentState state, String targetTableName) {
this.instance = instance;
this.state = state;
this.auths = auths;
+ this.targetTableName = targetTableName;
+ }
+
+ public MetaDataStateStore(Instance instance, TCredentials auths, CurrentState state) {
+ this(instance, auths, state, MetadataTable.NAME);
+ }
+
+ protected MetaDataStateStore(String tableName) {
+ this(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), null, tableName);
}
public MetaDataStateStore() {
- this(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), null);
+ this(MetadataTable.NAME);
}
-
+
@Override
public Iterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(instance, auths, Constants.NON_ROOT_METADATA_KEYSPACE, state);
+ return new MetaDataTableScanner(instance, auths, RootTable.METADATA_TABLETS_RANGE, state);
}
@Override
@@ -66,8 +77,8 @@ public class MetaDataStateStore extends
for (Assignment assignment : assignments) {
Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
Text cq = assignment.server.asColumnQualifier();
- m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, cq, assignment.server.asMutationValue());
- m.putDelete(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, cq);
+ m.put(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY, cq, assignment.server.asMutationValue());
+ m.putDelete(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY, cq);
writer.addMutation(m);
}
} catch (Exception ex) {
@@ -83,7 +94,7 @@ public class MetaDataStateStore extends
BatchWriter createBatchWriter() {
try {
- return instance.getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths)).createBatchWriter(Constants.METADATA_TABLE_NAME,
+ return instance.getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths)).createBatchWriter(targetTableName,
new BatchWriterConfig().setMaxMemory(MAX_MEMORY).setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS));
} catch (TableNotFoundException e) {
// ya, I don't think so
@@ -99,7 +110,7 @@ public class MetaDataStateStore extends
try {
for (Assignment assignment : assignments) {
Mutation m = new Mutation(assignment.tablet.getMetadataEntry());
- m.put(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, assignment.server.asColumnQualifier(), assignment.server.asMutationValue());
+ m.put(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY, assignment.server.asColumnQualifier(), assignment.server.asMutationValue());
writer.addMutation(m);
}
} catch (Exception ex) {
@@ -121,10 +132,10 @@ public class MetaDataStateStore extends
for (TabletLocationState tls : tablets) {
Mutation m = new Mutation(tls.extent.getMetadataEntry());
if (tls.current != null) {
- m.putDelete(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, tls.current.asColumnQualifier());
+ m.putDelete(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY, tls.current.asColumnQualifier());
}
if (tls.future != null) {
- m.putDelete(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, tls.future.asColumnQualifier());
+ m.putDelete(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY, tls.future.asColumnQualifier());
}
writer.addMutation(m);
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java Mon Jun 24 21:34:20 2013
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
@@ -37,8 +36,10 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.MetadataTable;
import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -50,10 +51,14 @@ public class MetaDataTableScanner implem
Iterator<Entry<Key,Value>> iter;
public MetaDataTableScanner(Instance instance, TCredentials auths, Range range, CurrentState state) {
+ this(instance, auths, range, state, MetadataTable.NAME);
+ }
+
+ MetaDataTableScanner(Instance instance, TCredentials auths, Range range, CurrentState state, String tableName) {
// scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables
try {
Connector connector = instance.getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths));
- mdScanner = connector.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 8);
+ mdScanner = connector.createBatchScanner(tableName, Authorizations.EMPTY, 8);
configureScanner(mdScanner, state);
mdScanner.setRanges(Collections.singletonList(range));
iter = mdScanner.iterator();
@@ -64,13 +69,13 @@ public class MetaDataTableScanner implem
throw new RuntimeException(ex);
}
}
-
+
static public void configureScanner(ScannerBase scanner, CurrentState state) {
- Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
- scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
- scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);
- scanner.fetchColumnFamily(Constants.METADATA_CHOPPED_COLUMN_FAMILY);
+ MetadataTable.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(MetadataTable.LOG_COLUMN_FAMILY);
+ scanner.fetchColumnFamily(MetadataTable.CHOPPED_COLUMN_FAMILY);
scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
if (state != null) {
@@ -82,7 +87,11 @@ public class MetaDataTableScanner implem
}
public MetaDataTableScanner(Instance instance, TCredentials auths, Range range) {
- this(instance, auths, range, null);
+ this(instance, auths, range, MetadataTable.NAME);
+ }
+
+ public MetaDataTableScanner(Instance instance, TCredentials auths, Range range, String tableName) {
+ this(instance, auths, range, null, tableName);
}
public void close() {
@@ -92,6 +101,7 @@ public class MetaDataTableScanner implem
}
}
+ @Override
public void finalize() {
close();
}
@@ -116,7 +126,7 @@ public class MetaDataTableScanner implem
log.error(ex, ex);
mdScanner.close();
return null;
- }
+ }
}
public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException, BadLocationStateException {
@@ -134,30 +144,30 @@ public class MetaDataTableScanner implem
Text cf = key.getColumnFamily();
Text cq = key.getColumnQualifier();
- if (cf.compareTo(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY) == 0) {
+ if (cf.compareTo(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY) == 0) {
TServerInstance location = new TServerInstance(entry.getValue(), cq);
if (future != null) {
throw new BadLocationStateException("found two assignments for the same extent " + key.getRow() + ": " + future + " and " + location);
}
future = location;
- } else if (cf.compareTo(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
+ } else if (cf.compareTo(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
TServerInstance location = new TServerInstance(entry.getValue(), cq);
if (current != null) {
throw new BadLocationStateException("found two locations for the same extent " + key.getRow() + ": " + current + " and " + location);
}
current = location;
- } else if (cf.compareTo(Constants.METADATA_LOG_COLUMN_FAMILY) == 0) {
+ } else if (cf.compareTo(MetadataTable.LOG_COLUMN_FAMILY) == 0) {
String[] split = entry.getValue().toString().split("\\|")[0].split(";");
walogs.add(Arrays.asList(split));
- } else if (cf.compareTo(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY) == 0) {
+ } else if (cf.compareTo(MetadataTable.LAST_LOCATION_COLUMN_FAMILY) == 0) {
TServerInstance location = new TServerInstance(entry.getValue(), cq);
if (last != null) {
throw new BadLocationStateException("found two last locations for the same extent " + key.getRow() + ": " + last + " and " + location);
}
last = new TServerInstance(entry.getValue(), cq);
- } else if (cf.compareTo(Constants.METADATA_CHOPPED_COLUMN_FAMILY) == 0) {
+ } else if (cf.compareTo(MetadataTable.CHOPPED_COLUMN_FAMILY) == 0) {
chopped = true;
- } else if (Constants.METADATA_PREV_ROW_COLUMN.equals(cf, cq)) {
+ } else if (MetadataTable.PREV_ROW_COLUMN.equals(cf, cq)) {
extent = new KeyExtent(row, entry.getValue());
}
}
@@ -176,7 +186,7 @@ public class MetaDataTableScanner implem
throw new RuntimeException(ex);
} catch (BadLocationStateException ex) {
throw new RuntimeException(ex);
- }
+ }
}
@Override
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java Mon Jun 24 21:34:20 2013
@@ -18,19 +18,23 @@ package org.apache.accumulo.server.maste
import java.util.Iterator;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.RootTable;
public class RootTabletStateStore extends MetaDataStateStore {
public RootTabletStateStore(Instance instance, TCredentials auths, CurrentState state) {
- super(instance, auths, state);
+ super(instance, auths, state, RootTable.NAME);
+ }
+
+ public RootTabletStateStore() {
+ super(RootTable.NAME);
}
@Override
public Iterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(instance, auths, Constants.METADATA_ROOT_TABLET_KEYSPACE, state);
+ return new MetaDataTableScanner(instance, auths, RootTable.METADATA_TABLETS_RANGE, state, RootTable.NAME);
}
@Override
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java Mon Jun 24 21:34:20 2013
@@ -17,17 +17,15 @@
package org.apache.accumulo.server.master.state;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.thrift.MasterGoalState;
import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileSystem;
public class SetGoalState {
@@ -41,7 +39,7 @@ public class SetGoalState {
}
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ VolumeManager fs = VolumeManagerImpl.get();
Accumulo.waitForZookeeperAndHdfs(fs);
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
NodeExistsPolicy.OVERWRITE);
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java Mon Jun 24 21:34:20 2013
@@ -19,10 +19,10 @@ package org.apache.accumulo.server.maste
import java.io.Serializable;
import java.net.InetSocketAddress;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.MetadataTable;
import org.apache.accumulo.server.util.AddressUtil;
import org.apache.hadoop.io.Text;
@@ -58,19 +58,19 @@ public class TServerInstance implements
}
public void putLocation(Mutation m) {
- m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, asColumnQualifier(), asMutationValue());
+ m.put(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY, asColumnQualifier(), asMutationValue());
}
public void putFutureLocation(Mutation m) {
- m.put(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, asColumnQualifier(), asMutationValue());
+ m.put(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY, asColumnQualifier(), asMutationValue());
}
public void putLastLocation(Mutation m) {
- m.put(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY, asColumnQualifier(), asMutationValue());
+ m.put(MetadataTable.LAST_LOCATION_COLUMN_FAMILY, asColumnQualifier(), asMutationValue());
}
public void clearLastLocation(Mutation m) {
- m.putDelete(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY, asColumnQualifier());
+ m.putDelete(MetadataTable.LAST_LOCATION_COLUMN_FAMILY, asColumnQualifier());
}
@Override
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java Mon Jun 24 21:34:20 2013
@@ -98,7 +98,7 @@ public class TabletStateChangeIterator e
while (buffer.available() > 0) {
MergeInfo mergeInfo = new MergeInfo();
mergeInfo.readFields(buffer);
- result.put(mergeInfo.range.getTableId(), mergeInfo);
+ result.put(mergeInfo.extent.getTableId(), mergeInfo);
}
return result;
} catch (Exception ex) {
@@ -126,7 +126,7 @@ public class TabletStateChangeIterator e
}
// we always want data about merges
MergeInfo merge = merges.get(tls.extent.getTableId());
- if (merge != null && merge.getRange() != null && merge.getRange().overlaps(tls.extent)) {
+ if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent)) {
return;
}
// is the table supposed to be online or offline?
@@ -173,7 +173,7 @@ public class TabletStateChangeIterator e
DataOutputBuffer buffer = new DataOutputBuffer();
try {
for (MergeInfo info : merges) {
- KeyExtent extent = info.getRange();
+ KeyExtent extent = info.getExtent();
if (extent != null && !info.getState().equals(MergeState.NONE)) {
info.write(buffer);
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java Mon Jun 24 21:34:20 2013
@@ -37,6 +37,7 @@ public abstract class TabletStateStore i
/**
* Scan the information about the tablets covered by this store
*/
+ @Override
abstract public Iterator<TabletLocationState> iterator();
/**
@@ -68,6 +69,8 @@ public abstract class TabletStateStore i
TabletStateStore store;
if (tls.extent.isRootTablet()) {
store = new ZooTabletStateStore();
+ } else if (tls.extent.isMeta()) {
+ store = new RootTabletStateStore();
} else {
store = new MetaDataStateStore();
}
@@ -78,6 +81,8 @@ public abstract class TabletStateStore i
TabletStateStore store;
if (assignment.tablet.isRootTablet()) {
store = new ZooTabletStateStore();
+ } else if (assignment.tablet.isMeta()) {
+ store = new RootTabletStateStore();
} else {
store = new MetaDataStateStore();
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java Mon Jun 24 21:34:20 2013
@@ -23,8 +23,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.commons.lang.NotImplementedException;
import org.apache.log4j.Logger;
@@ -60,9 +60,9 @@ public class ZooTabletStateStore extends
public TabletLocationState next() {
finished = true;
try {
- byte[] future = store.get(Constants.ZROOT_TABLET_FUTURE_LOCATION);
- byte[] current = store.get(Constants.ZROOT_TABLET_LOCATION);
- byte[] last = store.get(Constants.ZROOT_TABLET_LAST_LOCATION);
+ byte[] future = store.get(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+ byte[] current = store.get(RootTable.ZROOT_TABLET_LOCATION);
+ byte[] last = store.get(RootTable.ZROOT_TABLET_LAST_LOCATION);
TServerInstance currentSession = null;
TServerInstance futureSession = null;
@@ -79,8 +79,8 @@ public class ZooTabletStateStore extends
futureSession = null;
}
List<Collection<String>> logs = new ArrayList<Collection<String>>();
- for (String entry : store.getChildren(Constants.ZROOT_TABLET_WALOGS)) {
- byte[] logInfo = store.get(Constants.ZROOT_TABLET_WALOGS + "/" + entry);
+ for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
+ byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
if (logInfo != null) {
MetadataTable.LogEntry logEntry = new MetadataTable.LogEntry();
logEntry.fromBytes(logInfo);
@@ -88,7 +88,7 @@ public class ZooTabletStateStore extends
log.debug("root tablet logSet " + logEntry.logSet);
}
}
- TabletLocationState result = new TabletLocationState(Constants.ROOT_TABLET_EXTENT, futureSession, currentSession, lastSession, logs, false);
+ TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
log.debug("Returning root tablet state: " + result);
return result;
} catch (Exception ex) {
@@ -120,7 +120,7 @@ public class ZooTabletStateStore extends
if (assignments.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
Assignment assignment = assignments.iterator().next();
- if (assignment.tablet.compareTo(Constants.ROOT_TABLET_EXTENT) != 0)
+ if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
String value = AddressUtil.toString(assignment.server.getLocation()) + "|" + assignment.server.getSession();
Iterator<TabletLocationState> currentIter = iterator();
@@ -128,7 +128,7 @@ public class ZooTabletStateStore extends
if (current.current != null) {
throw new IllegalDSException("Trying to set the root tablet location: it is already set to " + current.current);
}
- store.put(Constants.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes());
+ store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes());
}
@Override
@@ -136,7 +136,7 @@ public class ZooTabletStateStore extends
if (assignments.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
Assignment assignment = assignments.iterator().next();
- if (assignment.tablet.compareTo(Constants.ROOT_TABLET_EXTENT) != 0)
+ if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
String value = AddressUtil.toString(assignment.server.getLocation()) + "|" + assignment.server.getSession();
Iterator<TabletLocationState> currentIter = iterator();
@@ -147,10 +147,10 @@ public class ZooTabletStateStore extends
if (!current.future.equals(assignment.server)) {
throw new IllegalDSException("Root tablet is already assigned to " + current.future);
}
- store.put(Constants.ZROOT_TABLET_LOCATION, value.getBytes());
- store.put(Constants.ZROOT_TABLET_LAST_LOCATION, value.getBytes());
+ store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes());
+ store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes());
// Make the following unnecessary by making the entire update atomic
- store.remove(Constants.ZROOT_TABLET_FUTURE_LOCATION);
+ store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
log.debug("Put down root tablet location");
}
@@ -159,10 +159,10 @@ public class ZooTabletStateStore extends
if (tablets.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
TabletLocationState tls = tablets.iterator().next();
- if (tls.extent.compareTo(Constants.ROOT_TABLET_EXTENT) != 0)
+ if (tls.extent.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
- store.remove(Constants.ZROOT_TABLET_LOCATION);
- store.remove(Constants.ZROOT_TABLET_FUTURE_LOCATION);
+ store.remove(RootTable.ZROOT_TABLET_LOCATION);
+ store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
log.debug("unassign root tablet location");
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Mon Jun 24 21:34:20 2013
@@ -35,8 +35,6 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
@@ -56,6 +54,7 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -63,6 +62,7 @@ import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -71,17 +71,17 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
-
/*
* Bulk import makes requests of tablet servers, and those requests can take a
* long time. Our communications to the tablet server may fail, so we won't know
@@ -104,7 +104,7 @@ import org.apache.thrift.TException;
public class BulkImport extends MasterRepo {
public static final String FAILURES_TXT = "failures.txt";
-
+
private static final long serialVersionUID = 1L;
private static final Logger log = Logger.getLogger(BulkImport.class);
@@ -146,8 +146,8 @@ public class BulkImport extends MasterRe
Utils.getReadLock(tableId, tid).lock();
// check that the error directory exists and is empty
- FileSystem fs = master.getFileSystem();
-
+ VolumeManager fs = master.getFileSystem();
+
Path errorPath = new Path(errorDir);
FileStatus errorStatus = null;
try {
@@ -179,8 +179,23 @@ public class BulkImport extends MasterRe
}
}
- private Path createNewBulkDir(FileSystem fs, String tableId) throws IOException {
- Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableId);
+ private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
+ String tableDir = null;
+ loop:
+ for (String dir : fs.getFileSystems().keySet()) {
+ if (this.sourceDir.startsWith(dir)) {
+ for (String path : ServerConstants.getTablesDirs()) {
+ if (path.startsWith(dir)) {
+ tableDir = path;
+ break loop;
+ }
+ }
+ break;
+ }
+ }
+ if (tableDir == null)
+ throw new IllegalStateException(sourceDir + " is not in a known namespace");
+ Path directory = new Path(tableDir + "/" + tableId);
fs.mkdirs(directory);
// only one should be able to create the lock file
@@ -203,7 +218,7 @@ public class BulkImport extends MasterRe
}
}
- private String prepareBulkImport(FileSystem fs, String dir, String tableId) throws IOException {
+ private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
Path bulkDir = createNewBulkDir(fs, tableId);
MetadataTable.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
@@ -334,7 +349,7 @@ class CompleteBulkImport extends MasterR
class CopyFailed extends MasterRepo {
private static final long serialVersionUID = 1L;
-
+
private String tableId;
private String source;
private String bulk;
@@ -367,10 +382,10 @@ class CopyFailed extends MasterRepo {
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
- //This needs to execute after the arbiter is stopped
-
- FileSystem fs = master.getFileSystem();
-
+ // This needs to execute after the arbiter is stopped
+
+ VolumeManager fs = master.getFileSystem();
+
if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
return new CleanUpBulkImport(tableId, source, bulk, error);
@@ -394,12 +409,12 @@ class CopyFailed extends MasterRepo {
* I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
* have no loaded markers.
*/
-
+
// determine which failed files were loaded
Connector conn = master.getConnector();
- Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
+ Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
- mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+ mscanner.fetchColumnFamily(MetadataTable.BULKFILE_COLUMN_FAMILY);
for (Entry<Key,Value> entry : mscanner) {
if (Long.parseLong(entry.getValue().toString()) == tid) {
@@ -439,8 +454,8 @@ class CopyFailed extends MasterRepo {
bifCopyQueue.waitUntilDone(workIds);
}
-
- fs.delete(new Path(error, BulkImport.FAILURES_TXT), true);
+
+ fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
return new CleanUpBulkImport(tableId, source, bulk, error);
}
@@ -452,7 +467,7 @@ class LoadFiles extends MasterRepo {
private static ExecutorService threadPool = null;
static {
-
+
}
private static final Logger log = Logger.getLogger(BulkImport.class);
@@ -485,27 +500,27 @@ class LoadFiles extends MasterRepo {
threadPool = new TraceExecutorService(pool);
}
}
-
+
@Override
public Repo<Master> call(final long tid, final Master master) throws Exception {
initializeThreadPool(master);
final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
List<FileStatus> files = new ArrayList<FileStatus>();
for (FileStatus entry : fs.listStatus(new Path(bulk))) {
files.add(entry);
}
log.debug("tid " + tid + " importing " + files.size() + " files");
-
+
Path writable = new Path(this.errorDir, ".iswritable");
if (!fs.createNewFile(writable)) {
// Maybe this is a re-try... clear the flag and try again
- fs.delete(writable, false);
+ fs.delete(writable);
if (!fs.createNewFile(writable))
throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
"Unable to write to " + this.errorDir);
}
- fs.delete(writable, false);
+ fs.delete(writable);
final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
for (FileStatus f : files)
@@ -576,7 +591,7 @@ class LoadFiles extends MasterRepo {
} finally {
out.close();
}
-
+
// return the next step, which will perform cleanup
return new CompleteBulkImport(tableId, source, bulk, errorDir);
}
@@ -600,5 +615,5 @@ class LoadFiles extends MasterRepo {
result.append("]");
return result.toString();
}
-
+
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java Mon Jun 24 21:34:20 2013
@@ -41,6 +41,9 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
@@ -87,20 +90,20 @@ class CompactionDriver extends MasterRep
// compaction was canceled
throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
}
-
+
MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>();
Connector conn = master.getConnector();
- Scanner scanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
+ Scanner scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
- if (tableId.equals(Constants.METADATA_TABLE_ID))
- range = range.clip(new Range(Constants.ROOT_TABLET_EXTENT.getMetadataEntry(), false, null, true));
+ if (tableId.equals(MetadataTable.ID))
+ range = range.clip(new Range(RootTable.EXTENT.getMetadataEntry(), false, null, true));
scanner.setRange(range);
- Constants.METADATA_COMPACT_COLUMN.fetch(scanner);
- Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);
- scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+ MetadataTable.COMPACT_COLUMN.fetch(scanner);
+ MetadataTable.DIRECTORY_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
long t1 = System.currentTimeMillis();
RowIterator ri = new RowIterator(scanner);
@@ -119,10 +122,10 @@ class CompactionDriver extends MasterRep
entry = row.next();
Key key = entry.getKey();
- if (Constants.METADATA_COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
+ if (MetadataTable.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
tabletCompactID = Long.parseLong(entry.getValue().toString());
- if (Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.equals(key.getColumnFamily()))
+ if (MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY.equals(key.getColumnFamily()))
server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
}
@@ -189,7 +192,6 @@ class CompactionDriver extends MasterRep
}
-
public class CompactRange extends MasterRepo {
private static final long serialVersionUID = 1L;
@@ -214,7 +216,7 @@ public class CompactRange extends Master
endRow = null;
iterators = Collections.emptyList();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(startRow != null);
@@ -275,7 +277,7 @@ public class CompactRange extends Master
return iterators;
}
}
-
+
public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
this.tableId = tableId;
this.startRow = startRow.length == 0 ? null : startRow;
@@ -286,7 +288,7 @@ public class CompactRange extends Master
} else {
iterators = null;
}
-
+
if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
"start row must be less than end row");
@@ -317,13 +319,13 @@ public class CompactRange extends Master
for (int i = 1; i < tokens.length; i++) {
if (tokens[i].startsWith(txidString))
continue; // skip self
-
+
throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
"Another compaction with iterators is running");
}
-
+
StringBuilder encodedIterators = new StringBuilder();
-
+
if (iterators != null) {
Hex hex = new Hex();
encodedIterators.append(",");
@@ -354,7 +356,7 @@ public class CompactRange extends Master
String cvs = new String(currentValue);
String[] tokens = cvs.split(",");
long flushID = Long.parseLong(new String(tokens[0]));
-
+
String txidString = String.format("%016x", txid);
StringBuilder encodedIterators = new StringBuilder();
@@ -368,9 +370,9 @@ public class CompactRange extends Master
return ("" + flushID + encodedIterators).getBytes();
}
});
-
+
}
-
+
@Override
public void undo(long tid, Master environment) throws Exception {
try {
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java Mon Jun 24 21:34:20 2013
@@ -27,25 +27,21 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -148,18 +144,19 @@ class CreateDir extends MasterRepo {
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
- String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
- TabletOperations.createTabletDirectory(fs, dir, null);
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ VolumeManager fs = master.getFileSystem();
+ TabletOperations.createTabletDirectory(fs, tableInfo.tableId, null);
return new PopulateMetadata(tableInfo);
}
@Override
- public void undo(long tid, Master environment) throws Exception {
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
- String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
- fs.delete(new Path(dir), true);
+ public void undo(long tid, Master master) throws Exception {
+ VolumeManager fs = master.getFileSystem();
+ for(String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir + "/" + tableInfo.tableId));
+ }
+
}
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Mon Jun 24 21:34:20 2013
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map.Entry;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -34,9 +33,10 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -46,7 +46,6 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -88,7 +87,7 @@ class CleanUp extends MasterRepo {
boolean done = true;
Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
- Scanner scanner = master.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
MetaDataTableScanner.configureScanner(scanner, master);
scanner.setRange(tableRange);
@@ -126,10 +125,10 @@ class CleanUp extends MasterRepo {
try {
// look for other tables that references this tables files
Connector conn = master.getConnector();
- BatchScanner bs = conn.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 8);
+ BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8);
try {
- bs.setRanges(Collections.singleton(Constants.NON_ROOT_METADATA_KEYSPACE));
- bs.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
+ bs.setRanges(Collections.singleton(MetadataTable.NON_ROOT_KEYSPACE));
+ bs.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
GrepIterator.setTerm(cfg, "../" + tableId + "/");
bs.addScanIterator(cfg);
@@ -145,7 +144,7 @@ class CleanUp extends MasterRepo {
} catch (Exception e) {
refCount = -1;
- log.error("Failed to scan " + Constants.METADATA_TABLE_NAME + " looking for references to deleted table " + tableId, e);
+ log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e);
}
// remove metadata table entries
@@ -168,8 +167,10 @@ class CleanUp extends MasterRepo {
if (refCount == 0) {
// delete the map files
try {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- fs.delete(new Path(ServerConstants.getTablesDir(), tableId), true);
+ VolumeManager fs = master.getFileSystem();
+ for (String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir, tableId));
+ }
} catch (IOException e) {
log.error("Unable to remove deleted table directory", e);
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Mon Jun 24 21:34:20 2013
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -46,13 +45,15 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.MetadataTable;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -95,12 +96,12 @@ class WriteExportFiles extends MasterRep
checkOffline(conn);
- Scanner metaScanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
metaScanner.setRange(new KeyExtent(new Text(tableInfo.tableID), null, null).toMetadataRange());
// scan for locations
- metaScanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
- metaScanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);
+ metaScanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
+ metaScanner.fetchColumnFamily(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY);
if (metaScanner.iterator().hasNext()) {
return 500;
@@ -109,7 +110,7 @@ class WriteExportFiles extends MasterRep
// use the same range to check for walogs that we used to check for hosted (or future hosted) tablets
// this is done as a separate scan after we check for locations, because walogs are okay only if there is no location
metaScanner.clearColumns();
- metaScanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);
+ metaScanner.fetchColumnFamily(MetadataTable.LOG_COLUMN_FAMILY);
if (metaScanner.iterator().hasNext()) {
throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
@@ -139,7 +140,7 @@ class WriteExportFiles extends MasterRep
Utils.unreserveTable(tableInfo.tableID, tid, false);
}
- public static void exportTable(FileSystem fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
+ public static void exportTable(VolumeManager fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
fs.mkdirs(new Path(exportDir));
@@ -160,7 +161,7 @@ class WriteExportFiles extends MasterRep
osw.append("srcZookeepers:" + conn.getInstance().getZooKeepers() + "\n");
osw.append("srcTableName:" + tableName + "\n");
osw.append("srcTableID:" + tableID + "\n");
- osw.append(ExportTable.DATA_VERSION_PROP + ":" + Constants.DATA_VERSION + "\n");
+ osw.append(ExportTable.DATA_VERSION_PROP + ":" + ServerConstants.DATA_VERSION + "\n");
osw.append("srcCodeVersion:" + Constants.VERSION + "\n");
osw.flush();
@@ -169,7 +170,7 @@ class WriteExportFiles extends MasterRep
exportConfig(conn, tableID, zipOut, dataOut);
dataOut.flush();
- Map<String,String> uniqueFiles = exportMetadata(conn, tableID, zipOut, dataOut);
+ Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
dataOut.close();
dataOut = null;
@@ -182,24 +183,16 @@ class WriteExportFiles extends MasterRep
}
}
- private static void createDistcpFile(FileSystem fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
+ private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false)));
try {
- URI uri = fs.getUri();
-
- for (String relPath : uniqueFiles.values()) {
- Path absPath = new Path(uri.getScheme(), uri.getAuthority(), ServerConstants.getTablesDir() + relPath);
- distcpOut.append(absPath.toUri().toString());
+ for (String file : uniqueFiles.values()) {
+ distcpOut.append(file);
distcpOut.newLine();
}
- Path absEMP = exportMetaFilePath;
- if (!exportMetaFilePath.isAbsolute())
- absEMP = new Path(fs.getWorkingDirectory().toUri().getPath(), exportMetaFilePath);
-
- distcpOut.append(new Path(uri.getScheme(), uri.getAuthority(), absEMP.toString()).toUri().toString());
-
+ distcpOut.append(exportMetaFilePath.toString());
distcpOut.newLine();
distcpOut.close();
@@ -211,41 +204,35 @@ class WriteExportFiles extends MasterRep
}
}
- private static Map<String,String> exportMetadata(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
+ private static Map<String,String> exportMetadata(VolumeManager fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
TableNotFoundException {
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
Map<String,String> uniqueFiles = new HashMap<String,String>();
- Scanner metaScanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
- metaScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
- Constants.METADATA_PREV_ROW_COLUMN.fetch(metaScanner);
- Constants.METADATA_TIME_COLUMN.fetch(metaScanner);
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
+ MetadataTable.PREV_ROW_COLUMN.fetch(metaScanner);
+ MetadataTable.TIME_COLUMN.fetch(metaScanner);
metaScanner.setRange(new KeyExtent(new Text(tableID), null, null).toMetadataRange());
for (Entry<Key,Value> entry : metaScanner) {
entry.getKey().write(dataOut);
entry.getValue().write(dataOut);
- if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
- String relPath = entry.getKey().getColumnQualifierData().toString();
-
- if (relPath.startsWith("../"))
- relPath = relPath.substring(2);
- else
- relPath = "/" + tableID + relPath;
-
- String tokens[] = relPath.split("/");
- if (tokens.length != 4) {
- throw new RuntimeException("Illegal path " + relPath);
+ if (entry.getKey().getColumnFamily().equals(MetadataTable.DATAFILE_COLUMN_FAMILY)) {
+ String path = fs.getFullPath(entry.getKey()).toString();
+ String tokens[] = path.split("/");
+ if (tokens.length < 1) {
+ throw new RuntimeException("Illegal path " + path);
}
- String filename = tokens[3];
+ String filename = tokens[tokens.length - 1];
String existingPath = uniqueFiles.get(filename);
if (existingPath == null) {
- uniqueFiles.put(filename, relPath);
- } else if (!existingPath.equals(relPath)) {
+ uniqueFiles.put(filename, path);
+ } else if (!existingPath.equals(path)) {
// make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier
throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Mon Jun 24 21:34:20 2013
@@ -52,6 +52,7 @@ import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -98,7 +99,7 @@ class FinishImportTable extends MasterRe
@Override
public Repo<Master> call(long tid, Master env) throws Exception {
- env.getFileSystem().delete(new Path(tableInfo.importDir, "mappings.txt"), true);
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
@@ -136,7 +137,7 @@ class MoveExportedFiles extends MasterRe
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
try {
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo);
@@ -175,7 +176,7 @@ class PopulateMetadataTable extends Mast
this.tableInfo = ti;
}
- static Map<String,String> readMappingFile(FileSystem fs, ImportedTableInfo tableInfo) throws Exception {
+ static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception {
BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt"))));
try {
@@ -203,9 +204,9 @@ class PopulateMetadataTable extends Mast
ZipInputStream zis = null;
try {
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
- mbw = master.getConnector().createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
+ mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
zis = new ZipInputStream(fs.open(path));
@@ -234,7 +235,7 @@ class PopulateMetadataTable extends Mast
Text cq;
- if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+ if (key.getColumnFamily().equals(MetadataTable.DATAFILE_COLUMN_FAMILY)) {
String oldName = new Path(key.getColumnQualifier().toString()).getName();
String newName = fileNameMappings.get(oldName);
@@ -245,19 +246,19 @@ class PopulateMetadataTable extends Mast
if (m == null) {
m = new Mutation(metadataRow);
- Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes())));
+ MetadataTable.DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes())));
currentRow = metadataRow;
}
if (!currentRow.equals(metadataRow)) {
mbw.addMutation(m);
m = new Mutation(metadataRow);
- Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes())));
+ MetadataTable.DIRECTORY_COLUMN.put(m, new Value(FastFormat.toZeroPaddedString(dirCount++, 8, 16, "/c-".getBytes())));
}
m.put(key.getColumnFamily(), cq, val);
- if (endRow == null && Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+ if (endRow == null && MetadataTable.PREV_ROW_COLUMN.hasColumns(key)) {
mbw.addMutation(m);
break; // its the last column in the last row
}
@@ -311,10 +312,10 @@ class MapImportFileNames extends MasterR
BufferedWriter mappingsWriter = null;
try {
- FileSystem fs = environment.getFileSystem();
+ VolumeManager fs = environment.getFileSystem();
fs.mkdirs(new Path(tableInfo.importDir));
-
+
FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
@@ -323,7 +324,7 @@ class MapImportFileNames extends MasterR
for (FileStatus fileStatus : files) {
String fileName = fileStatus.getPath().getName();
-
+ log.info("filename " + fileStatus.getPath().toString());
String sa[] = fileName.split("\\.");
String extension = "";
if (sa.length > 1) {
@@ -365,7 +366,7 @@ class MapImportFileNames extends MasterR
@Override
public void undo(long tid, Master env) throws Exception {
- env.getFileSystem().delete(new Path(tableInfo.importDir), true);
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
}
}
@@ -380,11 +381,12 @@ class CreateImportDir extends MasterRepo
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
+ public Repo<Master> call(long tid, Master master) throws Exception {
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
- Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableInfo.tableId);
+ Path base = master.getFileSystem().matchingFileSystem(new Path(tableInfo.exportDir), ServerConstants.getTablesDirs());
+ Path directory = new Path(base, tableInfo.tableId);
Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
@@ -409,12 +411,13 @@ class ImportPopulateZookeeper extends Ma
return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT);
}
- private Map<String,String> getExportedProps(FileSystem fs) throws Exception {
+ private Map<String,String> getExportedProps(VolumeManager fs) throws Exception {
Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
try {
- return TableOperationsImpl.getExportedProps(fs, path);
+ FileSystem ns = fs.getFileSystemByPath(path);
+ return TableOperationsImpl.getExportedProps(ns, path);
} catch (IOException ioe) {
throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
"Error reading table props from " + path + " " + ioe.getMessage());
@@ -570,7 +573,7 @@ public class ImportTable extends MasterR
throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
"Incompatible export version " + exportVersion);
- if (dataVersion == null || dataVersion > Constants.DATA_VERSION)
+ if (dataVersion == null || dataVersion > ServerConstants.DATA_VERSION)
throw new ThriftTableOperationException(null, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
"Incompatible data version " + exportVersion);