You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/12 09:46:34 UTC
[05/10] Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d54e0fd8/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index f9fdacb,0000000..703d1ae
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@@ -1,3833 -1,0 +1,3838 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.StatsIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.tableOps.CompactionIterators;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.TabletOperations;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
+import org.apache.accumulo.tserver.Compactor.CompactionEnv;
+import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.CompactionStrategy;
+import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.compaction.WriteParameters;
+import org.apache.accumulo.tserver.constraints.ConstraintChecker;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.log.MutationReceiver;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/*
+ * We need to be able to have the master tell a tabletServer to
+ * close this file, and the tablet server to handle all pending client reads
+ * before closing
+ *
+ */
+
+/**
+ * Provides an interface for reading a tablet's data, much like reading from a MapFile; mostly
+ * takes care of reporting start and end keys.
+ *
+ * This is needed because a single row extent can have multiple columns; this class manages all
+ * the columns (each handled by a store) for a single row extent.
+ *
+ */
+
+public class Tablet {
+
  /**
   * Why a minor compaction (flush of the in-memory map to a new data file) was initiated.
   */
  enum MinorCompactionReason {
    USER, SYSTEM, CLOSE, RECOVERY
  }
+
  /**
   * Tracks a batch of writers committing mutations against one in-memory map. A session is
   * bound to the write-ahead log sequence number current when it was created.
   * {@code commitsInProgress} counts writers currently inside a commit so minor compactions can
   * wait for them to drain. The private increment/decrement/wait methods use the Tablet
   * monitor ({@code Tablet.this.wait()/notifyAll()}); callers must hold that monitor.
   */
  public class CommitSession {

    private int seq; // write-ahead log sequence number for this session
    private InMemoryMap memTable; // map this session's mutations are applied to
    private int commitsInProgress; // number of in-flight commits against memTable
    private long maxCommittedTime = Long.MIN_VALUE; // largest timestamp committed via this session

    private CommitSession(int seq, InMemoryMap imm) {
      this.seq = seq;
      this.memTable = imm;
      commitsInProgress = 0;
    }

    public int getWALogSeq() {
      return seq;
    }

    // caller must hold the Tablet monitor; wakes waiters when the last commit finishes
    private void decrementCommitsInProgress() {
      if (commitsInProgress < 1)
        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);

      commitsInProgress--;
      if (commitsInProgress == 0)
        Tablet.this.notifyAll();
    }

    // caller must hold the Tablet monitor
    private void incrementCommitsInProgress() {
      if (commitsInProgress < 0)
        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);

      commitsInProgress++;
    }

    // caller must hold the Tablet monitor; blocks until all in-flight commits complete
    private void waitForCommitsToFinish() {
      while (commitsInProgress > 0) {
        try {
          Tablet.this.wait(50);
        } catch (InterruptedException e) {
          log.warn(e, e);
        }
      }
    }

    public void abortCommit(List<Mutation> value) {
      Tablet.this.abortCommit(this, value);
    }

    public void commit(List<Mutation> mutations) {
      Tablet.this.commit(this, mutations);
    }

    public Tablet getTablet() {
      return Tablet.this;
    }

    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
      return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
    }

    public void finishUpdatingLogsUsed() {
      Tablet.this.finishUpdatingLogsUsed();
    }

    public int getLogId() {
      return logId;
    }

    public KeyExtent getExtent() {
      return extent;
    }

    // record the largest mutation timestamp committed through this session
    private void updateMaxCommittedTime(long time) {
      maxCommittedTime = Math.max(time, maxCommittedTime);
    }

    // fails fast if read before any time was ever recorded
    private long getMaxCommittedTime() {
      if (maxCommittedTime == Long.MIN_VALUE)
        throw new IllegalStateException("Tried to read max committed time when it was never set");
      return maxCommittedTime;
    }

  }
+
  /**
   * Manages the tablet's in-memory maps through the minor-compaction lifecycle. At most three
   * maps exist at once: the active {@code memTable} receiving new writes, {@code otherMemTable}
   * being flushed by a minor compaction, and {@code deletingMemTable} awaiting deletion after a
   * flush. prepareForMinC -> finishedMinC -> finalizeMinC advance a map through these roles.
   * NOTE(review): most methods appear to rely on callers holding the Tablet monitor (they call
   * Tablet.this.notifyAll()/wait()) — confirm against call sites outside this view.
   */
  private class TabletMemory {
    private InMemoryMap memTable; // active map receiving new mutations
    private InMemoryMap otherMemTable; // map currently being minor compacted, if any
    private InMemoryMap deletingMemTable; // flushed map pending deletion, if any
    private int nextSeq = 1; // walog sequence for the next commit session; advances by 2
    private CommitSession commitSession; // session bound to the active memTable

    TabletMemory() {
      try {
        memTable = new InMemoryMap(acuTableConf);
      } catch (LocalityGroupConfigurationError e) {
        throw new RuntimeException(e);
      }
      commitSession = new CommitSession(nextSeq, memTable);
      nextSeq += 2;
    }

    InMemoryMap getMemTable() {
      return memTable;
    }

    InMemoryMap getMinCMemTable() {
      return otherMemTable;
    }

    /**
     * Swaps in a fresh active map and designates the old one for minor compaction.
     *
     * @return the commit session that was bound to the map now being compacted
     */
    CommitSession prepareForMinC() {
      if (otherMemTable != null) {
        throw new IllegalStateException();
      }

      if (deletingMemTable != null) {
        throw new IllegalStateException();
      }

      otherMemTable = memTable;
      try {
        memTable = new InMemoryMap(acuTableConf);
      } catch (LocalityGroupConfigurationError e) {
        throw new RuntimeException(e);
      }

      CommitSession oldCommitSession = commitSession;
      commitSession = new CommitSession(nextSeq, memTable);
      nextSeq += 2;

      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());

      return oldCommitSession;
    }

    // moves the compacted map from "compacting" to "pending delete" and wakes waiters
    void finishedMinC() {

      if (otherMemTable == null) {
        throw new IllegalStateException();
      }

      if (deletingMemTable != null) {
        throw new IllegalStateException();
      }

      deletingMemTable = otherMemTable;

      otherMemTable = null;
      Tablet.this.notifyAll();
    }

    // deletes the flushed map; the delete happens outside the Tablet monitor, the
    // bookkeeping cleanup inside it
    void finalizeMinC() {
      try {
        deletingMemTable.delete(15000);
      } finally {
        synchronized (Tablet.this) {
          if (otherMemTable != null) {
            throw new IllegalStateException();
          }

          if (deletingMemTable == null) {
            throw new IllegalStateException();
          }

          deletingMemTable = null;

          tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
        }
      }
    }

    // true while a minor compaction still holds memory (compacting or pending delete)
    boolean memoryReservedForMinC() {
      return otherMemTable != null || deletingMemTable != null;
    }

    // blocks until any in-progress minor compaction fully releases its memory
    void waitForMinC() {
      while (otherMemTable != null || deletingMemTable != null) {
        try {
          Tablet.this.wait(50);
        } catch (InterruptedException e) {
          log.warn(e, e);
        }
      }
    }

    void mutate(CommitSession cm, List<Mutation> mutations) {
      cm.memTable.mutate(mutations);
    }

    // reports current memory usage of the active map plus whichever secondary map exists
    void updateMemoryUsageStats() {
      long other = 0;
      if (otherMemTable != null)
        other = otherMemTable.estimatedSizeInBytes();
      else if (deletingMemTable != null)
        other = deletingMemTable.estimatedSizeInBytes();

      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
    }

    // iterators over the active map and, if present, the map being compacted;
    // callers must return them via returnIterators()
    List<MemoryIterator> getIterators() {
      List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
      toReturn.add(memTable.skvIterator());
      if (otherMemTable != null)
        toReturn.add(otherMemTable.skvIterator());
      return toReturn;
    }

    void returnIterators(List<MemoryIterator> iters) {
      for (MemoryIterator iter : iters) {
        iter.close();
      }
    }

    public long getNumEntries() {
      if (otherMemTable != null)
        return memTable.getNumEntries() + otherMemTable.getNumEntries();
      return memTable.getNumEntries();
    }

    CommitSession getCommitSession() {
      return commitSession;
    }
  }
+
  private TabletMemory tabletMemory;

  private final TabletTime tabletTime;
  private long persistedTime; // largest mutation time persisted to the metadata table; guarded by timeLock
  private final Object timeLock = new Object();

  private final Path location; // absolute path of this tablets dir
  private TServerInstance lastLocation;

  private Configuration conf;
  private VolumeManager fs;

  private TableConfiguration acuTableConf;

  private volatile boolean tableDirChecked = false;

  // bumped whenever the set of files/memory backing scans changes, so open scans re-resolve
  private AtomicLong dataSourceDeletions = new AtomicLong(0);
  private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();

  // close lifecycle: closing -> closed -> closeComplete
  private volatile boolean closing = false;
  private boolean closed = false;
  private boolean closeComplete = false;

  private long lastFlushID = -1;
  private long lastCompactID = -1;

  private KeyExtent extent;

  private TabletResourceManager tabletResources;
  final private DatafileManager datafileManager;
  private volatile boolean majorCompactionInProgress = false;
  private volatile boolean majorCompactionWaitingToStart = false;
  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
  private volatile boolean minorCompactionInProgress = false;
  private volatile boolean minorCompactionWaitingToStart = false;

  private boolean updatingFlushID = false;

  private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();

  private final String tabletDirectory;

  private int writesInProgress = 0;

  private static final Logger log = Logger.getLogger(Tablet.class);
  public TabletStatsKeeper timer;

  // rates use an exponential decay factor of 0.2
  private Rate queryRate = new Rate(0.2);
  private long queryCount = 0;

  private Rate queryByteRate = new Rate(0.2);
  private long queryBytes = 0;

  private Rate ingestRate = new Rate(0.2);
  private long ingestCount = 0;

  private Rate ingestByteRate = new Rate(0.2);
  private long ingestBytes = 0;

  private byte[] defaultSecurityLabel = new byte[0];

  private long lastMinorCompactionFinishTime;
  private long lastMapFileImportTime;

  private volatile long numEntries;
  private volatile long numEntriesInMemory;

  // a count of the amount of data read by the iterators
  private AtomicLong scannedCount = new AtomicLong(0);
  private Rate scannedRate = new Rate(0.2);

  private ConfigurationObserver configObserver;

  private TabletServer tabletServer;

  private final int logId;
  // ensure we only have one reader/writer of our bulk file notes at a time
  public final Object bulkFileImportLock = new Object();

  public int getLogId() {
    return logId;
  }
+
+ public static class TabletClosedException extends RuntimeException {
+ public TabletClosedException(Exception e) {
+ super(e);
+ }
+
+ public TabletClosedException() {
+ super();
+ }
+
+ private static final long serialVersionUID = 1L;
+ }
+
+ FileRef getNextMapFilename(String prefix) throws IOException {
+ String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
+ checkTabletDir();
+ return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
+ }
+
+ private void checkTabletDir() throws IOException {
+ if (!tableDirChecked) {
+ checkTabletDir(this.location);
+ tableDirChecked = true;
+ }
+ }
+
+ private void checkTabletDir(Path tabletDir) throws IOException {
+
+ FileStatus[] files = null;
+ try {
+ files = fs.listStatus(tabletDir);
+ } catch (FileNotFoundException ex) {
+ // ignored
+ }
+
+ if (files == null) {
+ if (tabletDir.getName().startsWith("c-"))
+ log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // its a clone dir...
+ else
+ log.warn("Tablet " + extent + " had no dir, creating " + tabletDir);
+
+ fs.mkdirs(tabletDir);
+ }
+ }
+
  /**
   * Tracks the data files backing this tablet: their sizes, scan reservations against them,
   * and which files are involved in minor/major compactions. Mutable state is guarded by the
   * Tablet monitor unless noted otherwise.
   */
  class DatafileManager {
    // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
    final private Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());

    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
      for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet())
        this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
    }

    FileRef mergingMinorCompactionFile = null; // file being merged into by a minor compaction, if any
    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>(); // files to remove once their scans finish
    Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>(); // reservation id -> reserved files
    MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>(); // per-file count of active scan reservations
    long nextScanReservationId = 0;
    boolean reservationsBlocked = false; // true while new scan reservations must wait (see waitForScansToFinish)

    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>(); // files reserved by the running major compaction
+
    /**
     * Reserves the tablet's current set of data files for a scan, incrementing each file's
     * reference count so it will not be deleted while the scan is active. Blocks while
     * reservations are blocked by waitForScansToFinish.
     *
     * @return a reservation id (to pass to returnFilesForScan) paired with the reserved files
     *         and their sizes
     */
    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
      synchronized (Tablet.this) {

        while (reservationsBlocked) {
          try {
            Tablet.this.wait(50);
          } catch (InterruptedException e) {
            log.warn(e, e);
          }
        }

        Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());

        long rid = nextScanReservationId++;

        scanFileReservations.put(rid, absFilePaths);

        Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();

        for (FileRef path : absFilePaths) {
          fileScanReferenceCounts.increment(path, 1);
          ret.put(path, datafileSizes.get(path));
        }

        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
      }
    }
+
    /**
     * Releases a scan's file reservation. Any file whose reference count drops to zero and was
     * queued in filesToDeleteAfterScan has its scan reference removed from the metadata table
     * (done outside the Tablet monitor).
     *
     * @throws IllegalArgumentException if the reservation id is unknown
     */
    void returnFilesForScan(Long reservationId) {

      final Set<FileRef> filesToDelete = new HashSet<FileRef>();

      synchronized (Tablet.this) {
        Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);

        if (absFilePaths == null)
          throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);

        boolean notify = false;
        for (FileRef path : absFilePaths) {
          long refCount = fileScanReferenceCounts.decrement(path, 1);
          if (refCount == 0) {
            if (filesToDeleteAfterScan.remove(path))
              filesToDelete.add(path);
            notify = true;
          } else if (refCount < 0)
            throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
        }

        // wake threads in waitForScansToFinish waiting on ref counts to reach zero
        if (notify)
          Tablet.this.notifyAll();
      }

      if (filesToDelete.size() > 0) {
        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
      }
    }
+
+ private void removeFilesAfterScan(Set<FileRef> scanFiles) {
+ if (scanFiles.size() == 0)
+ return;
+
+ Set<FileRef> filesToDelete = new HashSet<FileRef>();
+
+ synchronized (Tablet.this) {
+ for (FileRef path : scanFiles) {
+ if (fileScanReferenceCounts.get(path) == 0)
+ filesToDelete.add(path);
+ else
+ filesToDeleteAfterScan.add(path);
+ }
+ }
+
+ if (filesToDelete.size() > 0) {
+ log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
+ MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
+ }
+ }
+
    /**
     * Waits up to maxWaitTime ms for scan reference counts on the given files to reach zero,
     * optionally blocking new scan reservations while waiting.
     *
     * @param pathsToWaitFor files whose scan references should drain
     * @param blockNewScans if true, sets reservationsBlocked for the duration so no new scans
     *        can reserve files
     * @param maxWaitTime maximum total time to wait, in milliseconds
     * @return the files still in use by scans when the wait ended
     */
    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
      long startTime = System.currentTimeMillis();
      TreeSet<FileRef> inUse = new TreeSet<FileRef>();

      Span waitForScans = Trace.start("waitForScans");
      try {
        synchronized (Tablet.this) {
          if (blockNewScans) {
            if (reservationsBlocked)
              throw new IllegalStateException();

            reservationsBlocked = true;
          }

          for (FileRef path : pathsToWaitFor) {
            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
              try {
                Tablet.this.wait(100);
              } catch (InterruptedException e) {
                log.warn(e, e);
              }
            }
          }

          // collect whatever is still referenced after the wait timed out or drained
          for (FileRef path : pathsToWaitFor) {
            if (fileScanReferenceCounts.get(path) > 0)
              inUse.add(path);
          }

          if (blockNewScans) {
            reservationsBlocked = false;
            Tablet.this.notifyAll();
          }

        }
      } finally {
        waitForScans.stop();
      }
      return inUse;
    }
+
    /**
     * Imports bulk-loaded files into this tablet. Validates that all files live in a single
     * bulk directory under this table's directory, skips files already imported under the same
     * transaction id, optionally assigns times, records the files in the metadata table, and
     * finally adds them to the in-memory file set.
     *
     * @param tid bulk import transaction (fate) id
     * @param pathsString files to import with their size/entry metadata
     * @param setTime if true, assign each file a time from tabletTime and persist it
     * @throws IOException if a file is not in the table's directory or metadata updates fail
     */
    public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {

      String bulkDir = null;

      Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
      for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
        paths.put(entry.getKey(), entry.getValue());

      // validate that every file is in this table's directory, and that all files share one
      // bulk directory
      for (FileRef tpath : paths.keySet()) {

        boolean inTheRightDirectory = false;
        Path parent = tpath.path().getParent().getParent();
        for (String tablesDir : ServerConstants.getTablesDirs()) {
          if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) {
            inTheRightDirectory = true;
            break;
          }
        }
        if (!inTheRightDirectory) {
          throw new IOException("Data file " + tpath + " not in table dirs");
        }

        if (bulkDir == null)
          bulkDir = tpath.path().getParent().toString();
        else if (!bulkDir.equals(tpath.path().getParent().toString()))
          throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);

      }

      if (extent.isRootTablet()) {
        throw new IllegalArgumentException("Can not import files to root tablet");
      }

      // bulkFileImportLock serializes readers/writers of the bulk-files-loaded notes
      synchronized (bulkFileImportLock) {
        Credentials creds = SystemCredentials.get();
        Connector conn;
        try {
          conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
        } catch (Exception ex) {
          throw new IOException(ex);
        }
        // Remove any bulk files we've previously loaded and compacted away
        List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);

        for (FileRef file : files)
          if (paths.keySet().remove(file))
            log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);

        if (paths.size() > 0) {
          long bulkTime = Long.MIN_VALUE;
          if (setTime) {
            // assign monotonically increasing times; getAndUpdateTime must never go backwards
            for (DataFileValue dfv : paths.values()) {
              long nextTime = tabletTime.getAndUpdateTime();
              if (nextTime < bulkTime)
                throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
              bulkTime = nextTime;
              dfv.setTime(bulkTime);
            }
          }

          synchronized (timeLock) {
            if (bulkTime > persistedTime)
              persistedTime = bulkTime;

            MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock());
          }
        }
      }

      // only after the metadata write succeeds are the files added to the in-memory set
      synchronized (Tablet.this) {
        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
          if (datafileSizes.containsKey(tpath.getKey())) {
            log.error("Adding file that is already in set " + tpath.getKey());
          }
          datafileSizes.put(tpath.getKey(), tpath.getValue());

        }

        tabletResources.importedMapFiles();

        computeNumEntries();
      }

      for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) {
        log.log(TLevel.TABLET_HIST, extent + " import " + entry.getKey() + " " + entry.getValue());
      }
    }
+
    /**
     * When the tablet is at its max file count, selects the smallest file not involved in a
     * major compaction for a minor compaction to merge into, and reserves it.
     *
     * @return the reserved file, or null if no merge is needed or no eligible file exists
     * @throws IllegalStateException if a merging file is already reserved
     */
    FileRef reserveMergingMinorCompactionFile() {
      if (mergingMinorCompactionFile != null)
        throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile);

      if (extent.isRootTablet())
        return null;

      int maxFiles = acuTableConf.getMaxFilesPerTablet();

      // when a major compaction is running and we are at max files, write out
      // one extra file... want to avoid the case where major compaction is
      // compacting everything except for the largest file, and therefore the
      // largest file is returned for merging.. the following check mostly
      // avoids this case, except for the case where major compactions fail or
      // are canceled
      if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
        return null;

      if (datafileSizes.size() >= maxFiles) {
        // find the smallest file

        long min = Long.MAX_VALUE;
        FileRef minName = null;

        for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
          if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
            min = entry.getValue().getSize();
            minName = entry.getKey();
          }
        }

        // every candidate is involved in the major compaction; nothing to merge into
        if (minName == null)
          return null;

        mergingMinorCompactionFile = minName;
        return minName;
      }

      return null;
    }
+
+ void unreserveMergingMinorCompactionFile(FileRef file) {
+ if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
+ || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
+ throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
+
+ mergingMinorCompactionFile = null;
+ }
+
    /**
     * Completes a minor compaction: renames the temporary output file into place, updates the
     * metadata table and write-ahead log (in that order — critical for crash safety), then
     * swaps the new file into the in-memory file set and releases the flushed memory.
     *
     * @param tmpDatafile the minor compaction's temporary output file
     * @param newDatafile final name the output file is renamed to
     * @param absMergeFile file that was merged into the output, or null if none
     * @param dfv size/entry-count metadata for the new file
     * @param commitSession the commit session that was flushed
     * @param flushId flush id to record as the last completed flush
     */
    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
        throws IOException {

      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
      // the root tablet's metadata lives in zookeeper; verify we still hold the lock before
      // mutating it
      if (extent.isRootTablet()) {
        try {
          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
            throw new IllegalStateException();
          }
        } catch (Exception e) {
          throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
        }
      }

      // rename before putting in metadata table, so files in metadata table should
      // always exist
      do {
        try {
          if (dfv.getNumEntries() == 0) {
            // empty output; nothing to keep
            fs.deleteRecursively(tmpDatafile.path());
          } else {
            if (fs.exists(newDatafile.path())) {
              log.warn("Target map file already exist " + newDatafile);
              fs.deleteRecursively(newDatafile.path());
            }

            rename(fs, tmpDatafile.path(), newDatafile.path());
          }
          break;
        } catch (IOException ioe) {
          // retry forever; losing this rename would lose the flushed data
          log.warn("Tablet " + extent + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
          UtilWaitThread.sleep(60 * 1000);
        }
      } while (true);

      long t1, t2;

      // the code below always assumes merged files are in use by scans... this must be done
      // because the in memory list of files is not updated until after the metadata table
      // therefore the file is available to scans until memory is updated, but want to ensure
      // the file is not available for garbage collection... if memory were updated
      // before this point (like major compactions do), then the following code could wait
      // for scans to finish like major compactions do.... used to wait for scans to finish
      // here, but that was incorrect because a scan could start after waiting but before
      // memory was updated... assuming the file is always in use by scans leads to
      // one unneeded metadata update when it was not actually in use
      Set<FileRef> filesInUseByScans = Collections.emptySet();
      if (absMergeFile != null)
        filesInUseByScans = Collections.singleton(absMergeFile);

      // very important to write delete entries outside of log lock, because
      // this metadata write does not go up... it goes sideways or to itself
      if (absMergeFile != null)
        MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get());

      Set<String> unusedWalLogs = beginClearingUnusedLogs();
      try {
        // the order of writing to metadata and walog is important in the face of machine/process failures
        // need to write to metadata before writing to walog, when things are done in the reverse order
        // data could be lost... the minor compaction start event should be written before the following metadata
        // write is made

        synchronized (timeLock) {
          if (commitSession.getMaxCommittedTime() > persistedTime)
            persistedTime = commitSession.getMaxCommittedTime();

          String time = tabletTime.getMetadataValue(persistedTime);
          MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
        }

      } finally {
        finishClearingUnusedLogs();
      }

      do {
        try {
          // the purpose of making this update use the new commit session, instead of the old one passed in,
          // is because the new one will reference the logs used by current memory...

          tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
          break;
        } catch (IOException e) {
          log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
          UtilWaitThread.sleep(1 * 1000);
        }
      } while (true);

      // finally make the new file visible to readers and release the flushed memory
      synchronized (Tablet.this) {
        lastLocation = null;

        t1 = System.currentTimeMillis();
        if (datafileSizes.containsKey(newDatafile)) {
          log.error("Adding file that is already in set " + newDatafile);
        }

        if (dfv.getNumEntries() > 0) {
          datafileSizes.put(newDatafile, dfv);
        }

        if (absMergeFile != null) {
          datafileSizes.remove(absMergeFile);
        }

        unreserveMergingMinorCompactionFile(absMergeFile);

        dataSourceDeletions.incrementAndGet();
        tabletMemory.finishedMinC();

        lastFlushID = flushId;

        computeNumEntries();
        t2 = System.currentTimeMillis();
      }

      // must do this after list of files in memory is updated above
      removeFilesAfterScan(filesInUseByScans);

      if (absMergeFile != null)
        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
      else
        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + newDatafile);
      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString()));
      if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
        log.debug(String.format("Minor Compaction wrote out file larger than split threshold. split threshold = %,d file size = %,d",
            acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
      }

    }
+
+ /**
+ * Reserves the given files for an upcoming major compaction. The reservation set must be
+ * empty when called, and none of the files may currently be claimed by a merging minor
+ * compaction; either condition is a caller bug and raises IllegalStateException.
+ */
+ public void reserveMajorCompactingFiles(Collection<FileRef> files) {
+ if (majorCompactingFiles.size() != 0)
+ throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
+
+ if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
+ throw new IllegalStateException("Major compaction tried to reserve file in use by minor compaction " + mergingMinorCompactionFile);
+
+ majorCompactingFiles.addAll(files);
+ }
+
+ // Releases every file reserved for major compaction. NOTE(review): clears the whole
+ // set despite the singular method name.
+ public void clearMajorCompactingFile() {
+ majorCompactingFiles.clear();
+ }
+
+ /**
+ * Atomically installs the output of a major compaction: renames the temporary file into
+ * place, swaps old datafiles for the new one in the in-memory bookkeeping under the tablet
+ * lock, and records the change in the metadata table (or via RootFiles for the root tablet).
+ */
+ void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+ throws IOException {
+ long t1, t2;
+
+ if (!extent.isRootTablet()) {
+
+ if (fs.exists(newDatafile.path())) {
+ log.error("Target map file already exist " + newDatafile, new Exception());
+ throw new IllegalStateException("Target map file already exist " + newDatafile);
+ }
+
+ // rename before putting in metadata table, so files in metadata table should
+ // always exist
+ rename(fs, tmpDatafile.path(), newDatafile.path());
+
+ // an empty compaction result is not kept on disk
+ if (dfv.getNumEntries() == 0) {
+ fs.deleteRecursively(newDatafile.path());
+ }
+ }
+
+ TServerInstance lastLocation = null;
+ synchronized (Tablet.this) {
+
+ t1 = System.currentTimeMillis();
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+ // invalidate open scan data sources so they re-resolve the file set
+ dataSourceDeletions.incrementAndGet();
+
+ if (extent.isRootTablet()) {
+
+ // root tablet file replacement cannot proceed while scans hold the old files
+ waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
+
+ try {
+ if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
+ throw new IllegalStateException();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+ }
+
+ // mark files as ready for deletion, but
+ // do not delete them until we successfully
+ // rename the compacted map file, in case
+ // the system goes down
+
+ RootFiles.replaceFiles(acuTableConf, fs, location, oldDatafiles, tmpDatafile, newDatafile);
+ }
+
+ // atomically remove old files and add new file
+ for (FileRef oldDatafile : oldDatafiles) {
+ if (!datafileSizes.containsKey(oldDatafile)) {
+ log.error("file does not exist in set " + oldDatafile);
+ }
+ datafileSizes.remove(oldDatafile);
+ majorCompactingFiles.remove(oldDatafile);
+ }
+
+ if (datafileSizes.containsKey(newDatafile)) {
+ log.error("Adding file that is already in set " + newDatafile);
+ }
+
+ if (dfv.getNumEntries() > 0) {
+ datafileSizes.put(newDatafile, dfv);
+ }
+
+ // could be used by a follow on compaction in a multipass compaction
+ majorCompactingFiles.add(newDatafile);
+
+ computeNumEntries();
+
+ lastLocation = Tablet.this.lastLocation;
+ Tablet.this.lastLocation = null;
+
+ if (compactionId != null)
+ lastCompactID = compactionId;
+
+ t2 = System.currentTimeMillis();
+ }
+
+ if (!extent.isRootTablet()) {
+ // non-root tablets record the file swap in the metadata table; files still in use
+ // by scans are tracked as scan refs and removed once those scans finish
+ Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
+ if (filesInUseByScans.size() > 0)
+ log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
+ MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
+ tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
+ removeFilesAfterScan(filesInUseByScans);
+ }
+
+ log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
+ log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
+ }
+
+ /**
+ * Returns a read-only snapshot of the current datafile-to-size map.
+ */
+ public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
+ synchronized (Tablet.this) {
+ // snapshot under the tablet lock, then expose as unmodifiable
+ return Collections.unmodifiableSortedMap(new TreeMap<FileRef,DataFileValue>(datafileSizes));
+ }
+ }
+
+ /**
+ * Returns a read-only snapshot of the current set of datafiles.
+ */
+ public Set<FileRef> getFiles() {
+ synchronized (Tablet.this) {
+ // copy the key set under the tablet lock, then expose as unmodifiable
+ return Collections.unmodifiableSet(new HashSet<FileRef>(datafileSizes.keySet()));
+ }
+ }
+
+ }
+
+ // Opens an existing tablet using the cached Hadoop configuration and metadata entries
+ // already read for this extent. splitCreationTime of 0 marks a non-split tablet.
+ public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
+ throws IOException {
+ this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
+ splitCreationTime = 0;
+ }
+
+ // Creates a tablet that is the product of a split; SplitInfo already carries the
+ // files/time/flush-id/compact-id, so no metadata table lookups are needed.
+ public Tablet(KeyExtent extent, TabletServer tabletServer, TabletResourceManager trm, SplitInfo info) throws IOException {
+ this(tabletServer, new Text(info.dir), extent, trm, CachedConfiguration.getInstance(), info.datafiles, info.time, info.initFlushID, info.initCompactID, info.lastLocation);
+ splitCreationTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Only visible for testing
+ */
+ @VisibleForTesting
+ protected Tablet(TabletTime tabletTime, String tabletDirectory, int logId, Path location, DatafileManager datafileManager) {
+ this.tabletTime = tabletTime;
+ this.tabletDirectory = tabletDirectory;
+ this.logId = logId;
+ this.location = location;
+ this.datafileManager = datafileManager;
+ }
+
+ // Delegating constructor that supplies the default VolumeManager implementation.
+ private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
+ SortedMap<Key,Value> tabletsKeyValues) throws IOException {
+ this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
+ }
+
+ // Shared immutable empty log-entry list used by the split-time constructor (no WAL recovery).
+ static private final List<LogEntry> EMPTY = Collections.emptyList();
+
+ // Split-time constructor: no write-ahead logs to recover and no scan file refs.
+ private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
+ SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID, TServerInstance last) throws IOException {
+ this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, last, new HashSet<FileRef>(), initFlushID, initCompactID);
+ }
+
+ /**
+ * Finds the tablet time entry for this extent in the supplied metadata key/values. Returns
+ * the time string only when exactly one TIME column exists for the extent's metadata row;
+ * returns null for the root tablet or when no unambiguous entry is present. The conf
+ * parameter is currently unused.
+ */
+ private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+ if (extent.isRootTablet()) {
+ return null;
+ }
+
+ Text rowName = extent.getMetadataEntry();
+ String time = null;
+ int matches = 0;
+
+ for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+ Key key = entry.getKey();
+ if (key.compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
+ time = entry.getValue().toString();
+ matches++;
+ }
+ }
+
+ // require exactly one match, mirroring the original map-based implementation
+ return matches == 1 ? time : null;
+ }
+
+ /**
+ * Builds the datafile map for a tablet. The root tablet's files are listed directly from its
+ * directory (with replacement-file cleanup); all other tablets read their file entries from
+ * the metadata table.
+ */
+ private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent,
+ SortedMap<Key,Value> tabletsKeyValues) throws IOException {
+
+ TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
+
+ if (extent.isRootTablet()) { // the meta0 tablet
+ Path location = new Path(MetadataTableUtil.getRootTabletDir());
+
+ // cleanUpFiles() has special handling for delete. files
+ FileStatus[] files = fs.listStatus(location);
+ Collection<String> goodPaths = RootFiles.cleanupReplacement(fs, files, true);
+ for (String good : goodPaths) {
+ Path path = new Path(good);
+ String filename = path.getName();
+ FileRef ref = new FileRef(location.toString() + "/" + filename, path);
+ // root tablet file sizes are not tracked in metadata; use zeros
+ DataFileValue dfv = new DataFileValue(0, 0);
+ datafiles.put(ref, dfv);
+ }
+ } else {
+
+ Text rowName = extent.getMetadataEntry();
+
+ String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
+ ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY);
+
+ // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
+ // reduced batch size to improve performance
+ // changed here after endKeys were implemented from 10 to 1000
+ mdScanner.setBatchSize(1000);
+
+ // leave these in, again, now using endKey for safety
+ mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+
+ mdScanner.setRange(new Range(rowName));
+
+ for (Entry<Key,Value> entry : mdScanner) {
+
+ // defensive stop at the end of this tablet's metadata row
+ if (entry.getKey().compareRow(rowName) != 0) {
+ break;
+ }
+
+ FileRef ref = new FileRef(fs, entry.getKey());
+ datafiles.put(ref, new DataFileValue(entry.getValue().get()));
+ }
+ }
+ return datafiles;
+ }
+
+ /**
+ * Collects the write-ahead log entries recorded for a tablet. Metadata tablets are looked up
+ * directly through the metadata utility; user tablets are read from the supplied key/values.
+ */
+ private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
+ List<LogEntry> logEntries = new ArrayList<LogEntry>();
+
+ if (ke.isMeta()) {
+ try {
+ logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
+ } catch (Exception ex) {
+ throw new RuntimeException("Unable to read tablet log entries", ex);
+ }
+ } else {
+ log.debug("Looking at metadata " + tabletsKeyValues);
+ Text row = ke.getMetadataEntry();
+ for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+ Key key = entry.getKey();
+ // single flattened check replaces the original nested conditionals
+ if (key.getRow().equals(row) && key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+ logEntries.add(LogEntry.fromKeyValue(key, entry.getValue()));
+ }
+ }
+ }
+
+ log.debug("got " + logEntries + " for logs for " + ke);
+ return logEntries;
+ }
+
+ /**
+ * Gathers the scan file references recorded for this tablet's metadata row.
+ */
+ private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
+ HashSet<FileRef> scanFiles = new HashSet<FileRef>();
+
+ Text row = extent.getMetadataEntry();
+ // only the keys matter here; a scan file ref is encoded entirely in the key
+ for (Key key : tabletsKeyValues.keySet()) {
+ if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
+ scanFiles.add(new FileRef(fs, key));
+ }
+ }
+
+ return scanFiles;
+ }
+
+ /**
+ * Reads the persisted flush id for this tablet from the metadata key/values; -1 when absent.
+ */
+ private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+ Text row = extent.getMetadataEntry();
+ for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+ Key key = entry.getKey();
+ if (!key.getRow().equals(row))
+ continue;
+ if (TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
+ return Long.parseLong(entry.getValue().toString());
+ }
+ return -1;
+ }
+
+ /**
+ * Reads the persisted compaction id for this tablet from the metadata key/values; -1 when
+ * absent.
+ */
+ private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+ Text row = extent.getMetadataEntry();
+ for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+ Key key = entry.getKey();
+ if (!key.getRow().equals(row))
+ continue;
+ if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
+ return Long.parseLong(entry.getValue().toString());
+ }
+ return -1;
+ }
+
+ // Opens an existing tablet: derives logs, datafiles, time, last server, scan files,
+ // flush id, and compact id from the metadata key/values read for this extent.
+ private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
+ SortedMap<Key,Value> tabletsKeyValues) throws IOException {
+ this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
+ extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
+ lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
+ }
+
+ /**
+ * Returns the last known server location recorded in the metadata entries, or null if none
+ * is present. The extent parameter is currently unused.
+ */
+ private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+ for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+ Key key = entry.getKey();
+ if (key.getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
+ return new TServerInstance(entry.getValue(), key.getColumnQualifier());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
+ */
+ private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf,
+ final VolumeManager fs, final List<LogEntry> rawLogEntries, final SortedMap<FileRef,DataFileValue> rawDatafiles, String time,
+ final TServerInstance lastLocation, Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
+
+ // ensure the tablet dir and file references reflect the current volume configuration
+ TabletFiles tabletPaths = VolumeUtil.updateTabletVolumes(tabletServer.getLock(), fs, extent, new TabletFiles(location.toString(), rawLogEntries,
+ rawDatafiles));
+
+ Path locationPath;
+
+ // a ":" indicates the dir is already a fully-qualified URI; otherwise resolve it
+ // relative to the table's directory
+ if (tabletPaths.dir.contains(":")) {
+ locationPath = new Path(tabletPaths.dir.toString());
+ } else {
+ locationPath = fs.getFullPath(FileType.TABLE, extent.getTableId().toString() + tabletPaths.dir.toString());
+ }
+
+ final List<LogEntry> logEntries = tabletPaths.logEntries;
+ final SortedMap<FileRef,DataFileValue> datafiles = tabletPaths.datafiles;
+
+ this.location = locationPath;
+ this.lastLocation = lastLocation;
+ this.tabletDirectory = tabletPaths.dir;
+ this.conf = conf;
+ this.acuTableConf = tabletServer.getTableConfiguration(extent);
+
+ this.fs = fs;
+ this.extent = extent;
+ this.tabletResources = trm;
+
+ this.lastFlushID = initFlushID;
+ this.lastCompactID = initCompactID;
+
+ if (extent.isRootTablet()) {
+ // the root tablet uses logical time; recompute it from the max timestamp
+ // found in its datafiles
+ long rtime = Long.MIN_VALUE;
+ for (FileRef ref : datafiles.keySet()) {
+ Path path = ref.path();
+ FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent));
+ long maxTime = -1;
+ try {
+
+ while (reader.hasTop()) {
+ maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
+ reader.next();
+ }
+
+ } finally {
+ reader.close();
+ }
+
+ if (maxTime > rtime) {
+ time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
+ rtime = maxTime;
+ }
+ }
+ }
+ if (time == null && datafiles.isEmpty() && extent.equals(RootTable.OLD_EXTENT)) {
+ // recovery... old root tablet has no data, so time doesn't matter:
+ time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE;
+ }
+
+ this.tabletServer = tabletServer;
+ this.logId = tabletServer.createLogId(extent);
+
+ this.timer = new TabletStatsKeeper();
+
+ setupDefaultSecurityLabels(extent);
+
+ tabletMemory = new TabletMemory();
+ tabletTime = TabletTime.getInstance(time);
+ persistedTime = tabletTime.getTime();
+
+ // watch for per-table configuration changes (constraints, default visibility)
+ acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
+
+ private void reloadConstraints() {
+ constraintChecker.set(new ConstraintChecker(acuTableConf));
+ }
+
+ @Override
+ public void propertiesChanged() {
+ reloadConstraints();
+
+ try {
+ setupDefaultSecurityLabels(extent);
+ } catch (Exception e) {
+ log.error("Failed to reload default security labels for extent: " + extent.toString());
+ }
+ }
+
+ @Override
+ public void propertyChanged(String prop) {
+ if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
+ reloadConstraints();
+ else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
+ try {
+ log.info("Default security labels changed for extent: " + extent.toString());
+ setupDefaultSecurityLabels(extent);
+ } catch (Exception e) {
+ log.error("Failed to reload default security labels for extent: " + extent.toString());
+ }
+ }
+
+ }
+
+ @Override
+ public void sessionExpired() {
+ log.debug("Session expired, no longer updating per table props...");
+ }
+
+ });
+
+ acuTableConf.getNamespaceConfiguration().addObserver(configObserver);
+
+ // Force a load of any per-table properties
+ configObserver.propertiesChanged();
+
+ tabletResources.setTablet(this, acuTableConf);
+ if (!logEntries.isEmpty()) {
+ log.info("Starting Write-Ahead Log recovery for " + this.extent);
+ // count[0] = entries used on tablet
+ // count[1] = track max time from walog entries without timestamps
+ final long[] count = new long[2];
+ final CommitSession commitSession = tabletMemory.getCommitSession();
+ count[1] = Long.MIN_VALUE;
+ try {
+ Set<String> absPaths = new HashSet<String>();
+ for (FileRef ref : datafiles.keySet())
+ absPaths.add(ref.path().toString());
+
+ tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() {
+ @Override
+ public void receive(Mutation m) {
+ // LogReader.printMutation(m);
+ Collection<ColumnUpdate> muts = m.getUpdates();
+ for (ColumnUpdate columnUpdate : muts) {
+ if (!columnUpdate.hasTimestamp()) {
+ // if it is not a user set timestamp, it must have been set
+ // by the system
+ count[1] = Math.max(count[1], columnUpdate.getTimestamp());
+ }
+ }
+ tabletMemory.mutate(commitSession, Collections.singletonList(m));
+ count[0]++;
+ }
+ });
+
+ if (count[1] != Long.MIN_VALUE) {
+ tabletTime.useMaxTimeFromWALog(count[1]);
+ }
+ commitSession.updateMaxCommittedTime(tabletTime.getTime());
+
+ if (count[0] == 0) {
+ log.debug("No replayed mutations applied, removing unused entries for " + extent);
+ MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
+ logEntries.clear();
+ }
+
+ } catch (Throwable t) {
+ if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
+ log.warn("Error recovering from log files: ", t);
+ } else {
+ throw new RuntimeException(t);
+ }
+ }
+ // make some closed references that represent the recovered logs
+ currentLogs = new HashSet<DfsLogger>();
+ for (LogEntry logEntry : logEntries) {
+ for (String log : logEntry.logSet) {
+ currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
+ }
+ }
+
+ log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
+ + " entries created)");
+ }
+
+ String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
+ if (contextName != null && !contextName.equals("")) {
+ // initialize context classloader, instead of possibly waiting for it to initialize for a scan
+ // TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292
+ AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName);
+ }
+
+ // do this last after tablet is completely setup because it
+ // could cause major compaction to start
+ datafileManager = new DatafileManager(datafiles);
+
+ computeNumEntries();
+
+ datafileManager.removeFilesAfterScan(scanFiles);
+
+ // look for hints of a failure on the previous tablet server
+ if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) {
+ // look for any temp files hanging around
+ removeOldTemporaryFiles();
+ }
+
+ log.log(TLevel.TABLET_HIST, extent + " opened");
+ }
+
+ /**
+ * Best-effort removal of *_tmp files left in the tablet directory by a previous tablet
+ * server; failures are logged and ignored.
+ */
+ private void removeOldTemporaryFiles() {
+ // remove any temporary files created by a previous tablet server
+ try {
+ FileStatus[] tmps = fs.globStatus(new Path(location, "*_tmp"));
+ // globStatus may return null instead of an empty array when nothing matches
+ if (tmps == null)
+ return;
+ for (FileStatus tmp : tmps) {
+ try {
+ log.debug("Removing old temp file " + tmp.getPath());
+ fs.delete(tmp.getPath());
+ } catch (IOException ex) {
+ log.error("Unable to remove old temp file " + tmp.getPath() + ": " + ex);
+ }
+ }
+ } catch (IOException ex) {
+ // include the exception so the cause of the scan failure is not lost
+ log.error("Error scanning for old temp files in " + location, ex);
+ }
+ }
+
+ /**
+ * Sets the default scan-time visibility label for this tablet. Metadata tablets always use
+ * the empty label; other tablets read TABLE_DEFAULT_SCANTIME_VISIBILITY from the table
+ * configuration, falling back to the empty label if the expression cannot be parsed.
+ */
+ private void setupDefaultSecurityLabels(KeyExtent extent) {
+ if (extent.isMeta()) {
+ defaultSecurityLabel = new byte[0];
+ } else {
+ try {
+ ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
+ this.defaultSecurityLabel = cv.getExpression();
+ } catch (Exception e) {
+ log.error(e, e);
+ // fall back to the empty label rather than leaving the tablet unusable
+ this.defaultSecurityLabel = new byte[0];
+ }
+ }
+ }
+
+ // A key/value pair returned by scans; the constructor copies both key and value bytes so
+ // later iterator reuse cannot mutate results already handed to clients.
+ public static class KVEntry extends KeyValue {
+ public KVEntry(Key k, Value v) {
+ super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
+ }
+
+ @Override
+ public String toString() {
+ return key.toString() + "=" + getValue();
+ }
+
+ // raw bytes of key plus value
+ int numBytes() {
+ return key.getSize() + getValue().get().length;
+ }
+
+ // rough in-memory footprint including object overhead
+ int estimateMemoryUsed() {
+ return key.getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object
+ }
+ }
+
+ /**
+ * Core multi-range lookup: seeks the supplied iterator over each range and accumulates
+ * results until the memory budget is exceeded or the tablet closes. Ranges not fully
+ * processed are recorded in the returned LookupResult for the client to retry.
+ */
+ private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, ArrayList<KVEntry> results,
+ long maxResultsSize) throws IOException {
+
+ LookupResult lookupResult = new LookupResult();
+
+ boolean exceededMemoryUsage = false;
+ boolean tabletClosed = false;
+
+ Set<ByteSequence> cfset = null;
+ if (columnSet.size() > 0)
+ cfset = LocalityGroupUtil.families(columnSet);
+
+ for (Range range : ranges) {
+
+ // once over budget or closed, defer all remaining ranges untouched
+ if (exceededMemoryUsage || tabletClosed) {
+ lookupResult.unfinishedRanges.add(range);
+ continue;
+ }
+
+ int entriesAdded = 0;
+
+ try {
+ if (cfset != null)
+ mmfi.seek(range, cfset, true);
+ else
+ mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
+
+ while (mmfi.hasTop()) {
+ Key key = mmfi.getTopKey();
+
+ KVEntry kve = new KVEntry(key, mmfi.getTopValue());
+ results.add(kve);
+ entriesAdded++;
+ lookupResult.bytesAdded += kve.estimateMemoryUsed();
+ lookupResult.dataSize += kve.numBytes();
+
+ exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
+
+ if (exceededMemoryUsage) {
+ // remember where to resume this range before stopping
+ addUnfinishedRange(lookupResult, range, key, false);
+ break;
+ }
+
+ mmfi.next();
+ }
+
+ } catch (TooManyFilesException tmfe) {
+ // treat this as a closed tablet, and let the client retry
+ log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run");
+ handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+ tabletClosed = true;
+ } catch (IOException ioe) {
+ if (shutdownInProgress()) {
+ // assume HDFS shutdown hook caused this exception
+ log.debug("IOException while shutdown in progress ", ioe);
+ handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+ tabletClosed = true;
+ } else {
+ throw ioe;
+ }
+ } catch (IterationInterruptedException iie) {
+ if (isClosed()) {
+ handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+ tabletClosed = true;
+ } else {
+ throw iie;
+ }
+ } catch (TabletClosedException tce) {
+ handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+ tabletClosed = true;
+ }
+
+ }
+
+ return lookupResult;
+ }
+
+ /**
+ * Records the effect of the tablet closing mid-scan: the current range (or its unfinished
+ * remainder) is queued for retry and the lookup result is marked closed.
+ */
+ private void handleTabletClosedDuringScan(ArrayList<KVEntry> results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) {
+ if (exceededMemoryUsage)
+ throw new IllegalStateException("tablet should not exceed memory usage or close, not both");
+
+ if (entriesAdded == 0) {
+ // nothing was returned for this range; retry it in full
+ lookupResult.unfinishedRanges.add(range);
+ } else {
+ // resume just after the last key handed back to the client
+ addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).key, false);
+ }
+
+ lookupResult.closed = true;
+ }
+
+ /**
+ * Adds the remainder of range (starting at key) to the unfinished list, unless key is
+ * already at or past the range's end.
+ */
+ private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) {
+ Key endKey = range.getEndKey();
+ if (endKey != null && key.compareTo(endKey) >= 0) {
+ // already past the end of the range; nothing left to retry
+ return;
+ }
+ lookupResult.unfinishedRanges.add(new Range(new Key(key), inclusiveStartKey, endKey, range.isEndKeyInclusive()));
+ }
+
+ // Callback used to stream batches of matching key/value pairs to a consumer.
+ public static interface KVReceiver {
+ void receive(List<KVEntry> matches) throws IOException;
+ }
+
+ // Aggregated outcome of a batch lookup: ranges still to be scanned, size accounting,
+ // and whether the tablet closed mid-lookup.
+ class LookupResult {
+ List<Range> unfinishedRanges = new ArrayList<Range>(); // ranges the client must retry
+ long bytesAdded = 0; // estimated memory consumed by results
+ long dataSize = 0; // raw key/value bytes returned
+ boolean closed = false; // true if the tablet closed during lookup
+ }
+
+ /**
+ * Public entry point for batch lookups. Merges and sorts the ranges, verifies they all fall
+ * within this tablet's extent, and delegates to the core lookup, always releasing the scan
+ * data source and updating query statistics on the way out.
+ */
+ public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, ArrayList<KVEntry> results, long maxResultSize,
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
+
+ if (ranges.size() == 0) {
+ return new LookupResult();
+ }
+
+ ranges = Range.mergeOverlapping(ranges);
+ Collections.sort(ranges);
+
+ Range tabletRange = extent.toDataRange();
+ for (Range range : ranges) {
+ // do a test to see if this range falls within the tablet, if it does not
+ // then clip will throw an exception
+ tabletRange.clip(range);
+ }
+
+ ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
+
+ LookupResult result = null;
+
+ try {
+ SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
+ result = lookup(iter, ranges, columns, results, maxResultSize);
+ return result;
+ } catch (IOException ioe) {
+ dataSource.close(true);
+ throw ioe;
+ } finally {
+ // code in finally block because always want
+ // to return mapfiles, even when exception is thrown
+ dataSource.close(false);
+
+ synchronized (this) {
+ queryCount += results.size();
+ if (result != null)
+ queryBytes += result.dataSize;
+ }
+ }
+ }
+
+ /**
+ * Reads the next batch of key/values for a scan, stopping at the configured memory limit or
+ * the requested entry count. A null results list signals end-of-tablet with nothing found;
+ * a null continueKey signals the range was fully consumed.
+ */
+ private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
+
+ // log.info("In nextBatch..");
+
+ List<KVEntry> results = new ArrayList<KVEntry>();
+ Key key = null;
+
+ Value value;
+ long resultSize = 0L;
+ long resultBytes = 0L;
+
+ long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+
+ if (columns.size() == 0) {
+ iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
+ } else {
+ iter.seek(range, LocalityGroupUtil.families(columns), true);
+ }
+
+ Key continueKey = null;
+ boolean skipContinueKey = false;
+
+ boolean endOfTabletReached = false;
+ while (iter.hasTop()) {
+
+ value = iter.getTopValue();
+ key = iter.getTopKey();
+
+ KVEntry kvEntry = new KVEntry(key, value); // copies key and value
+ results.add(kvEntry);
+ resultSize += kvEntry.estimateMemoryUsed();
+ resultBytes += kvEntry.numBytes();
+
+ // stop when either the memory budget or the entry count limit is hit
+ if (resultSize >= maxResultsSize || results.size() >= num) {
+ continueKey = new Key(key);
+ skipContinueKey = true;
+ break;
+ }
+
+ iter.next();
+ }
+
+ if (iter.hasTop() == false) {
+ endOfTabletReached = true;
+ }
+
+ Batch retBatch = new Batch();
+ retBatch.numBytes = resultBytes;
+
+ if (!endOfTabletReached) {
+ retBatch.continueKey = continueKey;
+ retBatch.skipContinueKey = skipContinueKey;
+ } else {
+ retBatch.continueKey = null;
+ }
+
+ if (endOfTabletReached && results.size() == 0)
+ retBatch.results = null;
+ else
+ retBatch.results = results;
+
+ return retBatch;
+ }
+
+ /**
+ * Determine if a JVM shutdown is in progress.
+ *
+ */
+ private boolean shutdownInProgress() {
+ try {
+ // removeShutdownHook throws IllegalStateException once the JVM has begun
+ // shutting down; that exception is the signal used here
+ Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {}
+ }));
+ } catch (IllegalStateException ise) {
+ return true;
+ }
+
+ return false;
+ }
+
+ // Result of one nextBatch() call: the entries read, where to resume, and byte accounting.
+ private class Batch {
+ public boolean skipContinueKey; // true if continueKey was already returned to the client
+ public List<KVEntry> results; // null signals end-of-tablet with no results
+ public Key continueKey; // null signals the range was fully consumed
+ public long numBytes; // raw bytes of keys+values in results
+ }
+
+ /**
+ * Creates a scanner over the given range; throws if the range does not fall within this
+ * tablet's extent.
+ */
+ Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+ boolean isolated, AtomicBoolean interruptFlag) {
+ // do a test to see if this range falls within the tablet, if it does not
+ // then clip will throw an exception
+ extent.toDataRange().clip(range);
+
+ ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
+ return new Scanner(range, opts);
+ }
+
+ // One batch of scan results; more indicates whether the scan has additional data.
+ class ScanBatch {
+ boolean more;
+ List<KVEntry> results;
+
+ ScanBatch(List<KVEntry> results, boolean more) {
+ this.results = results;
+ this.more = more;
+ }
+ }
+
+ // Stateful per-client scanner over this tablet. read() and close() are synchronized so the
+ // underlying data source is never closed while in use. Isolated scans keep one data source
+ // and iterator alive across batches; non-isolated scans build fresh ones per read().
+ class Scanner {
+
+ private ScanOptions options;
+ private Range range; // remaining range to scan; rewritten after each batch
+ private SortedKeyValueIterator<Key,Value> isolatedIter;
+ private ScanDataSource isolatedDataSource;
+ private boolean sawException = false;
+ private boolean scanClosed = false;
+
+ Scanner(Range range, ScanOptions options) {
+ this.range = range;
+ this.options = options;
+ }
+
+ // Reads the next batch of results, updating the remaining range for the next call.
+ synchronized ScanBatch read() throws IOException, TabletClosedException {
+
+ if (sawException)
+ throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+ if (scanClosed)
+ throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+ Batch results = null;
+
+ ScanDataSource dataSource;
+
+ if (options.isolated) {
+ if (isolatedDataSource == null)
+ isolatedDataSource = new ScanDataSource(options);
+ dataSource = isolatedDataSource;
+ } else {
+ dataSource = new ScanDataSource(options);
+ }
+
+ try {
+
+ SortedKeyValueIterator<Key,Value> iter;
+
+ if (options.isolated) {
+ if (isolatedIter == null)
+ isolatedIter = new SourceSwitchingIterator(dataSource, true);
+ else
+ isolatedDataSource.fileManager.reattach();
+ iter = isolatedIter;
+ } else {
+ iter = new SourceSwitchingIterator(dataSource, false);
+ }
+
+ results = nextBatch(iter, range, options.num, options.columnSet);
+
+ if (results.results == null) {
+ range = null;
+ return new ScanBatch(new ArrayList<Tablet.KVEntry>(), false);
+ } else if (results.continueKey == null) {
+ return new ScanBatch(results.results, false);
+ } else {
+ range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
+ return new ScanBatch(results.results, true);
+ }
+
+ } catch (IterationInterruptedException iie) {
+ sawException = true;
+ if (isClosed())
+ throw new TabletClosedException(iie);
+ else
+ throw iie;
+ } catch (IOException ioe) {
+ if (shutdownInProgress()) {
+ log.debug("IOException while shutdown in progress ", ioe);
+ throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
+ }
+
+ sawException = true;
+ dataSource.close(true);
+ throw ioe;
+ } catch (RuntimeException re) {
+ sawException = true;
+ throw re;
+ } finally {
+ // code in finally block because always want
+ // to return mapfiles, even when exception is thrown
+ if (!options.isolated)
+ dataSource.close(false);
+ else if (dataSource.fileManager != null)
+ dataSource.fileManager.detach();
+
+ synchronized (Tablet.this) {
+ if (results != null && results.results != null) {
+ long more = results.results.size();
+ queryCount += more;
+ queryBytes += results.numBytes;
+ }
+ }
+ }
+ }
+
+ // close and read are synchronized because can not call close on the data source while it is in use
+ // this could lead to the case where file iterators that are in use by a thread are returned
+ // to the pool... this would be bad
+ void close() {
+ options.interruptFlag.set(true);
+ synchronized (this) {
+ scanClosed = true;
+ if (isolatedDataSource != null)
+ isolatedDataSource.close(false);
+ }
+ }
+ }
+
+ // Immutable bag of scan parameters shared by Scanner and ScanDataSource.
+ static class ScanOptions {
+
+ // scan options
+ Authorizations authorizations;
+ byte[] defaultLabels;
+ Set<Column> columnSet;
+ List<IterInfo> ssiList; // scan-time (server-side) iterator list
+ Map<String,Map<String,String>> ssio; // scan-time iterator options
+ AtomicBoolean interruptFlag;
+ int num; // max entries per batch; -1 for batch lookups
+ boolean isolated;
+
+ ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
+ Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
+ this.num = num;
+ this.authorizations = authorizations;
+ this.defaultLabels = defaultLabels;
+ this.columnSet = columnSet;
+ this.ssiList = ssiList;
+ this.ssio = ssio;
+ this.interruptFlag = interruptFlag;
+ this.isolated = isolated;
+ }
+
+ }
+
  /**
   * A {@link DataSource} backing one scan of this tablet. It merges iterators over
   * the in-memory map with iterators over the tablet's reserved data files, then
   * stacks the scan-time iterator tree (stats, deleting, column-family skipping,
   * column-qualifier and visibility filters, plus table-configured iterators) on
   * top. When the tablet's file set changes ({@code dataSourceDeletions} advances),
   * {@link #isCurrent()} turns false and {@link #getNewDataSource()} tears down the
   * stale reservations so {@link #iterator()} rebuilds the tree.
   */
  class ScanDataSource implements DataSource {

    // data source state
    private ScanFileManager fileManager;
    // lazily built top-level iterator; null forces a rebuild in iterator()
    private SortedKeyValueIterator<Key,Value> iter;
    // snapshot of dataSourceDeletions taken when the current iterator tree was built
    private long expectedDeletionCount;
    // in-memory map iterators currently checked out; must be returned before rebuild/close
    private List<MemoryIterator> memIters = null;
    // reservation handle from datafileManager.reserveFilesForScan(); -1 when not held
    private long fileReservationId;
    private AtomicBoolean interruptFlag;
    private StatsIterator statsIterator;

    ScanOptions options;

    // constructor for a plain (non-isolated) scan; num is fixed at -1
    ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
        AtomicBoolean interruptFlag) {
      expectedDeletionCount = dataSourceDeletions.get();
      this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
      this.interruptFlag = interruptFlag;
    }

    // constructor used when the caller already has a ScanOptions bundle
    ScanDataSource(ScanOptions options) {
      expectedDeletionCount = dataSourceDeletions.get();
      this.options = options;
      this.interruptFlag = options.interruptFlag;
    }

    /**
     * Returns this same object; when the tablet's files changed underneath us,
     * first releases the stale memory iterators and file reservation so that the
     * next iterator() call rebuilds against the current file set.
     */
    @Override
    public DataSource getNewDataSource() {
      if (!isCurrent()) {
        // log.debug("Switching data sources during a scan");
        if (memIters != null) {
          tabletMemory.returnIterators(memIters);
          memIters = null;
          datafileManager.returnFilesForScan(fileReservationId);
          fileReservationId = -1;
        }

        if (fileManager != null)
          fileManager.releaseOpenFiles(false);

        // re-snapshot and drop the cached iterator so it is rebuilt lazily
        expectedDeletionCount = dataSourceDeletions.get();
        iter = null;

        return this;
      } else
        return this;
    }

    /** True while no file deletions have occurred since the iterator tree was built. */
    @Override
    public boolean isCurrent() {
      return expectedDeletionCount == dataSourceDeletions.get();
    }

    @Override
    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
      // built lazily and cached until getNewDataSource() invalidates it
      if (iter == null)
        iter = createIterator();
      return iter;
    }

    /**
     * Builds the full scan iterator tree. Reservations (memory iterators + file
     * reservation) are taken under the tablet lock so they cannot race with the
     * tablet closing; the file opening and iterator stacking happen outside it.
     *
     * @throws TabletClosedException if the tablet closed before reservation
     * @throws IterationInterruptedException if the scan was interrupted
     */
    private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {

      Map<FileRef,DataFileValue> files;

      synchronized (Tablet.this) {

        if (memIters != null)
          throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");

        if (Tablet.this.closed)
          throw new TabletClosedException();

        if (interruptFlag.get())
          throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode());

        // only acquire the file manager when we know the tablet is open
        if (fileManager == null) {
          fileManager = tabletResources.newScanFileManager();
          activeScans.add(this);
        }

        if (fileManager.getNumOpenFiles() != 0)
          throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");

        // set this before trying to get iterators in case
        // getIterators() throws an exception
        expectedDeletionCount = dataSourceDeletions.get();

        memIters = tabletMemory.getIterators();
        Pair<Long,Map<FileRef,DataFileValue>> reservation = datafileManager.reserveFilesForScan();
        fileReservationId = reservation.getFirst();
        files = reservation.getSecond();
      }

      Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);

      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());

      iters.addAll(mapfiles);
      iters.addAll(memIters);

      // propagate the interrupt flag so every source iterator can be cancelled
      for (SortedKeyValueIterator<Key,Value> skvi : iters)
        ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);

      MultiIterator multiIter = new MultiIterator(iters, extent);

      TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files);

      statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount);

      DeletingIterator delIter = new DeletingIterator(statsIterator, false);

      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);

      ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);

      VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);

      return iterEnv.getTopLevelIterator(IteratorUtil
          .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv));
    }

    /**
     * Releases everything this scan holds: memory iterators, the file
     * reservation, open files, and its slot in activeScans (notifying waiters
     * when the last scan finishes). Also reports accumulated scan stats.
     *
     * @param sawErrors passed to the file manager when releasing open files
     */
    private void close(boolean sawErrors) {

      if (memIters != null) {
        tabletMemory.returnIterators(memIters);
        memIters = null;
        datafileManager.returnFilesForScan(fileReservationId);
        fileReservationId = -1;
      }

      synchronized (Tablet.this) {
        activeScans.remove(this);
        // wake up anything waiting for all scans to finish (e.g. tablet close)
        if (activeScans.size() == 0)
          Tablet.this.notifyAll();
      }

      if (fileManager != null) {
        fileManager.releaseOpenFiles(sawErrors);
        fileManager = null;
      }

      if (statsIterator != null) {
        statsIterator.report();
      }

    }

    /** Signals the scan's iterators to stop; observed via the shared interrupt flag. */
    public void interrupt() {
      interruptFlag.set(true);
    }

    @Override
    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
      throw new UnsupportedOperationException();
    }

    // the flag is fixed at construction time; it cannot be replaced afterwards
    @Override
    public void setInterruptFlag(AtomicBoolean flag) {
      throw new UnsupportedOperationException();
    }

  }
+
  /**
   * Runs a minor compaction: writes {@code memTable} to {@code tmpDatafile}
   * (merging {@code mergeFile} into it when non-null), then brings the result
   * online as {@code newDatafile} via the datafile manager, recording trace
   * spans and minor-compaction timing metrics along the way.
   *
   * @param hasQueueTime whether {@code queued} is a valid queue timestamp to report
   * @return size and entry count of the newly written file
   * @throws RuntimeException wrapping any failure from the compaction or bring-online step
   */
  private DataFileValue minorCompact(Configuration conf, VolumeManager fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef mergeFile,
      boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
    boolean failed = false;
    long start = System.currentTimeMillis();
    timer.incrementStatusMinor();

    long count = 0;

    try {
      Span span = Trace.start("write");
      CompactionStats stats;
      try {
        count = memTable.getNumEntries();

        // current size of the file being merged in (if any), passed to the compactor
        DataFileValue dfv = null;
        if (mergeFile != null)
          dfv = datafileManager.getDatafileSizes().get(mergeFile);

        MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
        stats = compactor.call();
      } finally {
        span.stop();
      }
      span = Trace.start("bringOnline");
      try {
        datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
            commitSession, flushId);
      } finally {
        span.stop();
      }
      return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
    } catch (Exception E) {
      failed = true;
      throw new RuntimeException(E);
    } catch (Error E) {
      // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
      failed = true;
      throw new RuntimeException(E);
    } finally {
      try {
        // always release the memory reserved for this minc, even on failure
        tabletMemory.finalizeMinC();
      } catch (Throwable t) {
        log.error("Failed to free tablet memory", t);
      }

      if (!failed) {
        lastMinorCompactionFinishTime = System.currentTimeMillis();
      }
      // metrics/timing are reported whether or not the compaction succeeded
      if (tabletServer.mincMetrics.isEnabled())
        tabletServer.mincMetrics.add(TabletServerMinCMetrics.minc, (lastMinorCompactionFinishTime - start));
      if (hasQueueTime) {
        timer.updateTime(Operation.MINOR, queued, start, count, failed);
        if (tabletServer.mincMetrics.isEnabled())
          tabletServer.mincMetrics.add(TabletServerMinCMetrics.queue, (start - queued));
      } else
        timer.updateTime(Operation.MINOR, start, count, failed);
    }
  }
+
+ private class MinorCompactionTask implements Runnable {
+
+ private long queued;
+ private CommitSession commitSession;
+ private DataFileValue stats;
+ private FileRef mergeFile;
+ private long flushId;
+ private MinorCompactionReason mincReason;
+
+ MinorCompactionTask(FileRef mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+ queued = System.currentTimeMillis();
+ minorCompactionWaitingToStart = true;
+ this.commitSession = commitSession;
+ this.mergeFile = mergeFile;
+ this.flushId = flushId;
+ this.mincReason = mincReason;
+ }
+
+ @Override
+ public void run() {
+ minorCompactionWaitingToStart = false;
+ minorCompactionInProgress = true;
+ Span minorCompaction = Trace.on("minorCompaction");
+ try {
+ FileRef newMapfileLocation = getNextMapFilename(mergeFile == null ? "F" : "M");
+ FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp");
+ Span span = Trace.start("waitForCommits");
+ synchronized (Tablet.this) {
+ commitSession.waitForCommitsToFinish();
+ }
+ span.stop();
+ span = Trace.start("start");
+ while (true) {
+ try {
+ // the purpose of the minor compaction start event is to keep track of the filename... in the case
+ // where the metadata table write for the minor compaction finishes and the process dies before
+ // writing the minor compaction finish event, then the start event+filename in metadata table will
+ // prevent recovery of duplicate data... the minor compaction start event could be written at any time
+ // before the metadata write for the minor compaction
+ tabletServer.minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString());
+ break;
+ } catch (IOException e) {
+ log.warn("Failed to write to write ahead log " + e.getMessage(), e);
+ }
+ }
+ span.stop();
+ span = Trace.start("compact");
+ this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId,
+ mincReason);
+ span.stop();
+
+ if (needsSplit()) {
+ tabletServer.executeSplit(Tablet.this);
+ } else {
+ initiateMajorCompaction(MajorCompactionReason.NORMAL);
+ }
+ } catch (Throwable t) {
+ log.error("Unknown error during minor compaction for extent: " + getExtent(), t);
+ throw new RuntimeException(t);
+ } finally {
+ minorCompactionInProgress = false;
+ minorCompaction.data("extent", extent.toString());
+ minorCompaction.data("numEntries", Long.toString(this.stats.getNumEntries()));
+ minorCompaction.data("size", Long.toString(this.stats.getSize()));
+ minorCompaction.stop();
+ }
+ }
+ }
+
  /**
   * Atomically (under the tablet lock) prepares state for a minor compaction:
   * swaps in a new in-memory map via tabletMemory.prepareForMinC() (capturing
   * the old commit session), moves the current write-ahead logs to otherLogs so
   * new mutations log to a fresh set, and — except during recovery — reserves an
   * existing small file to merge into the new one.
   *
   * @return the task to run; the caller decides whether to run it inline or queue it
   */
  private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
    CommitSession oldCommitSession = tabletMemory.prepareForMinC();
    otherLogs = currentLogs;
    currentLogs = new HashSet<DfsLogger>();

    FileRef mergeFile = null;
    if (mincReason != MinorCompactionReason.RECOVERY) {
      mergeFile = datafileManager.reserveMergingMinorCompactionFile();
    }

    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason);

  }
+
  /**
   * Handles a table flush request carrying the given flush id. If the in-memory
   * map is empty, the flush id is simply recorded in the metadata table;
   * otherwise a minor compaction is initiated with that id. Returns without
   * action when another thread is already updating the flush id, when this id
   * was already processed, or when the tablet is closing/closed or has memory
   * reserved for a minor compaction.
   */
  void flush(long tableFlushID) {
    boolean updateMetadata = false;
    boolean initiateMinor = false;

    try {

      synchronized (this) {

        // only want one thing at a time to update flush ID to ensure that metadata table and tablet in memory state are consistent
        if (updatingFlushID)
          return;

        // this flush id (or a newer one) was already handled
        if (lastFlushID >= tableFlushID)
          return;

        if (closing || closed || tabletMemory.memoryReservedForMinC())
          return;

        if (tabletMemory.getMemTable().getNumEntries() == 0) {
          // nothing buffered to flush; just record the id
          lastFlushID = tableFlushID;
          updatingFlushID = true;
          updateMetadata = true;
        } else
          initiateMinor = true;
      }

      if (updateMetadata) {
        Credentials creds = SystemCredentials.get();
        // if multiple threads were allowed to update this outside of a sync block, then it would be
        // a race condition
        MetadataTableUtil.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock());
      } else if (initiateMinor)
        initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);

    } finally {
      if (updateMetadata) {
        synchronized (this) {
          // allow other threads to update the flush id again and wake any waiters
          updatingFlushID = false;
          this.notifyAll();
        }
      }
    }

  }
+
+ boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
+ if (isClosed()) {
+ // don't bother trying to get flush id if closed... could be closed after this check but that is ok... just trying to cut down on uneeded log messages....
+ return false;
+ }
+
+ // get the flush id before the new memmap is made available for write
+ long flushId;
+ try {
+ flushId = getFlushID();
+ } catch (NoNodeException e) {
+ log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage());
+ return false;
+ }
+ return initiateMinorCompaction(flushId, mincReason);
+ }
+
+ boolean minorCompactNow(MinorCompactionReason mincReason) {
+ long flushId;
+ try {
+ flushId = getFlushID();
+ } catch (NoNodeException e) {
+ log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage());
+ return false;
+ }
+ MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
+ if (mct == null)
+ return false;
+ mct.run();
+ return true;
+ }
+
+ boolean initiateMinorCompaction(long flushId, MinorCompactionReason mincReason) {
+ MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
+ if (mct == null)
+ return false;
+ tabletResources.executeMinorCompaction(mct);
+ return true;
+ }
+
  /**
   * Attempts to prepare a minor compaction task under the tablet lock. Returns
   * null when compaction cannot start now — tablet closing/closed, a major
   * compaction waiting, memory already reserved for a minc, empty memtable, or a
   * flush-id update in progress — logging the reason at debug level after the
   * lock is released.
   *
   * @return the prepared task, or null if minor compaction cannot start
   */
  private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) {
    MinorCompactionTask mct;
    long t1, t2;

    StringBuilder logMessage = null;

    try {
      synchronized (this) {
        t1 = System.currentTimeMillis();

        if (closing || closed || majorCompactionWaitingToStart || tabletMemory.memoryReservedForMinC() || tabletMemory.getMemTable().getNumEntries() == 0
            || updatingFlushID) {

          // build the explanation here, but emit it only after releasing the lock
          logMessage = new StringBuilder();

          logMessage.append(extent.toString());
          logMessage.append(" closing " + closing);
          logMessage.append(" closed " + closed);
          logMessage.append(" majorCompactionWaitingToStart " + majorCompactionWaitingToStart);
          if (tabletMemory != null)
            logMessage.append(" tabletMemory.memoryReservedForMinC() " + tabletMemory.memoryReservedForMinC());
          if (tabletMemory != null && tabletMemory.getMemTable() != null)
            logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + tabletMemory.getMemTable().getNumEntries());
          logMessage.append(" updatingFlushID " + updatingFlushID);

          return null;
        }

        mct = prepareForMinC(flushId, mincReason);
        t2 = System.currentTimeMillis();
      }
    } finally {
      // log outside of sync block
      if (logMessage != null && log.isDebugEnabled())
        log.debug(logMessage);
    }

    log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0));
    return mct;
  }
+
+ long getFlushID() throws NoNodeException {
+ try {
+ String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+ + Constants.ZTABLE_FLUSH_ID;
+ return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), Constants.UTF8));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (NumberFormatException nfe) {
+ throw new RuntimeException(nfe);
+ } catch (KeeperException ke) {
+ if (ke instanceof NoNodeException) {
+ throw (NoNodeException) ke;
+ } else {
+ throw new RuntimeException(ke);
+ }
+ }
+ }
+
+ long getCompactionCancelID() {
+ String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+ + Constants.ZTABLE_COMPACT_CANCEL_ID;
+
+ try {
+ return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), Constants.UTF8));
+ } catch (KeeperException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Pair<Long,List<IteratorSetting>> getCompactionID() throws NoNodeException {
+ try {
+ String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+ + Constants.ZTABLE_COMP
<TRUNCATED>