You are viewing a plain text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/01/05 21:09:53 UTC
[1/2] accumulo git commit: ACCUMULO-3462 reduce the opportunity for the queued tablet to get mis-marked
Repository: accumulo
Updated Branches:
refs/heads/1.5 3bcc3c9ed -> 07422efc5
http://git-wip-us.apache.org/repos/asf/accumulo/blob/07422efc/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index ddae38a..4a99667 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -139,20 +139,20 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
* We need to be able to have the master tell a tabletServer to
* close this file, and the tablet server to handle all pending client reads
* before closing
- *
+ *
*/
/**
- *
+ *
* this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys
- *
+ *
* need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent
- *
- *
+ *
+ *
*/
public class Tablet {
-
+
enum MajorCompactionReason {
// do not change the order, the order of this enum determines the order
// in which queued major compactions are executed
@@ -161,44 +161,44 @@ public class Tablet {
NORMAL,
IDLE
}
-
+
enum MinorCompactionReason {
USER, SYSTEM, CLOSE
}
public class CommitSession {
-
+
private int seq;
private InMemoryMap memTable;
private int commitsInProgress;
private long maxCommittedTime = Long.MIN_VALUE;
-
+
private CommitSession(int seq, InMemoryMap imm) {
this.seq = seq;
this.memTable = imm;
commitsInProgress = 0;
}
-
+
public int getWALogSeq() {
return seq;
}
-
+
private void decrementCommitsInProgress() {
if (commitsInProgress < 1)
throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
-
+
commitsInProgress--;
if (commitsInProgress == 0)
Tablet.this.notifyAll();
}
-
+
private void incrementCommitsInProgress() {
if (commitsInProgress < 0)
throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
-
+
commitsInProgress++;
}
-
+
private void waitForCommitsToFinish() {
while (commitsInProgress > 0) {
try {
@@ -208,105 +208,105 @@ public class Tablet {
}
}
}
-
+
public void abortCommit(List<Mutation> value) {
Tablet.this.abortCommit(this, value);
}
-
+
public void commit(List<Mutation> mutations) {
Tablet.this.commit(this, mutations);
}
-
+
public Tablet getTablet() {
return Tablet.this;
}
-
+
public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
}
-
+
public void finishUpdatingLogsUsed() {
Tablet.this.finishUpdatingLogsUsed();
}
-
+
public int getLogId() {
return logId;
}
-
+
public KeyExtent getExtent() {
return extent;
}
-
+
private void updateMaxCommittedTime(long time) {
maxCommittedTime = Math.max(time, maxCommittedTime);
}
-
+
private long getMaxCommittedTime() {
if (maxCommittedTime == Long.MIN_VALUE)
throw new IllegalStateException("Tried to read max committed time when it was never set");
return maxCommittedTime;
}
-
+
}
-
+
private class TabletMemory {
private InMemoryMap memTable;
private InMemoryMap otherMemTable;
private InMemoryMap deletingMemTable;
private int nextSeq = 1;
private CommitSession commitSession;
-
+
TabletMemory() {
memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
commitSession = new CommitSession(nextSeq, memTable);
nextSeq += 2;
}
-
+
InMemoryMap getMemTable() {
return memTable;
}
-
+
InMemoryMap getMinCMemTable() {
return otherMemTable;
}
-
+
CommitSession prepareForMinC() {
if (otherMemTable != null) {
throw new IllegalStateException();
}
-
+
if (deletingMemTable != null) {
throw new IllegalStateException();
}
-
+
otherMemTable = memTable;
memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
-
+
CommitSession oldCommitSession = commitSession;
commitSession = new CommitSession(nextSeq, memTable);
nextSeq += 2;
-
+
tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
-
+
return oldCommitSession;
}
-
+
void finishedMinC() {
-
+
if (otherMemTable == null) {
throw new IllegalStateException();
}
-
+
if (deletingMemTable != null) {
throw new IllegalStateException();
}
-
+
deletingMemTable = otherMemTable;
-
+
otherMemTable = null;
Tablet.this.notifyAll();
}
-
+
void finalizeMinC() {
try {
deletingMemTable.delete(15000);
@@ -315,22 +315,22 @@ public class Tablet {
if (otherMemTable != null) {
throw new IllegalStateException();
}
-
+
if (deletingMemTable == null) {
throw new IllegalStateException();
}
-
+
deletingMemTable = null;
-
+
tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
}
}
}
-
+
boolean memoryReservedForMinC() {
return otherMemTable != null || deletingMemTable != null;
}
-
+
void waitForMinC() {
while (otherMemTable != null || deletingMemTable != null) {
try {
@@ -340,21 +340,21 @@ public class Tablet {
}
}
}
-
+
void mutate(CommitSession cm, List<Mutation> mutations) {
cm.memTable.mutate(mutations);
}
-
+
void updateMemoryUsageStats() {
long other = 0;
if (otherMemTable != null)
other = otherMemTable.estimatedSizeInBytes();
else if (deletingMemTable != null)
other = deletingMemTable.estimatedSizeInBytes();
-
+
tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
}
-
+
List<MemoryIterator> getIterators() {
List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
toReturn.add(memTable.skvIterator());
@@ -362,50 +362,50 @@ public class Tablet {
toReturn.add(otherMemTable.skvIterator());
return toReturn;
}
-
+
void returnIterators(List<MemoryIterator> iters) {
for (MemoryIterator iter : iters) {
iter.close();
}
}
-
+
public long getNumEntries() {
if (otherMemTable != null)
return memTable.getNumEntries() + otherMemTable.getNumEntries();
return memTable.getNumEntries();
}
-
+
CommitSession getCommitSession() {
return commitSession;
}
}
-
+
private TabletMemory tabletMemory;
-
+
private final TabletTime tabletTime;
private long persistedTime;
private final Object timeLock = new Object();
-
+
private final Path location; // absolute path of this tablets dir
private TServerInstance lastLocation;
-
+
private Configuration conf;
private FileSystem fs;
-
+
private TableConfiguration acuTableConf;
-
+
private volatile boolean tableDirChecked = false;
-
+
private AtomicLong dataSourceDeletions = new AtomicLong(0);
private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
-
+
private volatile boolean closing = false;
private boolean closed = false;
private boolean closeComplete = false;
-
+
private long lastFlushID = -1;
private long lastCompactID = -1;
-
+
private static class CompactionWaitInfo {
long flushID = -1;
long compactionID = -1;
@@ -415,7 +415,7 @@ public class Tablet {
private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
private KeyExtent extent;
-
+
private TabletResourceManager tabletResources;
final private DatafileManager datafileManager;
private volatile boolean majorCompactionInProgress = false;
@@ -423,127 +423,127 @@ public class Tablet {
private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
private volatile boolean minorCompactionInProgress = false;
private volatile boolean minorCompactionWaitingToStart = false;
-
+
private boolean updatingFlushID = false;
-
+
private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
-
+
private final String tabletDirectory;
-
+
private int writesInProgress = 0;
-
+
private static final Logger log = Logger.getLogger(Tablet.class);
public TabletStatsKeeper timer;
-
+
private Rate queryRate = new Rate(0.2);
private long queryCount = 0;
-
+
private Rate queryByteRate = new Rate(0.2);
private long queryBytes = 0;
-
+
private Rate ingestRate = new Rate(0.2);
private long ingestCount = 0;
-
+
private Rate ingestByteRate = new Rate(0.2);
private long ingestBytes = 0;
-
+
private byte[] defaultSecurityLabel = new byte[0];
-
+
private long lastMinorCompactionFinishTime;
private long lastMapFileImportTime;
-
+
private volatile long numEntries;
private volatile long numEntriesInMemory;
-
+
// a count of the amount of data read by the iterators
private AtomicLong scannedCount = new AtomicLong(0);
private Rate scannedRate = new Rate(0.2);
private ConfigurationObserver configObserver;
-
+
private TabletServer tabletServer;
-
+
private final int logId;
// ensure we only have one reader/writer of our bulk file notes at at time
public final Object bulkFileImportLock = new Object();
-
+
public int getLogId() {
return logId;
}
-
+
public static class TabletClosedException extends RuntimeException {
public TabletClosedException(Exception e) {
super(e);
}
-
+
public TabletClosedException() {
super();
}
-
+
private static final long serialVersionUID = 1L;
}
-
+
String getNextMapFilename(String prefix) throws IOException {
String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
checkTabletDir();
return location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension;
}
-
+
private void checkTabletDir() throws IOException {
if (!tableDirChecked) {
checkTabletDir(this.location);
tableDirChecked = true;
}
}
-
+
private void checkTabletDir(Path tabletDir) throws IOException {
-
+
FileStatus[] files = null;
try {
files = fs.listStatus(tabletDir);
} catch (FileNotFoundException ex) {
// ignored
}
-
+
if (files == null) {
if (tabletDir.getName().startsWith("c-"))
log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // its a clone dir...
else
log.warn("Tablet " + extent + " had no dir, creating " + tabletDir);
-
+
fs.mkdirs(tabletDir);
}
}
-
+
private static String rel2abs(String relPath, KeyExtent extent) {
if (relPath.startsWith("../"))
return ServerConstants.getTablesDir() + relPath.substring(2);
else
return ServerConstants.getTablesDir() + "/" + extent.getTableId() + relPath;
}
-
+
class DatafileManager {
// access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
final private Map<Path,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<Path,DataFileValue>());
-
+
DatafileManager(SortedMap<String,DataFileValue> datafileSizes) {
for (Entry<String,DataFileValue> datafiles : datafileSizes.entrySet())
this.datafileSizes.put(new Path(rel2abs(datafiles.getKey(), extent)), datafiles.getValue());
}
-
+
Path mergingMinorCompactionFile = null;
Set<Path> filesToDeleteAfterScan = new HashSet<Path>();
Map<Long,Set<Path>> scanFileReservations = new HashMap<Long,Set<Path>>();
MapCounter<Path> fileScanReferenceCounts = new MapCounter<Path>();
long nextScanReservationId = 0;
boolean reservationsBlocked = false;
-
+
Set<Path> majorCompactingFiles = new HashSet<Path>();
-
+
Pair<Long,Map<String,DataFileValue>> reserveFilesForScan() {
synchronized (Tablet.this) {
-
+
while (reservationsBlocked) {
try {
Tablet.this.wait(50);
@@ -551,34 +551,34 @@ public class Tablet {
log.warn(e, e);
}
}
-
+
Set<Path> absFilePaths = new HashSet<Path>(datafileSizes.keySet());
-
+
long rid = nextScanReservationId++;
-
+
scanFileReservations.put(rid, absFilePaths);
-
+
Map<String,DataFileValue> ret = new HashMap<String,MetadataTable.DataFileValue>();
-
+
for (Path path : absFilePaths) {
fileScanReferenceCounts.increment(path, 1);
ret.put(path.toString(), datafileSizes.get(path));
}
-
+
return new Pair<Long,Map<String,DataFileValue>>(rid, ret);
}
}
-
+
void returnFilesForScan(Long reservationId) {
-
+
final Set<Path> filesToDelete = new HashSet<Path>();
-
+
synchronized (Tablet.this) {
Set<Path> absFilePaths = scanFileReservations.remove(reservationId);
-
+
if (absFilePaths == null)
throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
-
+
boolean notify = false;
for (Path path : absFilePaths) {
long refCount = fileScanReferenceCounts.decrement(path, 1);
@@ -589,32 +589,32 @@ public class Tablet {
} else if (refCount < 0)
throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
}
-
+
if (notify)
Tablet.this.notifyAll();
}
-
+
if (filesToDelete.size() > 0) {
log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete));
MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock());
}
}
-
+
private void removeFilesAfterScanRel(Set<String> relPaths) {
Set<Path> scanFiles = new HashSet<Path>();
-
+
for (String rpath : relPaths)
scanFiles.add(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + rpath));
-
+
removeFilesAfterScan(scanFiles);
}
-
+
private void removeFilesAfterScan(Set<Path> scanFiles) {
if (scanFiles.size() == 0)
return;
-
+
Set<Path> filesToDelete = new HashSet<Path>();
-
+
synchronized (Tablet.this) {
for (Path path : scanFiles) {
if (fileScanReferenceCounts.get(path) == 0)
@@ -623,26 +623,26 @@ public class Tablet {
filesToDeleteAfterScan.add(path);
}
}
-
+
if (filesToDelete.size() > 0) {
log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete));
MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock());
}
}
-
+
private TreeSet<Path> waitForScansToFinish(Set<Path> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
long startTime = System.currentTimeMillis();
TreeSet<Path> inUse = new TreeSet<Path>();
-
+
Span waitForScans = Trace.start("waitForScans");
synchronized (Tablet.this) {
if (blockNewScans) {
if (reservationsBlocked)
throw new IllegalStateException();
-
+
reservationsBlocked = true;
}
-
+
for (Path path : pathsToWaitFor) {
while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
try {
@@ -652,43 +652,43 @@ public class Tablet {
}
}
}
-
+
for (Path path : pathsToWaitFor) {
if (fileScanReferenceCounts.get(path) > 0)
inUse.add(path);
}
-
+
if (blockNewScans) {
reservationsBlocked = false;
Tablet.this.notifyAll();
}
-
+
}
waitForScans.stop();
return inUse;
}
-
+
public void importMapFiles(long tid, Map<String,DataFileValue> pathsString, boolean setTime) throws IOException {
-
+
String bulkDir = null;
-
+
Map<Path,DataFileValue> paths = new HashMap<Path,MetadataTable.DataFileValue>();
for (Entry<String,DataFileValue> entry : pathsString.entrySet())
paths.put(new Path(entry.getKey()), entry.getValue());
-
+
for (Path tpath : paths.keySet()) {
-
+
if (!tpath.getParent().getParent().equals(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId()))) {
throw new IOException("Map file " + tpath + " not in table dir " + ServerConstants.getTablesDir() + "/" + extent.getTableId());
}
-
+
if (bulkDir == null)
bulkDir = tpath.getParent().toString();
else if (!bulkDir.equals(tpath.getParent().toString()))
throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
-
+
}
-
+
if (extent.isRootTablet()) {
throw new IllegalArgumentException("Can not import files to root tablet");
}
@@ -706,7 +706,7 @@ public class Tablet {
for (String file : files)
if (paths.keySet().remove(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + file)))
log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
-
+
if (paths.size() > 0) {
long bulkTime = Long.MIN_VALUE;
if (setTime) {
@@ -722,40 +722,40 @@ public class Tablet {
synchronized (timeLock) {
if (bulkTime > persistedTime)
persistedTime = bulkTime;
-
+
MetadataTable.updateTabletDataFile(tid, extent, abs2rel(paths), tabletTime.getMetadataValue(persistedTime), auths, tabletServer.getLock());
}
}
}
-
+
synchronized (Tablet.this) {
for (Entry<Path,DataFileValue> tpath : paths.entrySet()) {
if (datafileSizes.containsKey(tpath.getKey())) {
log.error("Adding file that is already in set " + tpath.getKey());
}
datafileSizes.put(tpath.getKey(), tpath.getValue());
-
+
}
-
+
tabletResources.importedMapFiles();
-
+
computeNumEntries();
}
-
+
for (Entry<Path,DataFileValue> entry : paths.entrySet()) {
log.log(TLevel.TABLET_HIST, extent + " import " + abs2rel(entry.getKey()) + " " + entry.getValue());
}
}
-
+
String reserveMergingMinorCompactionFile() {
if (mergingMinorCompactionFile != null)
throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile);
-
+
if (extent.isRootTablet())
return null;
-
+
int maxFiles = acuTableConf.getMaxFilesPerTablet();
-
+
// when a major compaction is running and we are at max files, write out
// one extra file... want to avoid the case where major compaction is
// compacting everything except for the largest file, and therefore the
@@ -764,45 +764,45 @@ public class Tablet {
// are canceled
if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
return null;
-
+
if (datafileSizes.size() >= maxFiles) {
// find the smallest file
-
+
long min = Long.MAX_VALUE;
Path minName = null;
-
+
for (Entry<Path,DataFileValue> entry : datafileSizes.entrySet()) {
if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
min = entry.getValue().getSize();
minName = entry.getKey();
}
}
-
+
if (minName == null)
return null;
-
+
mergingMinorCompactionFile = minName;
return minName.toString();
}
-
+
return null;
}
-
+
void unreserveMergingMinorCompactionFile(Path file) {
if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
|| (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
-
+
mergingMinorCompactionFile = null;
}
-
+
void bringMinorCompactionOnline(String tmpDatafile, String newDatafile, String absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) {
bringMinorCompactionOnline(new Path(tmpDatafile), new Path(newDatafile), absMergeFile == null ? null : new Path(absMergeFile), dfv, commitSession,
flushId);
}
-
+
void bringMinorCompactionOnline(Path tmpDatafile, Path newDatafile, Path absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) {
-
+
IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
if (extent.isRootTablet()) {
try {
@@ -813,7 +813,7 @@ public class Tablet {
throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
}
}
-
+
// rename before putting in metadata table, so files in metadata table should
// always exist
do {
@@ -825,7 +825,7 @@ public class Tablet {
log.warn("Target map file already exist " + newDatafile);
fs.delete(newDatafile, true);
}
-
+
rename(fs, tmpDatafile, newDatafile);
}
break;
@@ -834,9 +834,9 @@ public class Tablet {
UtilWaitThread.sleep(60 * 1000);
}
} while (true);
-
+
long t1, t2;
-
+
// the code below always assumes merged files are in use by scans... this must be done
// because the in memory list of files is not updated until after the metadata table
// therefore the file is available to scans until memory is updated, but want to ensure
@@ -849,12 +849,12 @@ public class Tablet {
Set<Path> filesInUseByScans = Collections.emptySet();
if (absMergeFile != null)
filesInUseByScans = Collections.singleton(absMergeFile);
-
+
// very important to write delete entries outside of log lock, because
// this !METADATA write does not go up... it goes sideways or to itself
if (absMergeFile != null)
MetadataTable.addDeleteEntries(extent, Collections.singleton(abs2rel(absMergeFile)), SecurityConstants.getSystemCredentials());
-
+
Set<String> unusedWalLogs = beginClearingUnusedLogs();
try {
// the order of writing to !METADATA and walog is important in the face of machine/process failures
@@ -862,25 +862,25 @@ public class Tablet {
// data could be lost... the minor compaction start even should be written before the following metadata
// write is made
TCredentials creds = SecurityConstants.getSystemCredentials();
-
+
synchronized (timeLock) {
if (commitSession.getMaxCommittedTime() > persistedTime)
persistedTime = commitSession.getMaxCommittedTime();
-
+
String time = tabletTime.getMetadataValue(persistedTime);
MetadataTable.updateTabletDataFile(extent, abs2rel(newDatafile), abs2rel(absMergeFile), dfv, time, creds, abs2rel(filesInUseByScans),
tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
}
-
+
} finally {
finishClearingUnusedLogs();
}
-
+
do {
try {
// the purpose of making this update use the new commit session, instead of the old one passed in,
// is because the new one will reference the logs used by current memory...
-
+
tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
break;
} catch (IOException e) {
@@ -888,37 +888,37 @@ public class Tablet {
UtilWaitThread.sleep(1 * 1000);
}
} while (true);
-
+
synchronized (Tablet.this) {
lastLocation = null;
-
+
t1 = System.currentTimeMillis();
if (datafileSizes.containsKey(newDatafile)) {
log.error("Adding file that is already in set " + newDatafile);
}
-
+
if (dfv.getNumEntries() > 0) {
datafileSizes.put(newDatafile, dfv);
}
-
+
if (absMergeFile != null) {
datafileSizes.remove(absMergeFile);
}
-
+
unreserveMergingMinorCompactionFile(absMergeFile);
-
+
dataSourceDeletions.incrementAndGet();
tabletMemory.finishedMinC();
-
+
lastFlushID = flushId;
-
+
computeNumEntries();
t2 = System.currentTimeMillis();
}
-
+
// must do this after list of files in memory is updated above
removeFilesAfterScan(filesInUseByScans);
-
+
if (absMergeFile != null)
log.log(TLevel.TABLET_HIST, extent + " MinC [" + abs2rel(absMergeFile) + ",memory] -> " + abs2rel(newDatafile));
else
@@ -928,95 +928,95 @@ public class Tablet {
log.debug(String.format("Minor Compaction wrote out file larger than split threshold. split threshold = %,d file size = %,d",
acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
}
-
+
}
-
+
private Map<String,DataFileValue> abs2rel(Map<Path,DataFileValue> paths) {
TreeMap<String,DataFileValue> relMap = new TreeMap<String,MetadataTable.DataFileValue>();
-
+
for (Entry<Path,DataFileValue> entry : paths.entrySet())
relMap.put(abs2rel(entry.getKey()), entry.getValue());
-
+
return relMap;
}
-
+
private Set<String> abs2rel(Set<Path> absPaths) {
Set<String> relativePaths = new TreeSet<String>();
for (Path absPath : absPaths)
relativePaths.add(abs2rel(absPath));
-
+
return relativePaths;
}
-
+
private Set<Path> string2path(Set<String> strings) {
Set<Path> paths = new HashSet<Path>();
for (String path : strings)
paths.add(new Path(path));
-
+
return paths;
}
-
+
private String abs2rel(Path absPath) {
if (absPath == null)
return null;
-
+
if (absPath.getParent().getParent().getName().equals(extent.getTableId().toString()))
return "/" + absPath.getParent().getName() + "/" + absPath.getName();
else
return "../" + absPath.getParent().getParent().getName() + "/" + absPath.getParent().getName() + "/" + absPath.getName();
}
-
+
public void reserveMajorCompactingFiles(Set<String> files) {
if (majorCompactingFiles.size() != 0)
throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
-
+
Set<Path> mcf = string2path(files);
if (mergingMinorCompactionFile != null && mcf.contains(mergingMinorCompactionFile))
throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
-
+
majorCompactingFiles.addAll(mcf);
}
-
+
public void clearMajorCompactingFile() {
majorCompactingFiles.clear();
}
-
+
void bringMajorCompactionOnline(Set<String> oldDatafiles, String tmpDatafile, String newDatafile, Long compactionId, DataFileValue dfv) throws IOException {
bringMajorCompactionOnline(string2path(oldDatafiles), new Path(tmpDatafile), new Path(newDatafile), compactionId, dfv);
}
-
+
void bringMajorCompactionOnline(Set<Path> oldDatafiles, Path tmpDatafile, Path newDatafile, Long compactionId, DataFileValue dfv) throws IOException {
long t1, t2;
-
+
if (!extent.isRootTablet()) {
-
+
if (fs.exists(newDatafile)) {
log.error("Target map file already exist " + newDatafile, new Exception());
throw new IllegalStateException("Target map file already exist " + newDatafile);
}
-
+
// rename before putting in metadata table, so files in metadata table should
// always exist
rename(fs, tmpDatafile, newDatafile);
-
+
if (dfv.getNumEntries() == 0) {
fs.delete(newDatafile, true);
}
}
-
+
TServerInstance lastLocation = null;
synchronized (Tablet.this) {
-
+
t1 = System.currentTimeMillis();
-
+
IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
-
+
dataSourceDeletions.incrementAndGet();
-
+
if (extent.isRootTablet()) {
-
+
waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
-
+
try {
if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
throw new IllegalStateException();
@@ -1024,25 +1024,25 @@ public class Tablet {
} catch (Exception e) {
throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
}
-
+
// mark files as ready for deletion, but
// do not delete them until we successfully
// rename the compacted map file, in case
// the system goes down
-
+
String compactName = newDatafile.getName();
-
+
for (Path path : oldDatafiles) {
rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
}
-
+
if (fs.exists(newDatafile)) {
log.error("Target map file already exist " + newDatafile, new Exception());
throw new IllegalStateException("Target map file already exist " + newDatafile);
}
-
+
rename(fs, tmpDatafile, newDatafile);
-
+
// start deleting files, if we do not finish they will be cleaned
// up later
Trash trash = new Trash(fs, fs.getConf());
@@ -1052,7 +1052,7 @@ public class Tablet {
fs.delete(deleteFile, true);
}
}
-
+
// atomically remove old files and add new file
for (Path oldDatafile : oldDatafiles) {
if (!datafileSizes.containsKey(oldDatafile)) {
@@ -1061,29 +1061,29 @@ public class Tablet {
datafileSizes.remove(oldDatafile);
majorCompactingFiles.remove(oldDatafile);
}
-
+
if (datafileSizes.containsKey(newDatafile)) {
log.error("Adding file that is already in set " + newDatafile);
}
-
+
if (dfv.getNumEntries() > 0) {
datafileSizes.put(newDatafile, dfv);
}
-
+
// could be used by a follow on compaction in a multipass compaction
majorCompactingFiles.add(newDatafile);
-
+
computeNumEntries();
-
+
lastLocation = Tablet.this.lastLocation;
Tablet.this.lastLocation = null;
-
+
if (compactionId != null)
lastCompactID = compactionId;
-
+
t2 = System.currentTimeMillis();
}
-
+
if (!extent.isRootTablet()) {
Set<Path> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
if (filesInUseByScans.size() > 0)
@@ -1092,37 +1092,37 @@ public class Tablet {
SecurityConstants.getSystemCredentials(), tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
removeFilesAfterScan(filesInUseByScans);
}
-
+
log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
log.log(TLevel.TABLET_HIST, extent + " MajC " + abs2rel(oldDatafiles) + " --> " + abs2rel(newDatafile));
}
-
+
public SortedMap<String,DataFileValue> getDatafileSizesRel() {
synchronized (Tablet.this) {
TreeMap<String,DataFileValue> files = new TreeMap<String,MetadataTable.DataFileValue>();
Set<Entry<Path,DataFileValue>> es = datafileSizes.entrySet();
-
+
for (Entry<Path,DataFileValue> entry : es) {
files.put(abs2rel(entry.getKey()), entry.getValue());
}
-
+
return Collections.unmodifiableSortedMap(files);
}
}
-
+
public SortedMap<String,DataFileValue> getDatafileSizes() {
synchronized (Tablet.this) {
TreeMap<String,DataFileValue> files = new TreeMap<String,MetadataTable.DataFileValue>();
Set<Entry<Path,DataFileValue>> es = datafileSizes.entrySet();
-
+
for (Entry<Path,DataFileValue> entry : es) {
files.put(entry.getKey().toString(), entry.getValue());
}
-
+
return Collections.unmodifiableSortedMap(files);
}
}
-
+
public Set<String> getFiles() {
synchronized (Tablet.this) {
HashSet<String> files = new HashSet<String>();
@@ -1132,38 +1132,38 @@ public class Tablet {
return Collections.unmodifiableSet(files);
}
}
-
+
}
-
+
public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
throws IOException {
this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
splitCreationTime = 0;
}
-
+
public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<String,DataFileValue> datafiles, String time,
long initFlushID, long initCompactID) throws IOException {
this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
splitCreationTime = System.currentTimeMillis();
}
-
+
private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
SortedMap<Key,Value> tabletsKeyValues) throws IOException {
this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())),
tabletsKeyValues);
}
-
+
static private final List<LogEntry> EMPTY = Collections.emptyList();
-
+
private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
SortedMap<String,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), EMPTY,
datafiles, time, null, new HashSet<String>(), initFlushID, initCompactID);
}
-
+
private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
SortedMap<Key,Value> entries;
-
+
if (extent.isRootTablet()) {
return null;
} else {
@@ -1175,20 +1175,20 @@ public class Tablet {
}
}
}
-
+
// log.debug("extent : "+extent+" entries : "+entries);
-
+
if (entries.size() == 1)
return entries.values().iterator().next().toString();
return null;
}
-
+
private static SortedMap<String,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent,
SortedMap<Key,Value> tabletsKeyValues) throws IOException {
Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString());
-
+
TreeMap<String,DataFileValue> datafiles = new TreeMap<String,DataFileValue>();
-
+
if (extent.isRootTablet()) { // the meta0 tablet
// cleanUpFiles() has special handling for delete. files
FileStatus[] files = fs.listStatus(location);
@@ -1203,49 +1203,49 @@ public class Tablet {
datafiles.put(locText.toString() + "/" + filename, dfv);
}
} else {
-
+
SortedMap<Key,Value> datafilesMetadata;
-
+
Text rowName = extent.getMetadataEntry();
-
+
ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID,
Constants.NO_AUTHS);
-
+
// Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
// reduced batch size to improve performance
// changed here after endKeys were implemented from 10 to 1000
mdScanner.setBatchSize(1000);
-
+
// leave these in, again, now using endKey for safety
mdScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
-
+
mdScanner.setRange(new Range(rowName));
-
+
datafilesMetadata = new TreeMap<Key,Value>();
-
+
for (Entry<Key,Value> entry : mdScanner) {
-
+
if (entry.getKey().compareRow(rowName) != 0) {
break;
}
-
+
datafilesMetadata.put(new Key(entry.getKey()), new Value(entry.getValue()));
}
-
+
Iterator<Entry<Key,Value>> dfmdIter = datafilesMetadata.entrySet().iterator();
-
+
while (dfmdIter.hasNext()) {
Entry<Key,Value> entry = dfmdIter.next();
-
+
datafiles.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get()));
}
}
return datafiles;
}
-
+
private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
List<LogEntry> logEntries = new ArrayList<LogEntry>();
-
+
if (ke.isMeta()) {
try {
logEntries = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials(), ke);
@@ -1264,14 +1264,14 @@ public class Tablet {
}
}
}
-
+
log.debug("got " + logEntries + " for logs for " + ke);
return logEntries;
}
-
+
private static Set<String> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
HashSet<String> scanFiles = new HashSet<String>();
-
+
Text row = extent.getMetadataEntry();
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
Key key = entry.getKey();
@@ -1279,10 +1279,10 @@ public class Tablet {
scanFiles.add(key.getColumnQualifier().toString());
}
}
-
+
return scanFiles;
}
-
+
private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
Text row = extent.getMetadataEntry();
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
@@ -1290,10 +1290,10 @@ public class Tablet {
if (key.getRow().equals(row) && Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
return Long.parseLong(entry.getValue().toString());
}
-
+
return -1;
}
-
+
private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
Text row = extent.getMetadataEntry();
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
@@ -1301,17 +1301,17 @@ public class Tablet {
if (key.getRow().equals(row) && Constants.METADATA_COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
return Long.parseLong(entry.getValue().toString());
}
-
+
return -1;
}
-
+
/**
 * Creates a tablet from its metadata entries.
 *
 * Each piece of state is computed by the matching {@code lookup*} helper before delegating to the full constructor: log entries, data files,
 * time, last server, scan files, flush id and compaction id. Most of these helpers read {@code tabletsKeyValues}.
 */
private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, FileSystem fs,
    SortedMap<Key,Value> tabletsKeyValues) throws IOException {
  this(tabletServer, location, extent, trm, conf, fs,
      lookupLogEntries(extent, tabletsKeyValues),
      lookupDatafiles(tabletServer.getSystemConfiguration(), location, fs, extent, tabletsKeyValues),
      lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues),
      lookupLastServer(extent, tabletsKeyValues),
      lookupScanFiles(extent, tabletsKeyValues),
      lookupFlushID(extent, tabletsKeyValues),
      lookupCompactID(extent, tabletsKeyValues));
}
-
+
private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
if (entry.getKey().getColumnFamily().compareTo(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY) == 0) {
@@ -1320,7 +1320,7 @@ public class Tablet {
}
return null;
}
-
+
/**
* yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
*/
@@ -1332,68 +1332,70 @@ public class Tablet {
this.tabletDirectory = location.toString();
this.conf = conf;
this.acuTableConf = tabletServer.getTableConfiguration(extent);
-
+
this.fs = fs;
this.extent = extent;
this.tabletResources = trm;
-
+
this.lastFlushID = initFlushID;
this.lastCompactID = initCompactID;
-
+
if (extent.isRootTablet()) {
-
+
long rtime = Long.MIN_VALUE;
for (String path : datafiles.keySet()) {
String filename = new Path(path).getName();
-
+
FileSKVIterator reader = FileOperations.getInstance().openReader(this.location + "/" + filename, true, fs, fs.getConf(),
tabletServer.getTableConfiguration(extent));
long maxTime = -1;
try {
-
+
while (reader.hasTop()) {
maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
reader.next();
}
-
+
} finally {
reader.close();
}
-
+
if (maxTime > rtime) {
time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
rtime = maxTime;
}
}
}
-
+
this.tabletServer = tabletServer;
this.logId = tabletServer.createLogId(extent);
-
+
this.timer = new TabletStatsKeeper();
-
+
setupDefaultSecurityLabels(extent);
-
+
tabletMemory = new TabletMemory();
tabletTime = TabletTime.getInstance(time);
persistedTime = tabletTime.getTime();
-
+
acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
-
+
private void reloadConstraints() {
constraintChecker.set(new ConstraintChecker(getTableConfiguration()));
}
-
+
+ @Override
public void propertiesChanged() {
reloadConstraints();
-
+
try {
setupDefaultSecurityLabels(extent);
} catch (Exception e) {
log.error("Failed to reload default security labels for extent: " + extent.toString());
}
}
-
+
+ @Override
public void propertyChanged(String prop) {
if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
reloadConstraints();
@@ -1405,19 +1407,20 @@ public class Tablet {
log.error("Failed to reload default security labels for extent: " + extent.toString());
}
}
-
+
}
-
+
+ @Override
public void sessionExpired() {
log.debug("Session expired, no longer updating per table props...");
}
-
+
});
// Force a load of any per-table properties
configObserver.propertiesChanged();
-
+
tabletResources.setTablet(this, acuTableConf);
-
+
if (!logEntries.isEmpty()) {
log.info("Starting Write-Ahead Log recovery for " + this.extent);
final long[] count = new long[2];
@@ -1427,8 +1430,9 @@ public class Tablet {
Set<String> absPaths = new HashSet<String>();
for (String relPath : datafiles.keySet())
absPaths.add(rel2abs(relPath, extent));
-
+
tabletServer.recover(this, logEntries, absPaths, new MutationReceiver() {
+ @Override
public void receive(Mutation m) {
// LogReader.printMutation(m);
Collection<ColumnUpdate> muts = m.getUpdates();
@@ -1443,17 +1447,17 @@ public class Tablet {
count[0]++;
}
});
-
+
if (count[1] != Long.MIN_VALUE) {
tabletTime.useMaxTimeFromWALog(count[1]);
}
commitSession.updateMaxCommittedTime(tabletTime.getTime());
-
+
if (count[0] == 0) {
MetadataTable.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
logEntries.clear();
}
-
+
} catch (Throwable t) {
if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
log.warn("Error recovering from log files: ", t);
@@ -1469,11 +1473,11 @@ public class Tablet {
currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1]));
}
}
-
+
log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
+ " entries created)");
}
-
+
String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
if (contextName != null && !contextName.equals("")) {
// initialize context classloader, instead of possibly waiting for it to initialize for a scan
@@ -1484,11 +1488,11 @@ public class Tablet {
// do this last after tablet is completely setup because it
// could cause major compaction to start
datafileManager = new DatafileManager(datafiles);
-
+
computeNumEntries();
-
+
datafileManager.removeFilesAfterScanRel(scanFiles);
-
+
// look for hints of a failure on the previous tablet server
if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) {
// look for any temp files hanging around
@@ -1500,10 +1504,10 @@ public class Tablet {
// look for any temp files hanging around
removeOldTemporaryFiles();
}
-
+
log.log(TLevel.TABLET_HIST, extent + " opened ");
}
-
+
private void removeOldTemporaryFiles() {
// remove any temporary files created by a previous tablet server
try {
@@ -1532,18 +1536,18 @@ public class Tablet {
}
}
}
-
+
private static Collection<String> cleanUpFiles(FileSystem fs, FileStatus[] files, Path location, boolean deleteTmp) throws IOException {
/*
* called in constructor and before major compactions
*/
Collection<String> goodFiles = new ArrayList<String>(files.length);
-
+
for (FileStatus file : files) {
-
+
String path = file.getPath().toString();
String filename = file.getPath().getName();
-
+
// check for incomplete major compaction, this should only occur
// for root tablet
if (filename.startsWith("delete+")) {
@@ -1555,35 +1559,35 @@ public class Tablet {
continue;
}
// compaction did not finish, so put files back
-
+
// reset path and filename for rest of loop
filename = filename.split("\\+", 3)[2];
path = location + "/" + filename;
-
+
rename(fs, file.getPath(), new Path(path));
}
-
+
if (filename.endsWith("_tmp")) {
if (deleteTmp) {
log.warn("cleaning up old tmp file: " + path);
if (!fs.delete(file.getPath(), true))
log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");
-
+
}
continue;
}
-
+
if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
log.error("unknown file in tablet: " + path);
continue;
}
-
+
goodFiles.add(path);
}
-
+
return goodFiles;
}
-
+
public static class KVEntry extends KeyValue {
private static final long serialVersionUID = 1L;
@@ -1594,58 +1598,58 @@ public class Tablet {
int numBytes() {
return getKey().getSize() + getValue().get().length;
}
-
+
int estimateMemoryUsed() {
return getKey().getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object
}
}
-
+
private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, ArrayList<KVEntry> results,
long maxResultsSize) throws IOException {
-
+
LookupResult lookupResult = new LookupResult();
-
+
boolean exceededMemoryUsage = false;
boolean tabletClosed = false;
-
+
Set<ByteSequence> cfset = null;
if (columnSet.size() > 0)
cfset = LocalityGroupUtil.families(columnSet);
-
+
for (Range range : ranges) {
-
+
if (exceededMemoryUsage || tabletClosed) {
lookupResult.unfinishedRanges.add(range);
continue;
}
-
+
int entriesAdded = 0;
-
+
try {
if (cfset != null)
mmfi.seek(range, cfset, true);
else
mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
-
+
while (mmfi.hasTop()) {
Key key = mmfi.getTopKey();
-
+
KVEntry kve = new KVEntry(key, mmfi.getTopValue());
results.add(kve);
entriesAdded++;
lookupResult.bytesAdded += kve.estimateMemoryUsed();
lookupResult.dataSize += kve.numBytes();
-
+
exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
-
+
if (exceededMemoryUsage) {
addUnfinishedRange(lookupResult, range, key, false);
break;
}
-
+
mmfi.next();
}
-
+
} catch (TooManyFilesException tmfe) {
// treat this as a closed tablet, and let the client retry
log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run");
@@ -1671,61 +1675,61 @@ public class Tablet {
handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
tabletClosed = true;
}
-
+
}
-
+
return lookupResult;
}
-
+
private void handleTabletClosedDuringScan(ArrayList<KVEntry> results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) {
if (exceededMemoryUsage)
throw new IllegalStateException("tablet should not exceed memory usage or close, not both");
-
+
if (entriesAdded > 0)
addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey(), false);
else
lookupResult.unfinishedRanges.add(range);
-
+
lookupResult.closed = true;
}
-
+
/**
 * Appends to {@code lookupResult.unfinishedRanges} the part of {@code range} that starts at a copy of {@code key} and runs to the range's end.
 *
 * The start is inclusive when {@code inclusiveStartKey} is true. Nothing is added when {@code range} has an end key and {@code key} is not
 * strictly before it.
 */
private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) {
  Key endKey = range.getEndKey();
  boolean keyBeforeEnd = endKey == null || key.compareTo(endKey) < 0;
  if (!keyBeforeEnd) {
    return;
  }

  Range remainder = new Range(new Key(key), inclusiveStartKey, endKey, range.isEndKeyInclusive());
  lookupResult.unfinishedRanges.add(remainder);
}
-
+
public static interface KVReceiver {
void receive(List<KVEntry> matches) throws IOException;
}
-
+
/**
 * Outcome of a batch lookup.
 *
 * It records which ranges (or remainders of ranges) were not completed, running size totals for the entries read, and whether the tablet
 * closed during the lookup.
 */
class LookupResult {
  /** running total of KVEntry.estimateMemoryUsed() over the entries added */
  long bytesAdded = 0L;
  /** running total of KVEntry.numBytes() over the entries added */
  long dataSize = 0L;
  /** set when the tablet was closed while the lookup was running */
  boolean closed;
  /** ranges, or the unread remainders of ranges, that still need to be looked up */
  List<Range> unfinishedRanges = new ArrayList<Range>();
}
-
+
public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, ArrayList<KVEntry> results, long maxResultSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
-
+
if (ranges.size() == 0) {
return new LookupResult();
}
-
+
ranges = Range.mergeOverlapping(ranges);
Collections.sort(ranges);
-
+
Range tabletRange = extent.toDataRange();
for (Range range : ranges) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
tabletRange.clip(range);
}
-
+
ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
-
+
LookupResult result = null;
try {
@@ -1739,7 +1743,7 @@ public class Tablet {
// code in finally block because always want
// to return mapfiles, even when exception is thrown
dataSource.close(false);
-
+
synchronized (this) {
queryCount += results.size();
if (result != null)
@@ -1747,74 +1751,74 @@ public class Tablet {
}
}
}
-
+
private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
-
+
// log.info("In nextBatch..");
-
+
List<KVEntry> results = new ArrayList<KVEntry>();
Key key = null;
-
+
Value value;
long resultSize = 0L;
long resultBytes = 0L;
-
+
long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-
+
if (columns.size() == 0) {
iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
} else {
iter.seek(range, LocalityGroupUtil.families(columns), true);
}
-
+
Key continueKey = null;
boolean skipContinueKey = false;
-
+
boolean endOfTabletReached = false;
while (iter.hasTop()) {
-
- value = (Value) iter.getTopValue();
+
+ value = iter.getTopValue();
key = iter.getTopKey();
-
+
KVEntry kvEntry = new KVEntry(key, value); // copies key and value
results.add(kvEntry);
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();
-
+
if (resultSize >= maxResultsSize || results.size() >= num) {
continueKey = new Key(key);
skipContinueKey = true;
break;
}
-
+
iter.next();
}
-
+
if (iter.hasTop() == false) {
endOfTabletReached = true;
}
-
+
Batch retBatch = new Batch();
retBatch.numBytes = resultBytes;
-
+
if (!endOfTabletReached) {
retBatch.continueKey = continueKey;
retBatch.skipContinueKey = skipContinueKey;
} else {
retBatch.continueKey = null;
}
-
+
if (endOfTabletReached && results.size() == 0)
retBatch.results = null;
else
retBatch.results = results;
-
+
return retBatch;
}
-
+
/**
* Determine if a JVM shutdown is in progress.
- *
+ *
*/
private boolean shutdownInProgress() {
try {
@@ -1825,63 +1829,63 @@ public class Tablet {
} catch (IllegalStateException ise) {
return true;
}
-
+
return false;
}
-
+
private class Batch {
public boolean skipContinueKey;
public List<KVEntry> results;
public Key continueKey;
public long numBytes;
}
-
+
Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
// do a test to see if this range falls within the tablet, if it does not
// then clip will throw an exception
extent.toDataRange().clip(range);
-
+
ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
return new Scanner(range, opts);
}
-
+
class ScanBatch {
boolean more;
List<KVEntry> results;
-
+
ScanBatch(List<KVEntry> results, boolean more) {
this.results = results;
this.more = more;
}
}
-
+
class Scanner {
-
+
private ScanOptions options;
private Range range;
private SortedKeyValueIterator<Key,Value> isolatedIter;
private ScanDataSource isolatedDataSource;
private boolean sawException = false;
private boolean scanClosed = false;
-
+
Scanner(Range range, ScanOptions options) {
this.range = range;
this.options = options;
}
-
+
synchronized ScanBatch read() throws IOException, TabletClosedException {
-
+
if (sawException)
throw new IllegalStateException("Tried to use scanner after exception occurred.");
-
+
if (scanClosed)
throw new IllegalStateException("Tried to use scanner after it was closed.");
-
+
Batch results = null;
-
+
ScanDataSource dataSource;
-
+
if (options.isolated) {
if (isolatedDataSource == null)
isolatedDataSource = new ScanDataSource(options);
@@ -1889,11 +1893,11 @@ public class Tablet {
} else {
dataSource = new ScanDataSource(options);
}
-
+
try {
-
+
SortedKeyValueIterator<Key,Value> iter;
-
+
if (options.isolated) {
if (isolatedIter == null)
isolatedIter = new SourceSwitchingIterator(dataSource, true);
@@ -1903,9 +1907,9 @@ public class Tablet {
} else {
iter = new SourceSwitchingIterator(dataSource, false);
}
-
+
results = nextBatch(iter, range, options.num, options.columnSet);
-
+
if (results.results == null) {
range = null;
return new ScanBatch(new ArrayList<Tablet.KVEntry>(), false);
@@ -1915,7 +1919,7 @@ public class Tablet {
range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
return new ScanBatch(results.results, true);
}
-
+
} catch (IterationInterruptedException iie) {
sawException = true;
if (isClosed())
@@ -1927,7 +1931,7 @@ public class Tablet {
log.debug("IOException while shutdown in progress ", ioe);
throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
}
-
+
sawException = true;
dataSource.close(true);
throw ioe;
@@ -1941,7 +1945,7 @@ public class Tablet {
dataSource.close(false);
else if (dataSource.fileManager != null)
dataSource.fileManager.detach();
-
+
synchronized (Tablet.this) {
if (results != null && results.results != null) {
long more = results.results.size();
@@ -1951,7 +1955,7 @@ public class Tablet {
}
}
}
-
+
// close and read are synchronized because can not call close on the data source while it is in use
// this could lead to the case where file iterators that are in use by a thread are returned
// to the pool... this would be bad
@@ -1964,9 +1968,9 @@ public class Tablet {
}
}
}
-
+
static class ScanOptions {
-
+
// scan options
Authorizations authorizations;
byte[] defaultLabels;
@@ -1976,7 +1980,7 @@ public class Tablet {
AtomicBoolean interruptFlag;
int num;
boolean isolated;
-
+
ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
this.num = num;
@@ -1988,11 +1992,11 @@ public class Tablet {
this.interruptFlag = interruptFlag;
this.isolated = isolated;
}
-
+
}
-
+
class ScanDataSource implements DataSource {
-
+
// data source state
private ScanFileManager fileManager;
private SortedKeyValueIterator<Key,Value> iter;
@@ -2001,22 +2005,22 @@ public class Tablet {
private long fileReservationId;
private AtomicBoolean interruptFlag;
private StatsIterator statsIterator;
-
+
ScanOptions options;
-
+
ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
AtomicBoolean interruptFlag) {
expectedDeletionCount = dataSourceDeletions.get();
this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
this.interruptFlag = interruptFlag;
}
-
+
ScanDataSource(ScanOptions options) {
expectedDeletionCount = dataSourceDeletions.get();
this.options = options;
this.interruptFlag = options.interruptFlag;
}
-
+
@Override
public DataSource getNewDataSource() {
if (!isCurrent()) {
@@ -2027,122 +2031,122 @@ public class Tablet {
datafileManager.returnFilesForScan(fileReservationId);
fileReservationId = -1;
}
-
+
if (fileManager != null)
fileManager.releaseOpenFiles(false);
-
+
expectedDeletionCount = dataSourceDeletions.get();
iter = null;
-
+
return this;
} else
return this;
}
-
+
@Override
public boolean isCurrent() {
return expectedDeletionCount == dataSourceDeletions.get();
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
if (iter == null)
iter = createIterator();
return iter;
}
-
+
private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
-
+
Map<String,DataFileValue> files;
-
+
synchronized (Tablet.this) {
-
+
if (memIters != null)
throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
-
+
if (Tablet.this.closed)
throw new TabletClosedException();
-
+
if (interruptFlag.get())
throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode());
-
+
// only acquire the file manager when we know the tablet is open
if (fileManager == null) {
fileManager = tabletResources.newScanFileManager();
activeScans.add(this);
}
-
+
if (fileManager.getNumOpenFiles() != 0)
throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
-
+
// set this before trying to get iterators in case
// getIterators() throws an exception
expectedDeletionCount = dataSourceDeletions.get();
-
+
memIters = tabletMemory.getIterators();
Pair<Long,Map<String,DataFileValue>> reservation = datafileManager.reserveFilesForScan();
fileReservationId = reservation.getFirst();
files = reservation.getSecond();
}
-
+
Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
-
+
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
-
+
iters.addAll(mapfiles);
iters.addAll(memIters);
-
+
for (SortedKeyValueIterator<Key,Value> skvi : iters)
((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
-
+
MultiIterator multiIter = new MultiIterator(iters, extent);
-
+
TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files);
-
+
statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount);
-
+
DeletingIterator delIter = new DeletingIterator(statsIterator, false);
-
+
ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
+
ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
-
+
VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);
-
+
return iterEnv.getTopLevelIterator(IteratorUtil
.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv));
}
-
+
private void close(boolean sawErrors) {
-
+
if (memIters != null) {
tabletMemory.returnIterators(memIters);
memIters = null;
datafileManager.returnFilesForScan(fileReservationId);
fileReservationId = -1;
}
-
+
synchronized (Tablet.this) {
activeScans.remove(this);
if (activeScans.size() == 0)
Tablet.this.notifyAll();
}
-
+
if (fileManager != null) {
fileManager.releaseOpenFiles(sawErrors);
fileManager = null;
}
-
+
if (statsIterator != null) {
statsIterator.report();
}
}
-
+
public void interrupt() {
interruptFlag.set(true);
}
-
+
@Override
public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
throw new UnsupportedOperationException();
@@ -2152,28 +2156,28 @@ public class Tablet {
public void setInterruptFlag(AtomicBoolean flag) {
throw new UnsupportedOperationException();
}
-
+
}
-
+
private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, String tmpDatafile, String newDatafile, String mergeFile,
boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
boolean failed = false;
long start = System.currentTimeMillis();
timer.incrementStatusMinor();
-
+
long count = 0;
-
+
try {
Span span = Trace.start("write");
count = memTable.getNumEntries();
-
+
DataFileValue dfv = null;
if (mergeFile != null)
dfv = datafileManager.getDatafileSizes().get(mergeFile);
-
+
MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
CompactionStats stats = compactor.call();
-
+
span.stop();
span = Trace.start("bringOnline");
datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
@@ -2193,7 +2197,7 @@ public class Tablet {
} catch (Throwable t) {
log.error("Failed to free tablet memory", t);
}
-
+
if (!failed) {
lastMinorCompactionFinishTime = System.currentTimeMillis();
}
@@ -2207,16 +2211,16 @@ public class Tablet {
timer.updateTime(Operation.MINOR, start, count, failed);
}
}
-
+
private class MinorCompactionTask implements Runnable {
-
+
private long queued;
private CommitSession commitSession;
private DataFileValue stats;
private String mergeFile;
private long flushId;
private MinorCompactionReason mincReason;
-
+
MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
queued = System.currentTimeMillis();
minorCompactionWaitingToStart = true;
@@ -2225,7 +2229,8 @@ public class Tablet {
this.flushId = flushId;
this.mincReason = mincReason;
}
-
+
+ @Override
public void run() {
minorCompactionWaitingToStart = false;
minorCompactionInProgress = true;
@@ -2256,7 +2261,7 @@ public class Tablet {
this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation + "_tmp", newMapfileLocation, mergeFile, true, queued,
commitSession, flushId, mincReason);
span.stop();
-
+
if (needsSplit()) {
tabletServer.executeSplit(Tablet.this);
} else {
@@ -2274,36 +2279,36 @@ public class Tablet {
}
}
}
-
+
private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
CommitSession oldCommitSession = tabletMemory.prepareForMinC();
otherLogs = currentLogs;
currentLogs = new HashSet<DfsLogger>();
-
+
String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
-
+
return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason);
-
+
}
-
+
void flush(long tableFlushID) {
boolean updateMetadata = false;
boolean initiateMinor = false;
-
+
try {
-
+
synchronized (this) {
-
+
// only want one thing at a time to update flush ID to ensure that metadata table and tablet in memory state are consistent
if (updatingFlushID)
return;
-
+
if (lastFlushID >= tableFlushID)
return;
-
+
if (closing || closed || tabletMemory.memoryReservedForMinC())
return;
-
+
if (tabletMemory.getMemTable().getNumEntries() == 0) {
lastFlushID = tableFlushID;
updatingFlushID = true;
@@ -2311,7 +2316,7 @@ public class Tablet {
} else
initiateMinor = true;
}
-
+
if (updateMetadata) {
TCredentials creds = SecurityConstants.getSystemCredentials();
// if multiple threads were allowed to update this outside of a sync block, then it would be
@@ -2319,7 +2324,7 @@ public class Tablet {
MetadataTable.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock());
} else if (initiateMinor)
initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
-
+
} finally {
if (updateMetadata) {
synchronized (this) {
@@ -2328,9 +2333,9 @@ public class Tablet {
}
}
}
-
+
}
-
+
boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
if (isClosed()) {
// don't bother trying to get flush id if closed... could be closed after this check but that is ok... just trying to cut down on unneeded log messages....
@@ -2347,7 +2352,7 @@ public class Tablet {
}
return initiateMinorCompaction(flushId, mincReason);
}
-
+
boolean minorCompactNow(MinorCompactionReason mincReason) {
long flushId;
try {
@@ -2370,22 +2375,22 @@ public class Tablet {
tabletResources.executeMinorCompaction(mct);
return true;
}
-
+
private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) {
MinorCompactionTask mct;
long t1, t2;
-
+
StringBuilder logMessage = null;
-
+
try {
synchronized (this) {
t1 = System.currentTimeMillis();
-
+
if (closing || closed || majorCompactionWaitingToStart || tabletMemory.memoryReservedForMinC() || tabletMemory.getMemTable().getNumEntries() == 0
|| updatingFlushID) {
-
+
logMessage = new StringBuilder();
-
+
logMessage.append(extent.toString());
logMessage.append(" closing " + closing);
logMessage.append(" closed " + closed);
@@ -2395,7 +2400,7 @@ public class Tablet {
if (tabletMemory != null && tabletMemory.getMemTable() != null)
logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + tabletMemory.getMemTable().getNumEntries());
logMessage.append(" updatingFlushID " + updatingFlushID);
-
+
return null;
}
// We're still recovering log entries
@@ -2405,7 +2410,7 @@ public class Tablet {
logMessage.append(" datafileManager " + datafileManager);
return null;
}
-
+
mct = prepareForMinC(flushId, mincReason);
t2 = System.currentTimeMillis();
}
@@ -2414,11 +2419,11 @@ public class Tablet {
if (logMessage != null && log.isDebugEnabled())
log.debug(logMessage);
}
-
+
log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0));
return mct;
}
-
+
long getFlushID() throws NoNodeException {
try {
String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
@@ -2436,11 +2441,11 @@ public class Tablet {
}
}
}
-
+
long getCompactionCancelID() {
String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+ Constants.ZTABLE_COMPACT_CANCEL_ID;
-
+
try {
return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), UTF_8));
} catch (KeeperException e) {
@@ -2454,32 +2459,32 @@ public class Tablet {
try {
String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+ Constants.ZTABLE_COMPACT_ID;
-
+
String[] tokens = new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), UTF_8).split(",");
long compactID = Long.parseLong(tokens[0]);
-
+
CompactionIterators iters = new CompactionIterators();
if (tokens.length > 1) {
Hex hex = new Hex();
ByteArrayInputStream bais = new ByteArrayInputStream(hex.decode(tokens[1].split("=")[1].getBytes(UTF_8)));
DataInputStream dis = new DataInputStream(bais);
-
+
try {
iters.readFields(dis);
} catch (IOException e) {
throw new RuntimeException(e);
}
-
+
KeyExtent ke = new KeyExtent(extent.getTableId(), iters.getEndRow(), iters.getStartRow());
-
+
if (!ke.overlaps(extent)) {
// only use iterators if compaction range overlaps
iters = new CompactionIterators();
}
}
-
+
return new Pair<Long,List<IteratorSetting>>(compactID, iters.getIterators());
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -2495,62 +2500,62 @@ public class Tablet {
throw new RuntimeException(e);
}
}
-
+
/**
 * Calls tabletMemory.waitForMinC() while holding this tablet's monitor.
 */
public void waitForMinC() {
  synchronized (this) {
    tabletMemory.waitForMinC();
  }
}
-
+
static class TConstraintViolationException extends Exception {
private static final long serialVersionUID = 1L;
private Violations violations;
private List<Mutation> violators;
private List<Mutation> nonViolators;
private CommitSession commitSession;
-
+
TConstraintViolationException(Violations violations, List<Mutation> violators, List<Mutation> nonViolators, CommitSession commitSession) {
this.violations = violations;
this.violators = violators;
this.nonViolators = nonViolators;
this.commitSession = commitSession;
}
-
+
Violations getViolations() {
return violations;
}
-
+
List<Mutation> getViolators() {
return violators;
}
-
+
List<Mutation> getNonViolators() {
return nonViolators;
}
-
+
CommitSession getCommitSession() {
return commitSession;
}
}
-
+
private synchronized CommitSession finishPreparingMutations(long time) {
if (writesInProgress < 0) {
throw new IllegalStateException("waitingForLogs < 0 " + writesInProgress);
}
-
+
if (closed || tabletMemory == null) {
// log.debug("tablet closed, can't commit");
return null;
}
-
+
writesInProgress++;
CommitSession commitSession = tabletMemory.getCommitSession();
commitSession.incrementCommitsInProgress();
commitSession.updateMaxCommittedTime(time);
return commitSession;
}
-
+
public void checkConstraints() {
ConstraintChecker cc = constraintChecker.get();
-
+
if (cc.classLoaderChanged()) {
ConstraintChecker ncc = new ConstraintChecker(getTableConfiguration());
constraintChecker.compareAndSet(cc, ncc);
@@ -2558,9 +2563,9 @@ public class Tablet {
}
public CommitSession prepareMutationsForCommit(TservConstraintEnv cenv, List<Mutation> mutations) throws TConstraintViolationException {
-
+
ConstraintChecker cc = constraintChecker.get();
-
+
List<Mutation> violators = null;
Violations violations = new Violations();
cenv.setExtent(extent);
@@ -2573,22 +2578,22 @@ public class Tablet {
violators.add(mutation);
}
}
-
+
long time = tabletTime.setUpdateTimes(mutations);
-
+
if (!violations.isEmpty()) {
-
+
HashSet<Mutation> violatorsSet = new HashSet<Mutation>(violators);
ArrayList<Mutation> nonViolators = new ArrayList<Mutation>();
-
+
for (Mutation mutation : mutations) {
if (!violatorsSet.contains(mutation)) {
nonViolators.add(mutation);
}
}
-
+
CommitSession commitSession = null;
-
+
if (nonViolators.size() > 0) {
// if everything is a violation, then it is expected that
// code calling this will not log or commit
@@ -2596,66 +2601,66 @@ public class Tablet {
if (commitSession == null)
return null;
}
-
+
throw new TConstraintViolationException(violations, violators, nonViolators, commitSession);
}
-
+
return finishPreparingMutations(time);
}
-
+
public synchronized void abortCommit(CommitSession commitSession, List<Mutation> value) {
if (writesInProgress <= 0) {
throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress);
}
-
+
if (closeComplete || tabletMemory == null) {
throw new IllegalStateException("aborting commit when tablet is closed");
}
-
+
commitSession.decrementCommitsInProgress();
writesInProgress--;
if (writesInProgress == 0)
this.notifyAll();
}
-
+
public void commit(CommitSession commitSession, List<Mutation> mutations) {
-
+
int totalCount = 0;
long totalBytes = 0;
-
+
// write the mutation to the in memory table
for (Mutation mutation : mutations) {
totalCount += mutation.size();
totalBytes += mutation.numBytes();
}
-
+
tabletMemory.mutate(commitSession, mutations);
-
+
synchronized (this) {
if (writesInProgress < 1) {
throw new IllegalStateException("commiting mutations after logging, but not waiting for any log messages");
}
-
+
if (closed && closeComplete) {
throw new IllegalStateException("tablet closed with outstanding messages to the logger");
}
-
+
tabletMemory.updateMemoryUsageStats();
-
+
// decrement here in case an exception is thrown below
writesInProgress--;
if (writesInProgress == 0)
this.notifyAll();
-
+
commitSession.decrementCommitsInProgress();
-
+
numEntries += totalCount;
numEntriesInMemory += totalCount;
ingestCount += totalCount;
ingestBytes += totalBytes;
}
}
-
+
/**
* Closes the mapfiles associated with a Tablet. If saveState is true, a minor compaction is performed.
*/
@@ -2663,17 +2668,17 @@ public class Tablet {
initiateClose(saveState, false, false);
completeClose(saveState, true);
}
-
+
void initiateClose(boolean saveState, boolean queueMinC, boolean disableWrites) {
-
+
if (!saveState && queueMinC) {
throw new IllegalArgumentException("Not saving state on close and requesting minor compactions queue does not make sense");
}
-
+
log.debug("initiateClose(saveState=" + saveState + " queueMinC=" + queueMinC + " disableWrites=" + disableWrites + ") " + getExtent());
-
+
MinorCompactionTask mct = null;
-
+
synchronized (this) {
if (closed || closing || closeComplete) {
String msg = "Tablet " + getExtent() + " already";
@@ -2685,15 +2690,15 @@ public class Tablet {
msg += " closeComplete";
throw new IllegalStateException(msg);
}
-
+
// enter the closing state, no splits, minor, or major compactions can start
// should cause running major compactions to stop
closing = true;
this.notifyAll();
-
+
// determines if inserts and queries can still continue while minor compacting
closed = disableWrites;
-
+
// wait for major compactions to finish, setting closing to
// true should cause any running major compactions to abort
while (majorCompactionInProgress) {
@@ -2703,7 +2708,7 @@ public class Tablet {
log.error(e.toString());
}
}
-
+
while (updatingFlushID) {
try {
this.wait(50);
@@ -2715,50 +2720,50 @@ public class Tablet {
if (!saveState || tabletMemory.getMemTable().getNumEntries() == 0) {
return;
}
-
+
tabletMemory.waitForMinC();
-
+
try {
mct = prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE);
} catch (NoNodeException e) {
throw new RuntimeException(e);
}
-
+
if (queueMinC) {
tabletResources.executeMinorCompaction(mct);
return;
}
-
+
}
-
+
// do minor compaction outside of synch block so that tablet can be read and written to while
// compaction runs
mct.run();
}
-
+
private boolean closeCompleting = false;
-
+
synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException {
-
+
if (!closing || closeComplete || closeCompleting) {
throw new IllegalStateException("closing = " + closing + " closed = " + closed + " closeComplete = " + closeComplete + " closeCompleting = "
+ closeCompleting);
}
-
+
log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") " + getExtent());
-
+
// ensure this method is only called once, also guards against multiple
// threads entering the method at the same time
closeCompleting = true;
closed = true;
-
+
// modify dataSourceDeletions so scans will try to switch data sources and fail because the tablet is closed
dataSourceDeletions.incrementAndGet();
-
+
for (ScanDataSource activeScan : activeScans) {
activeScan.interrupt();
}
-
+
// wait for reads and writes to complete
while (writesInProgress > 0 || activeScans.size() > 0) {
try {
@@ -2767,9 +2772,9 @@ public class Tablet {
log.error(e.toString());
}
}
-
+
tabletMemory.waitForMinC();
-
+
if (saveState && tabletMemory.getMemTable().getNumEntries() > 0) {
try {
prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE).run();
@@ -2777,7 +2782,7 @@ public class Tablet {
throw new RuntimeException(e);
}
}
-
+
if (saveState) {
// at this point all tablet data is flushed, so do a consistency check
RuntimeException err = null;
@@ -2796,48 +2801,48 @@ public class Tablet {
log.error("Tablet closed consistency check has failed for " + this.extent + " giving up and closing");
}
}
-
+
try {
tabletMemory.getMemTable().delete(0);
} catch (Throwable t) {
log.error("Failed to delete mem table : " + t.getMessage(), t);
}
-
+
tabletMemory = null;
-
+
// close map files
tabletResources.close();
-
+
log.log(TLevel.TABLET_HIST, extent + " closed");
-
+
acuTableConf.removeObserver(configObserver);
-
+
closeComplete = completeClose;
}
-
+
private void closeConsistencyCheck() {
-
+
if (tabletMemory.getMemTable().getNumEntries() != 0) {
String msg = "Closed tablet " + extent + " has " + tabletMemory.getMemTable().getNumEntries() + " entries in memory";
log.error(msg);
throw new RuntimeException(msg);
}
-
+
if (tabletMemory.memoryReservedForMinC()) {
String msg = "Closed tablet " + extent + " has minor compacting memory";
log.error(msg);
throw new RuntimeException(msg);
}
-
+
try {
Pair<List<LogEntry>,SortedMap<String,DataFileValue>> fileLog = MetadataTable.getFileAndLogEntries(SecurityConstants.getSystemCredentials(), extent);
-
+
if (fileLog.getFirst().size() != 0) {
String msg = "Closed tablet " + extent + " has walog entries in !METADATA " + fileLog.getFirst();
log.error(msg);
throw new RuntimeException(msg);
}
-
+
if (extent.isRootTablet()) {
if (!fileLog.getSecond().keySet().equals(datafileManager.getDatafileSizesRel().keySet())) {
String msg = "Data file in !METADATA differ from in memory data " + extent + " " + fileLog.getSecond().keySet() + " "
@@ -2853,59 +2858,60 @@ public class Tablet {
throw new RuntimeException(msg);
}
}
-
+
} catch (Exception e) {
String msg = "Failed to do close consistency check for tablet " + extent;
log.error(msg, e);
throw new RuntimeException(msg, e);
-
+
}
-
+
if (otherLogs.size() != 0 || currentLogs.size() != 0) {
String msg = "Closed tablet " + extent + " has walog entries in memory currentLogs = " + currentLogs + " otherLogs = " + otherLogs;
log.error(msg);
throw new RuntimeException(msg);
}
-
+
// TODO check lastFlushID and lostCompactID - ACCUMULO-1290
}
-
+
/**
* Returns a Path object representing the tablet's location on the DFS.
- *
+ *
* @return location
*/
public Path getLocation() {
return location;
}
-
+
private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
-
+
long queued;
long start;
boolean failed = false;
private MajorCompactionReason reason;
-
+
public CompactionRunner(MajorCompactionReason reason) {
queued = System.currentTimeMillis();
this.reason = reason;
}
-
+
+ @Override
public void run() {
CompactionStats majCStats = null;
-
+
if (tabletServer.isMajorCompactionDisabled()) {
// this will make compaction task that were queued when shutdown was
// initiated exit
majorCompactionQueued.remove(reason);
return;
}
-
+
try {
timer.incrementStatusMajor();
start = System.currentTimeMillis();
majCStats = majorCompact(reason);
-
+
// if there is more work to be done, queue another major compaction
synchronized (Tablet.this) {
if (reason == MajorCompactionReason.NORMAL && needsMajorCompaction(reason))
@@ -2919,12 +2925,12 @@ public class Tablet {
if (majCStats != null) {
count = majCStats.getEntriesRead();
}
-
+
timer.updateTime(Operation.MAJOR, queued, start, count, failed);
}
}
-
- // We used to synchronize on the Tablet before fetching this information,
+
+ // We used to synchronize on the Tablet before fetching this information,
// but this method is called by the compaction queue thread to re-order the compactions.
// The compaction queue holds a lock during this sort.
// A tablet lock can be held while putting itself on the queue, so we can't lock the tablet
@@ -2933,40 +2939,40 @@ public class Tablet {
private int getNumFiles() {
return datafileManager.datafileSizes.size();
}
-
+
@Override
public int compareTo(CompactionRunner o) {
int cmp = reason.compareTo(o.reason);
if (cmp != 0)
return cmp;
-
+
if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) {
// for these types of compactions want to do the oldest first
cmp = (int) (queued - o.queued);
if (cmp != 0)
r
<TRUNCATED>
[2/2] accumulo git commit: ACCUMULO-3462 reduce the opportunity for
the queued tablet to get mis-marked
Posted by ec...@apache.org.
ACCUMULO-3462 reduce the opportunity for the queued tablet to get mis-marked
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/07422efc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/07422efc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/07422efc
Branch: refs/heads/1.5
Commit: 07422efc5140e57735d22fe4df1d966095c4e172
Parents: 3bcc3c9
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Jan 5 14:53:00 2015 -0500
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Jan 5 14:53:00 2015 -0500
----------------------------------------------------------------------
.../accumulo/server/tabletserver/Tablet.java | 1560 +++++++++---------
1 file changed, 785 insertions(+), 775 deletions(-)
----------------------------------------------------------------------