You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:42:55 UTC
[13/35] ACCUMULO-2041 extract tablet classes to new files,
move tablet-related code to o.a.a.tserver.tablet,
make member variables private
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
index e13594d..d1fece5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
@@ -42,7 +42,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
private Map<FileRef,DataFileValue> files;
- TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) {
+ public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) {
if (scope == IteratorScope.majc)
throw new IllegalArgumentException("must set if compaction is full");
@@ -52,7 +52,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
this.fullMajorCompaction = false;
}
- TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files) {
+ public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files) {
if (scope == IteratorScope.majc)
throw new IllegalArgumentException("must set if compaction is full");
@@ -63,7 +63,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
this.files = files;
}
- TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) {
+ public TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) {
if (scope != IteratorScope.majc)
throw new IllegalArgumentException("Tried to set maj compaction type when scope was " + scope);
@@ -101,7 +101,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
topLevelIterators.add(iter);
}
- SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+ public SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
if (topLevelIterators.isEmpty())
return iter;
ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 144d59b..1c07c44 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -195,17 +195,7 @@ import org.apache.accumulo.trace.instrument.Span;
import org.apache.accumulo.trace.instrument.Trace;
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.accumulo.tserver.Compactor.CompactionInfo;
import org.apache.accumulo.tserver.RowLocks.RowLock;
-import org.apache.accumulo.tserver.Tablet.CommitSession;
-import org.apache.accumulo.tserver.Tablet.KVEntry;
-import org.apache.accumulo.tserver.Tablet.LookupResult;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.tserver.Tablet.ScanBatch;
-import org.apache.accumulo.tserver.Tablet.Scanner;
-import org.apache.accumulo.tserver.Tablet.SplitInfo;
-import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
-import org.apache.accumulo.tserver.Tablet.TabletClosedException;
import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@ -221,6 +211,17 @@ import org.apache.accumulo.tserver.metrics.TabletServerMBean;
import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.CompactionInfo;
+import org.apache.accumulo.tserver.tablet.CompactionWatcher;
+import org.apache.accumulo.tserver.tablet.Compactor;
+import org.apache.accumulo.tserver.tablet.KVEntry;
+import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Scanner;
+import org.apache.accumulo.tserver.tablet.SplitInfo;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletClosedException;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
@@ -252,7 +253,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
private TabletServerLogger logger;
- protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+ protected final TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+ public TabletServerMinCMetrics getMinCMetrics() {
+ return mincMetrics;
+ }
private ServerConfiguration serverConfig;
private LogSorter logSorter = null;
@@ -629,7 +633,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
- static class TservConstraintEnv implements Environment {
+ public static class TservConstraintEnv implements Environment {
private TCredentials credentials;
private SecurityOperation security;
@@ -641,7 +645,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
this.credentials = credentials;
}
- void setExtent(KeyExtent ke) {
+ public void setExtent(KeyExtent ke) {
this.ke = ke;
}
@@ -1659,16 +1663,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
commitSession.commit(mutations);
- Tablet tablet = commitSession.getTablet();
+ KeyExtent extent = commitSession.getExtent();
- if (tablet == us.currentTablet) {
+ if (extent == us.currentTablet.getExtent()) {
// because constraint violations may filter out some
// mutations, for proper
// accounting with the client code, need to increment
// the count based
// on the original number of mutations from the client
// NOT the filtered number
- us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+ us.successfulCommits.increment(us.currentTablet, us.queuedMutations.get(us.currentTablet).size());
}
}
long t2 = System.currentTimeMillis();
@@ -2141,7 +2145,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
KeyExtent ke = entry.getKey();
if (ke.getTableId().compareTo(text) == 0) {
Tablet tablet = entry.getValue();
- TabletStats stats = tablet.timer.getTabletStats();
+ TabletStats stats = tablet.getTabletStats();
stats.extent = ke.toThrift();
stats.ingestRate = tablet.ingestRate();
stats.queryRate = tablet.queryRate();
@@ -2563,11 +2567,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
- boolean isMajorCompactionDisabled() {
+ public boolean isMajorCompactionDisabled() {
return majorCompactorDisabled;
}
- void executeSplit(Tablet tablet) {
+ public void executeSplit(Tablet tablet) {
resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet)));
}
@@ -2617,7 +2621,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
synchronized (tablet) {
- if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.majorCompactionQueued() || tablet.majorCompactionRunning()) {
+ if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.isMajorCompactionQueued() || tablet.isMajorCompactionRunning()) {
numMajorCompactionsInProgress++;
continue;
}
@@ -2683,16 +2687,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry();
TabletResourceManager newTrm0 = resourceManager.createTabletResourceManager(first.getKey(), getTableConfiguration(first.getKey()));
- newTablets[0] = new Tablet(first.getKey(), TabletServer.this, newTrm0, first.getValue());
+ newTablets[0] = new Tablet(TabletServer.this, first.getKey(), newTrm0, first.getValue());
Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry();
TabletResourceManager newTrm1 = resourceManager.createTabletResourceManager(last.getKey(), getTableConfiguration(last.getKey()));
- newTablets[1] = new Tablet(last.getKey(), TabletServer.this, newTrm1, last.getValue());
+ newTablets[1] = new Tablet(TabletServer.this, last.getKey(), newTrm1, last.getValue());
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
- statsKeeper.saveMinorTimes(tablet.timer);
- statsKeeper.saveMajorTimes(tablet.timer);
+ statsKeeper.saveMajorMinorTimes(tablet.getTabletStats());
// lose the reference to the old tablet and open two new ones
synchronized (onlineTablets) {
@@ -2719,7 +2722,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
private BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
// add a message for the main thread to send back to the master
- void enqueueMasterMessage(MasterMessage m) {
+ public void enqueueMasterMessage(MasterMessage m) {
masterMessages.addLast(m);
}
@@ -2808,9 +2811,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
- statsKeeper.saveMinorTimes(t.timer);
- statsKeeper.saveMajorTimes(t.timer);
-
+ statsKeeper.saveMajorMinorTimes(t.getTabletStats());
log.info("unloaded " + extent);
}
@@ -2914,7 +2915,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// this opens the tablet file and fills in the endKey in the
// extent
locationToOpen = VolumeUtil.switchRootTabletVolume(extent, locationToOpen);
- tablet = new Tablet(TabletServer.this, locationToOpen, extent, trm, tabletsKeyValues);
+ tablet = new Tablet(TabletServer.this, extent, locationToOpen, trm, tabletsKeyValues);
/*
* If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor compacted.
*
@@ -3018,7 +3019,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
private static ObjectName OBJECT_NAME = null;
- static AtomicLong seekCount = new AtomicLong(0);
+ public static final AtomicLong seekCount = new AtomicLong(0);
public TabletStatsKeeper getStatsKeeper() {
return statsKeeper;
@@ -3098,7 +3099,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return address;
}
- ZooLock getLock() {
+ public ZooLock getLock() {
return tabletServerLock;
}
@@ -3452,7 +3453,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return clientAddress.getHostText() + ":" + clientAddress.getPort();
}
- TServerInstance getTabletSession() {
+ public TServerInstance getTabletSession() {
String address = getClientAddressString();
if (address == null)
return null;
@@ -3596,13 +3597,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
table.scanRate += tablet.scanRate();
long recsInMemory = tablet.getNumEntriesInMemory();
table.recsInMemory += recsInMemory;
- if (tablet.minorCompactionRunning())
+ if (tablet.isMinorCompactionRunning())
table.minors.running++;
- if (tablet.minorCompactionQueued())
+ if (tablet.isMinorCompactionQueued())
table.minors.queued++;
- if (tablet.majorCompactionRunning())
+ if (tablet.isMajorCompactionRunning())
table.majors.running++;
- if (tablet.majorCompactionQueued())
+ if (tablet.isMajorCompactionQueued())
table.majors.queued++;
}
@@ -3775,7 +3776,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (this.isEnabled()) {
int result = 0;
for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
- if (tablet.majorCompactionQueued())
+ if (tablet.isMajorCompactionQueued())
result++;
}
return result;
@@ -3788,7 +3789,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (this.isEnabled()) {
int result = 0;
for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
- if (tablet.minorCompactionRunning())
+ if (tablet.isMinorCompactionRunning())
result++;
}
return result;
@@ -3801,7 +3802,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (this.isEnabled()) {
int result = 0;
for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
- if (tablet.minorCompactionQueued())
+ if (tablet.isMinorCompactionQueued())
result++;
}
return result;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index f26c74b..095f8d5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -54,11 +54,11 @@ import org.apache.accumulo.server.tabletserver.TabletState;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.trace.instrument.TraceExecutorService;
import org.apache.accumulo.tserver.FileManager.ScanFileManager;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.log4j.Logger;
/**
@@ -69,34 +69,34 @@ import org.apache.log4j.Logger;
*/
public class TabletServerResourceManager {
- private ExecutorService minorCompactionThreadPool;
- private ExecutorService majorCompactionThreadPool;
- private ExecutorService rootMajorCompactionThreadPool;
- private ExecutorService defaultMajorCompactionThreadPool;
- private ExecutorService splitThreadPool;
- private ExecutorService defaultSplitThreadPool;
- private ExecutorService defaultMigrationPool;
- private ExecutorService migrationPool;
- private ExecutorService assignmentPool;
- private ExecutorService assignMetaDataPool;
- private ExecutorService readAheadThreadPool;
- private ExecutorService defaultReadAheadThreadPool;
- private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
+ private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
+
+ private final ExecutorService minorCompactionThreadPool;
+ private final ExecutorService majorCompactionThreadPool;
+ private final ExecutorService rootMajorCompactionThreadPool;
+ private final ExecutorService defaultMajorCompactionThreadPool;
+ private final ExecutorService splitThreadPool;
+ private final ExecutorService defaultSplitThreadPool;
+ private final ExecutorService defaultMigrationPool;
+ private final ExecutorService migrationPool;
+ private final ExecutorService assignmentPool;
+ private final ExecutorService assignMetaDataPool;
+ private final ExecutorService readAheadThreadPool;
+ private final ExecutorService defaultReadAheadThreadPool;
+ private final Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
private final VolumeManager fs;
- private FileManager fileManager;
+ private final FileManager fileManager;
- private MemoryManager memoryManager;
+ private final MemoryManager memoryManager;
- private MemoryManagementFramework memMgmt;
+ private final MemoryManagementFramework memMgmt;
private final LruBlockCache _dCache;
private final LruBlockCache _iCache;
private final ServerConfiguration conf;
- private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
-
private ExecutorService addEs(String name, ExecutorService tp) {
if (threadPools.containsKey(name)) {
throw new IllegalArgumentException("Cannot create two executor services with same name " + name);
@@ -210,10 +210,10 @@ public class TabletServerResourceManager {
private static class TabletStateImpl implements TabletState, Cloneable {
- private long lct;
- private Tablet tablet;
- private long mts;
- private long mcmts;
+ private final long lct;
+ private final Tablet tablet;
+ private final long mts;
+ private final long mcmts;
public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
this.tablet = t;
@@ -249,11 +249,12 @@ public class TabletServerResourceManager {
private class MemoryManagementFramework {
private final Map<KeyExtent,TabletStateImpl> tabletReports;
- private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
+ private final LinkedBlockingQueue<TabletStateImpl> memUsageReports;
private long lastMemCheckTime = System.currentTimeMillis();
private long maxMem;
- private Thread memoryGuardThread;
- private Thread minorCompactionInitiatorThread;
+ private long lastMemTotal = 0;
+ private final Thread memoryGuardThread;
+ private final Thread minorCompactionInitiatorThread;
MemoryManagementFramework() {
tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
@@ -287,8 +288,6 @@ public class TabletServerResourceManager {
minorCompactionInitiatorThread.start();
}
- private long lastMemTotal = 0;
-
private void processTabletMemStats() {
while (true) {
try {
@@ -494,7 +493,7 @@ public class TabletServerResourceManager {
lastReportedCommitTime = System.currentTimeMillis();
}
- synchronized ScanFileManager newScanFileManager() {
+ public synchronized ScanFileManager newScanFileManager() {
if (closed)
throw new IllegalStateException("closed");
return fileManager.newScanFileManager(extent);
@@ -504,8 +503,8 @@ public class TabletServerResourceManager {
// BEGIN methods that Tablets call to manage memory
- private AtomicLong lastReportedSize = new AtomicLong();
- private AtomicLong lastReportedMincSize = new AtomicLong();
+ private final AtomicLong lastReportedSize = new AtomicLong();
+ private final AtomicLong lastReportedMincSize = new AtomicLong();
private volatile long lastReportedCommitTime = 0;
public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
@@ -544,7 +543,7 @@ public class TabletServerResourceManager {
// BEGIN methods that Tablets call to make decisions about major compaction
// when too many files are open, we may want tablets to compact down
// to one map file
- boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+ public boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
if (closed)
return false;// throw new IOException("closed");
@@ -585,11 +584,11 @@ public class TabletServerResourceManager {
// tablets call this method to run minor compactions,
// this allows us to control how many minor compactions
// run concurrently in a tablet server
- void executeMinorCompaction(final Runnable r) {
+ public void executeMinorCompaction(final Runnable r) {
minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
}
- void close() throws IOException {
+ public void close() throws IOException {
// always obtain locks in same order to avoid deadlock
synchronized (TabletServerResourceManager.this) {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
index 58e16be..d914ac6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
@@ -81,6 +81,11 @@ public class TabletStatsKeeper {
}
+ public void saveMajorMinorTimes(TabletStats t) {
+ ActionStatsUpdator.update(minor, t.minors);
+ ActionStatsUpdator.update(major, t.majors);
+ }
+
public void saveMinorTimes(TabletStatsKeeper t) {
ActionStatsUpdator.update(minor, t.minor);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 871f4ae..9fec437 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -36,10 +36,10 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.tserver.Tablet.CommitSession;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
+import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -81,7 +81,7 @@ public class TabletServerLogger {
}
private static boolean enabled(CommitSession commitSession) {
- return enabled(commitSession.getTablet().getTableConfiguration());
+ return commitSession.getUseWAL();
}
static private abstract class TestCallWithWriteLock {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java
new file mode 100644
index 0000000..73434c6
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+
+class Batch {
+ final boolean skipContinueKey;
+ final List<KVEntry> results;
+ final Key continueKey;
+ final long numBytes;
+
+ Batch(boolean skipContinueKey, List<KVEntry> results, Key continueKey, long numBytes) {
+ this.skipContinueKey = skipContinueKey;
+ this.results = results;
+ this.continueKey = continueKey;
+ this.numBytes = numBytes;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
new file mode 100644
index 0000000..6402797
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.log4j.Logger;
+
+public class CommitSession {
+
+ private static final Logger log = Logger.getLogger(CommitSession.class);
+
+ private final int seq;
+ private final InMemoryMap memTable;
+ private final TabletCommitter committer;
+
+ private int commitsInProgress;
+ private long maxCommittedTime = Long.MIN_VALUE;
+
+ CommitSession(TabletCommitter committer, int seq, InMemoryMap imm) {
+ this.seq = seq;
+ this.memTable = imm;
+ this.committer = committer;
+ commitsInProgress = 0;
+ }
+
+ public int getWALogSeq() {
+ return seq;
+ }
+
+ public void decrementCommitsInProgress() {
+ if (commitsInProgress < 1)
+ throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
+
+ commitsInProgress--;
+ if (commitsInProgress == 0)
+ committer.notifyAll();
+ }
+
+ public void incrementCommitsInProgress() {
+ if (commitsInProgress < 0)
+ throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
+
+ commitsInProgress++;
+ }
+
+ public void waitForCommitsToFinish() {
+ while (commitsInProgress > 0) {
+ try {
+ committer.wait(50);
+ } catch (InterruptedException e) {
+ log.warn(e, e);
+ }
+ }
+ }
+
+ public void abortCommit(List<Mutation> value) {
+ committer.abortCommit(this, value);
+ }
+
+ public void commit(List<Mutation> mutations) {
+ committer.commit(this, mutations);
+ }
+
+ public TabletCommitter getTablet() {
+ return committer;
+ }
+
+ public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
+ return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish);
+ }
+
+ public void finishUpdatingLogsUsed() {
+ committer.finishUpdatingLogsUsed();
+ }
+
+ public int getLogId() {
+ return committer.getLogId();
+ }
+
+ public KeyExtent getExtent() {
+ return committer.getExtent();
+ }
+
+ public void updateMaxCommittedTime(long time) {
+ maxCommittedTime = Math.max(time, maxCommittedTime);
+ }
+
+ public long getMaxCommittedTime() {
+ if (maxCommittedTime == Long.MIN_VALUE)
+ throw new IllegalStateException("Tried to read max committed time when it was never set");
+ return maxCommittedTime;
+ }
+
+ public boolean getUseWAL() {
+ return committer.getUseWAL();
+ }
+
+ public void mutate(List<Mutation> mutations) {
+ memTable.mutate(mutations);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
new file mode 100644
index 0000000..ab57d65
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
@@ -0,0 +1,113 @@
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
+import org.apache.accumulo.server.fs.FileRef;
+
+public class CompactionInfo {
+
+ private final Compactor compactor;
+ private final String localityGroup;
+ private final long entriesRead;
+ private final long entriesWritten;
+
+ CompactionInfo(Compactor compactor) {
+ this.localityGroup = compactor.getCurrentLocalityGroup();
+ this.entriesRead = compactor.getEntriesRead();
+ this.entriesWritten = compactor.getEntriesWritten();
+ this.compactor = compactor;
+ }
+
+ public long getID() {
+ return compactor.getCompactorID();
+ }
+
+ public KeyExtent getExtent() {
+ return compactor.getExtent();
+ }
+
+ public long getEntriesRead() {
+ return entriesRead;
+ }
+
+ public long getEntriesWritten() {
+ return entriesWritten;
+ }
+
+ public Thread getThread() {
+ return compactor.thread;
+ }
+
+ public String getOutputFile() {
+ return compactor.getOutputFile();
+ }
+
+ public ActiveCompaction toThrift() {
+
+ CompactionType type;
+
+ if (compactor.hasIMM())
+ if (compactor.getFilesToCompact().size() > 0)
+ type = CompactionType.MERGE;
+ else
+ type = CompactionType.MINOR;
+ else if (!compactor.willPropogateDeletes())
+ type = CompactionType.FULL;
+ else
+ type = CompactionType.MAJOR;
+
+ CompactionReason reason;
+
+ if (compactor.hasIMM())
+ switch (compactor.getMinCReason()) {
+ case USER:
+ reason = CompactionReason.USER;
+ break;
+ case CLOSE:
+ reason = CompactionReason.CLOSE;
+ break;
+ case SYSTEM:
+ default:
+ reason = CompactionReason.SYSTEM;
+ break;
+ }
+ else
+ switch (compactor.getMajorCompactionReason()) {
+ case USER:
+ reason = CompactionReason.USER;
+ break;
+ case CHOP:
+ reason = CompactionReason.CHOP;
+ break;
+ case IDLE:
+ reason = CompactionReason.IDLE;
+ break;
+ case NORMAL:
+ default:
+ reason = CompactionReason.SYSTEM;
+ break;
+ }
+
+ List<IterInfo> iiList = new ArrayList<IterInfo>();
+ Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
+
+ for (IteratorSetting iterSetting : compactor.getIterators()) {
+ iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
+ iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
+ }
+ List<String> filesToCompact = new ArrayList<String>();
+ for (FileRef ref : compactor.getFilesToCompact())
+ filesToCompact.add(ref.toString());
+ return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.getStartTime(), filesToCompact,
+ compactor.getOutputFile(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java
new file mode 100644
index 0000000..de5a66d
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+
+class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+
+ private final Tablet tablet;
+ private final MajorCompactionReason reason;
+ private final long queued;
+
+ public CompactionRunner(Tablet tablet, MajorCompactionReason reason) {
+ this.tablet = tablet;
+ queued = System.currentTimeMillis();
+ this.reason = reason;
+ }
+
+ @Override
+ public void run() {
+ if (tablet.getTabletServer().isMajorCompactionDisabled()) {
+ // this will make compaction tasks that were queued when shutdown was
+ // initiated exit
+ tablet.removeMajorCompactionQueuedReason(reason);
+ return;
+ }
+
+ tablet.majorCompact(reason, queued);
+
+ // if there is more work to be done, queue another major compaction
+ synchronized (tablet) {
+ if (reason == MajorCompactionReason.NORMAL && tablet.needsMajorCompaction(reason))
+ tablet.initiateMajorCompaction(reason);
+ }
+ }
+
+ // We used to synchronize on the Tablet before fetching this information,
+ // but this method is called by the compaction queue thread to re-order the compactions.
+ // The compaction queue holds a lock during this sort.
+ // A tablet lock can be held while putting itself on the queue, so we can't lock the tablet
+ // while pulling information used to sort the tablets in the queue, or we may get deadlocked.
+ // See ACCUMULO-1110.
+ private int getNumFiles() {
+ return tablet.getDatafileManager().getNumFiles();
+ }
+
+ @Override
+ public int compareTo(CompactionRunner o) {
+ int cmp = reason.compareTo(o.reason);
+ if (cmp != 0)
+ return cmp;
+
+ if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) {
+ // for these types of compactions want to do the oldest first
+ cmp = (int) (queued - o.queued);
+ if (cmp != 0)
+ return cmp;
+ }
+
+ return o.getNumFiles() - this.getNumFiles();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java
new file mode 100644
index 0000000..69832e9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+public class CompactionStats {
+ private long entriesRead;
+ private long entriesWritten;
+ private long fileSize;
+
+ CompactionStats(long er, long ew) {
+ this.setEntriesRead(er);
+ this.setEntriesWritten(ew);
+ }
+
+ public CompactionStats() {}
+
+ private void setEntriesRead(long entriesRead) {
+ this.entriesRead = entriesRead;
+ }
+
+ public long getEntriesRead() {
+ return entriesRead;
+ }
+
+ private void setEntriesWritten(long entriesWritten) {
+ this.entriesWritten = entriesWritten;
+ }
+
+ public long getEntriesWritten() {
+ return entriesWritten;
+ }
+
+ public void add(CompactionStats mcs) {
+ this.entriesRead += mcs.entriesRead;
+ this.entriesWritten += mcs.entriesWritten;
+ }
+
+ public void setFileSize(long fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public long getFileSize() {
+ return this.fileSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
new file mode 100644
index 0000000..1ca1f33
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+
+/**
+ *
+ */
+public class CompactionWatcher implements Runnable {
+ private final Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
+ private final AccumuloConfiguration config;
+ private static boolean watching = false;
+
+ private static class ObservedCompactionInfo {
+ CompactionInfo compactionInfo;
+ long firstSeen;
+ boolean loggedWarning;
+
+ ObservedCompactionInfo(CompactionInfo ci, long time) {
+ this.compactionInfo = ci;
+ this.firstSeen = time;
+ }
+ }
+
+ public CompactionWatcher(AccumuloConfiguration config) {
+ this.config = config;
+ }
+
+ public void run() {
+ List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
+
+ Set<List<Long>> newKeys = new HashSet<List<Long>>();
+
+ long time = System.currentTimeMillis();
+
+ for (CompactionInfo ci : runningCompactions) {
+ List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
+ newKeys.add(compactionKey);
+
+ if (!observedCompactions.containsKey(compactionKey)) {
+ observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
+ }
+ }
+
+ // look for compactions that finished or made progress and logged a warning
+ HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions);
+ copy.keySet().removeAll(newKeys);
+
+ for (ObservedCompactionInfo oci : copy.values()) {
+ if (oci.loggedWarning) {
+ Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
+ }
+ }
+
+ // remove any compaction that completed or made progress
+ observedCompactions.keySet().retainAll(newKeys);
+
+ long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
+
+ // check for stuck compactions
+ for (ObservedCompactionInfo oci : observedCompactions.values()) {
+ if (time - oci.firstSeen > warnTime && !oci.loggedWarning) {
+ Thread compactionThread = oci.compactionInfo.getThread();
+ if (compactionThread != null) {
+ StackTraceElement[] trace = compactionThread.getStackTrace();
+ Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
+ e.setStackTrace(trace);
+ Logger.getLogger(CompactionWatcher.class).warn(
+ "Compaction of " + oci.compactionInfo.getExtent() + " to " + oci.compactionInfo.getOutputFile() + " has not made progress for at least "
+ + (time - oci.firstSeen) + "ms", e);
+ oci.loggedWarning = true;
+ }
+ }
+ }
+ }
+
+ public static synchronized void startWatching(AccumuloConfiguration config) {
+ if (!watching) {
+ SimpleTimer.getInstance(config).schedule(new CompactionWatcher(config), 10000, 10000);
+ watching = true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
new file mode 100644
index 0000000..2a3e2f4
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MinorCompactionReason;
+import org.apache.accumulo.tserver.TabletIteratorEnvironment;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+public class Compactor implements Callable<CompactionStats> {
+ private static final Logger log = Logger.getLogger(Compactor.class);
+ private static final AtomicLong nextCompactorID = new AtomicLong(0);
+
+ public static class CountingIterator extends WrappingIterator {
+
+ private long count;
+ private final ArrayList<CountingIterator> deepCopies;
+ private final AtomicLong entriesRead;
+
+ @Override
+ public CountingIterator deepCopy(IteratorEnvironment env) {
+ return new CountingIterator(this, env);
+ }
+
+ private CountingIterator(CountingIterator other, IteratorEnvironment env) {
+ setSource(other.getSource().deepCopy(env));
+ count = 0;
+ this.deepCopies = other.deepCopies;
+ this.entriesRead = other.entriesRead;
+ deepCopies.add(this);
+ }
+
+ public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
+ deepCopies = new ArrayList<Compactor.CountingIterator>();
+ this.setSource(source);
+ count = 0;
+ this.entriesRead = entriesRead;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void next() throws IOException {
+ super.next();
+ count++;
+ if (count % 1024 == 0) {
+ entriesRead.addAndGet(1024);
+ }
+ }
+
+ public long getCount() {
+ long sum = 0;
+ for (CountingIterator dc : deepCopies) {
+ sum += dc.count;
+ }
+
+ return count + sum;
+ }
+ }
+
+
+ public static class CompactionCanceledException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ public interface CompactionEnv {
+
+ boolean isCompactionEnabled();
+
+ IteratorScope getIteratorScope();
+ }
+
+ private final Map<FileRef,DataFileValue> filesToCompact;
+ private final InMemoryMap imm;
+ private final FileRef outputFile;
+ private final boolean propogateDeletes;
+ private final AccumuloConfiguration acuTableConf;
+ private final CompactionEnv env;
+ private final VolumeManager fs;
+ protected final KeyExtent extent;
+ private final List<IteratorSetting> iterators;
+
+ // things to report
+ private String currentLocalityGroup = "";
+ private final long startTime;
+
+ private int reason;
+
+ private final AtomicLong entriesRead = new AtomicLong(0);
+ private final AtomicLong entriesWritten = new AtomicLong(0);
+ private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+
+ // a unique id to identify a compactor
+ private final long compactorID = nextCompactorID.getAndIncrement();
+ protected volatile Thread thread;
+
+ public long getCompactorID() { return compactorID; }
+
+ private synchronized void setLocalityGroup(String name) {
+ this.currentLocalityGroup = name;
+ }
+
+ public synchronized String getCurrentLocalityGroup() {
+ return currentLocalityGroup;
+ }
+
+ private void clearStats() {
+ entriesRead.set(0);
+ entriesWritten.set(0);
+ }
+
+ protected static final Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
+
+ public static List<CompactionInfo> getRunningCompactions() {
+ ArrayList<CompactionInfo> compactions = new ArrayList<CompactionInfo>();
+
+ synchronized (runningCompactions) {
+ for (Compactor compactor : runningCompactions) {
+ compactions.add(new CompactionInfo(compactor));
+ }
+ }
+
+ return compactions;
+ }
+
+ public Compactor(VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
+ AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, int reason) {
+ this.extent = extent;
+ this.fs = fs;
+ this.filesToCompact = files;
+ this.imm = imm;
+ this.outputFile = outputFile;
+ this.propogateDeletes = propogateDeletes;
+ this.acuTableConf = acuTableConf;
+ this.env = env;
+ this.iterators = iterators;
+ this.reason = reason;
+
+ startTime = System.currentTimeMillis();
+ }
+
+ public VolumeManager getFileSystem() {
+ return fs;
+ }
+
+ KeyExtent getExtent() {
+ return extent;
+ }
+
+ String getOutputFile() {
+ return outputFile.toString();
+ }
+
+ MajorCompactionReason getMajorCompactionReason() { return MajorCompactionReason.values()[reason]; }
+
+ @Override
+ public CompactionStats call() throws IOException, CompactionCanceledException {
+
+ FileSKVWriter mfw = null;
+
+ CompactionStats majCStats = new CompactionStats();
+
+ boolean remove = runningCompactions.add(this);
+
+ clearStats();
+
+ String oldThreadName = Thread.currentThread().getName();
+ String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
+ Thread.currentThread().setName(newThreadName);
+ thread = Thread.currentThread();
+ try {
+ FileOperations fileFactory = FileOperations.getInstance();
+ FileSystem ns = this.fs.getVolumeByPath(outputFile.path()).getFileSystem();
+ mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
+
+ Map<String,Set<ByteSequence>> lGroups;
+ try {
+ lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
+ } catch (LocalityGroupConfigurationError e) {
+ throw new IOException(e);
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
+
+ if (mfw.supportsLocalityGroups()) {
+ for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
+ setLocalityGroup(entry.getKey());
+ compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
+ allColumnFamilies.addAll(entry.getValue());
+ }
+ }
+
+ setLocalityGroup("");
+ compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
+
+ long t2 = System.currentTimeMillis();
+
+ FileSKVWriter mfwTmp = mfw;
+ mfw = null; // set this to null so we do not try to close it again in finally if the close fails
+ mfwTmp.close(); // if the close fails it will cause the compaction to fail
+
+ // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
+ try {
+ FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
+ openReader.close();
+ } catch (IOException ex) {
+ log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
+ throw ex;
+ }
+
+ log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
+ majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
+
+ majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
+ return majCStats;
+ } catch (IOException e) {
+ log.error(e, e);
+ throw e;
+ } catch (RuntimeException e) {
+ log.error(e, e);
+ throw e;
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ if (remove) {
+ thread = null;
+ runningCompactions.remove(this);
+ }
+
+ try {
+ if (mfw != null) {
+ // compaction must not have finished successfully, so close its output file
+ try {
+ mfw.close();
+ } finally {
+ if (!fs.deleteRecursively(outputFile.path()))
+ if (fs.exists(outputFile.path()))
+ log.error("Unable to delete " + outputFile);
+ }
+ }
+ } catch (IOException e) {
+ log.warn(e, e);
+ } catch (RuntimeException exception) {
+ log.warn(exception, exception);
+ }
+ }
+ }
+
+ private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
+
+ List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
+
+ for (FileRef mapFile : filesToCompact.keySet()) {
+ try {
+
+ FileOperations fileFactory = FileOperations.getInstance();
+ FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
+ FileSKVIterator reader;
+
+ reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), acuTableConf);
+
+ readers.add(reader);
+
+ SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
+
+ if (filesToCompact.get(mapFile).isTimeSet()) {
+ iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
+ }
+
+ iters.add(iter);
+
+ } catch (Throwable e) {
+
+ ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
+
+ log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
+ // failed to open some map file... close the ones that were opened
+ for (FileSKVIterator reader : readers) {
+ try {
+ reader.close();
+ } catch (Throwable e2) {
+ log.warn("Failed to close map file", e2);
+ }
+ }
+
+ readers.clear();
+
+ if (e instanceof IOException)
+ throw (IOException) e;
+ throw new IOException("Failed to open map data files", e);
+ }
+ }
+
+ return iters;
+ }
+
+ private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
+ throws IOException, CompactionCanceledException {
+ ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
+ Span span = Trace.start("compact");
+ try {
+ long entriesCompacted = 0;
+ List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
+
+ if (imm != null) {
+ iters.add(imm.compactionIterator());
+ }
+
+ CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
+ DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+ // if(env.getIteratorScope() )
+
+ TabletIteratorEnvironment iterEnv;
+ if (env.getIteratorScope() == IteratorScope.majc)
+ iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
+ else if (env.getIteratorScope() == IteratorScope.minc)
+ iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
+ else
+ throw new IllegalArgumentException();
+
+ SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
+ iterators, iterEnv));
+
+ itr.seek(extent.toDataRange(), columnFamilies, inclusive);
+
+ if (!inclusive) {
+ mfw.startDefaultLocalityGroup();
+ } else {
+ mfw.startNewLocalityGroup(lgName, columnFamilies);
+ }
+
+ Span write = Trace.start("write");
+ try {
+ while (itr.hasTop() && env.isCompactionEnabled()) {
+ mfw.append(itr.getTopKey(), itr.getTopValue());
+ itr.next();
+ entriesCompacted++;
+
+ if (entriesCompacted % 1024 == 0) {
+ // Periodically update stats, do not want to do this too often since its volatile
+ entriesWritten.addAndGet(1024);
+ }
+ }
+
+ if (itr.hasTop() && !env.isCompactionEnabled()) {
+ // cancel major compaction operation
+ try {
+ try {
+ mfw.close();
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ fs.deleteRecursively(outputFile.path());
+ } catch (Exception e) {
+ log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
+ }
+ throw new CompactionCanceledException();
+ }
+
+ } finally {
+ CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
+ majCStats.add(lgMajcStats);
+ write.stop();
+ }
+
+ } finally {
+ // close sequence files opened
+ for (FileSKVIterator reader : readers) {
+ try {
+ reader.close();
+ } catch (Throwable e) {
+ log.warn("Failed to close map file", e);
+ }
+ }
+ span.stop();
+ }
+ }
+
+ Collection<FileRef> getFilesToCompact() {
+ return filesToCompact.keySet();
+ }
+
+ boolean hasIMM() {
+ return imm != null;
+ }
+
+ boolean willPropogateDeletes() {
+ return propogateDeletes;
+ }
+
+ long getEntriesRead() {
+ return entriesRead.get();
+ }
+
+ long getEntriesWritten() {
+ return entriesWritten.get();
+ }
+
+ long getStartTime() {
+ return startTime;
+ }
+
+ Iterable<IteratorSetting> getIterators() {
+ return this.iterators;
+ }
+
+ MinorCompactionReason getMinCReason() {
+ return MinorCompactionReason.values()[reason];
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
new file mode 100644
index 0000000..2771db9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.TLevel;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+class DatafileManager {
+ private final Logger log = Logger.getLogger(DatafileManager.class);
+ // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
+ private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
+ private final Tablet tablet;
+
+ // ensure we only have one reader/writer of our bulk file notes at at time
+ private final Object bulkFileImportLock = new Object();
+
+ DatafileManager(Tablet tablet, SortedMap<FileRef,DataFileValue> datafileSizes) {
+ for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet()) {
+ this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
+ }
+ this.tablet = tablet;
+ }
+
+ private FileRef mergingMinorCompactionFile = null;
+ private final Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
+ private final Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
+ private final MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
+ private long nextScanReservationId = 0;
+ private boolean reservationsBlocked = false;
+
+ private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
+
+ static void rename(VolumeManager fs, Path src, Path dst) throws IOException {
+ if (!fs.rename(src, dst)) {
+ throw new IOException("Rename " + src + " to " + dst + " returned false ");
+ }
+ }
+
+ Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
+ synchronized (tablet) {
+
+ while (reservationsBlocked) {
+ try {
+ tablet.wait(50);
+ } catch (InterruptedException e) {
+ log.warn(e, e);
+ }
+ }
+
+ Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
+
+ long rid = nextScanReservationId++;
+
+ scanFileReservations.put(rid, absFilePaths);
+
+ Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
+
+ for (FileRef path : absFilePaths) {
+ fileScanReferenceCounts.increment(path, 1);
+ ret.put(path, datafileSizes.get(path));
+ }
+
+ return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
+ }
+ }
+
+ void returnFilesForScan(Long reservationId) {
+
+ final Set<FileRef> filesToDelete = new HashSet<FileRef>();
+
+ synchronized (tablet) {
+ Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
+
+ if (absFilePaths == null)
+ throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
+
+ boolean notify = false;
+ for (FileRef path : absFilePaths) {
+ long refCount = fileScanReferenceCounts.decrement(path, 1);
+ if (refCount == 0) {
+ if (filesToDeleteAfterScan.remove(path))
+ filesToDelete.add(path);
+ notify = true;
+ } else if (refCount < 0)
+ throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
+ }
+
+ if (notify)
+ tablet.notifyAll();
+ }
+
+ if (filesToDelete.size() > 0) {
+ log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+ MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+ }
+ }
+
+ void removeFilesAfterScan(Set<FileRef> scanFiles) {
+ if (scanFiles.size() == 0)
+ return;
+
+ Set<FileRef> filesToDelete = new HashSet<FileRef>();
+
+ synchronized (tablet) {
+ for (FileRef path : scanFiles) {
+ if (fileScanReferenceCounts.get(path) == 0)
+ filesToDelete.add(path);
+ else
+ filesToDeleteAfterScan.add(path);
+ }
+ }
+
+ if (filesToDelete.size() > 0) {
+ log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+ MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+ }
+ }
+
+ private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
+ long startTime = System.currentTimeMillis();
+ TreeSet<FileRef> inUse = new TreeSet<FileRef>();
+
+ Span waitForScans = Trace.start("waitForScans");
+ try {
+ synchronized (tablet) {
+ if (blockNewScans) {
+ if (reservationsBlocked)
+ throw new IllegalStateException();
+
+ reservationsBlocked = true;
+ }
+
+ for (FileRef path : pathsToWaitFor) {
+ while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
+ try {
+ tablet.wait(100);
+ } catch (InterruptedException e) {
+ log.warn(e, e);
+ }
+ }
+ }
+
+ for (FileRef path : pathsToWaitFor) {
+ if (fileScanReferenceCounts.get(path) > 0)
+ inUse.add(path);
+ }
+
+ if (blockNewScans) {
+ reservationsBlocked = false;
+ tablet.notifyAll();
+ }
+
+ }
+ } finally {
+ waitForScans.stop();
+ }
+ return inUse;
+ }
+
+ public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
+
+ final KeyExtent extent = tablet.getExtent();
+ String bulkDir = null;
+
+ Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
+ for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
+ paths.put(entry.getKey(), entry.getValue());
+
+ for (FileRef tpath : paths.keySet()) {
+
+ boolean inTheRightDirectory = false;
+ Path parent = tpath.path().getParent().getParent();
+ for (String tablesDir : ServerConstants.getTablesDirs()) {
+ if (parent.equals(new Path(tablesDir, tablet.getExtent().getTableId().toString()))) {
+ inTheRightDirectory = true;
+ break;
+ }
+ }
+ if (!inTheRightDirectory) {
+ throw new IOException("Data file " + tpath + " not in table dirs");
+ }
+
+ if (bulkDir == null)
+ bulkDir = tpath.path().getParent().toString();
+ else if (!bulkDir.equals(tpath.path().getParent().toString()))
+ throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
+
+ }
+
+ if (tablet.getExtent().isRootTablet()) {
+ throw new IllegalArgumentException("Can not import files to root tablet");
+ }
+
+ synchronized (bulkFileImportLock) {
+ Credentials creds = SystemCredentials.get();
+ Connector conn;
+ try {
+ conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ // Remove any bulk files we've previously loaded and compacted away
+ List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
+
+ for (FileRef file : files)
+ if (paths.keySet().remove(file))
+ log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
+
+ if (paths.size() > 0) {
+ long bulkTime = Long.MIN_VALUE;
+ if (setTime) {
+ for (DataFileValue dfv : paths.values()) {
+ long nextTime = tablet.getAndUpdateTime();
+ if (nextTime < bulkTime)
+ throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
+ bulkTime = nextTime;
+ dfv.setTime(bulkTime);
+ }
+ }
+
+ tablet.updatePersistedTime(bulkTime, paths, tid);
+ }
+ }
+
+ synchronized (tablet) {
+ for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
+ if (datafileSizes.containsKey(tpath.getKey())) {
+ log.error("Adding file that is already in set " + tpath.getKey());
+ }
+ datafileSizes.put(tpath.getKey(), tpath.getValue());
+
+ }
+
+ tablet.getTabletResources().importedMapFiles();
+
+ tablet.computeNumEntries();
+ }
+
+ for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) {
+ log.log(TLevel.TABLET_HIST, tablet.getExtent() + " import " + entry.getKey() + " " + entry.getValue());
+ }
+ }
+
+ FileRef reserveMergingMinorCompactionFile() {
+ if (mergingMinorCompactionFile != null)
+ throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile);
+
+ if (tablet.getExtent().isRootTablet())
+ return null;
+
+ int maxFiles = tablet.getTableConfiguration().getMaxFilesPerTablet();
+
+ // when a major compaction is running and we are at max files, write out
+ // one extra file... want to avoid the case where major compaction is
+ // compacting everything except for the largest file, and therefore the
+ // largest file is returned for merging.. the following check mostly
+ // avoids this case, except for the case where major compactions fail or
+ // are canceled
+ if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
+ return null;
+
+ if (datafileSizes.size() >= maxFiles) {
+ // find the smallest file
+
+ long min = Long.MAX_VALUE;
+ FileRef minName = null;
+
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
+ min = entry.getValue().getSize();
+ minName = entry.getKey();
+ }
+ }
+
+ if (minName == null)
+ return null;
+
+ mergingMinorCompactionFile = minName;
+ return minName;
+ }
+
+ return null;
+ }
+
+ void unreserveMergingMinorCompactionFile(FileRef file) {
+ if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
+ || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
+ throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
+
+ mergingMinorCompactionFile = null;
+ }
+
+ void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
+ throws IOException {
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+ if (tablet.getExtent().isRootTablet()) {
+ try {
+ if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+ throw new IllegalStateException();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+ }
+ }
+
+ // rename before putting in metadata table, so files in metadata table should
+ // always exist
+ do {
+ try {
+ if (dfv.getNumEntries() == 0) {
+ tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.path());
+ } else {
+ if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+ log.warn("Target map file already exist " + newDatafile);
+ tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+ }
+
+ rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+ }
+ break;
+ } catch (IOException ioe) {
+ log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
+ UtilWaitThread.sleep(60 * 1000);
+ }
+ } while (true);
+
+ long t1, t2;
+
+ // the code below always assumes merged files are in use by scans... this must be done
+ // because the in memory list of files is not updated until after the metadata table
+ // therefore the file is available to scans until memory is updated, but want to ensure
+ // the file is not available for garbage collection... if memory were updated
+ // before this point (like major compactions do), then the following code could wait
+ // for scans to finish like major compactions do.... used to wait for scans to finish
+ // here, but that was incorrect because a scan could start after waiting but before
+ // memory was updated... assuming the file is always in use by scans leads to
+ // one uneeded metadata update when it was not actually in use
+ Set<FileRef> filesInUseByScans = Collections.emptySet();
+ if (absMergeFile != null)
+ filesInUseByScans = Collections.singleton(absMergeFile);
+
+ // very important to write delete entries outside of log lock, because
+ // this metadata write does not go up... it goes sideways or to itself
+ if (absMergeFile != null)
+ MetadataTableUtil.addDeleteEntries(tablet.getExtent(), Collections.singleton(absMergeFile), SystemCredentials.get());
+
+ Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs();
+ try {
+ // the order of writing to metadata and walog is important in the face of machine/process failures
+ // need to write to metadata before writing to walog, when things are done in the reverse order
+ // data could be lost... the minor compaction start even should be written before the following metadata
+ // write is made
+
+ tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, absMergeFile, dfv, unusedWalLogs, filesInUseByScans, flushId);
+
+ } finally {
+ tablet.finishClearingUnusedLogs();
+ }
+
+ do {
+ try {
+ // the purpose of making this update use the new commit session, instead of the old one passed in,
+ // is because the new one will reference the logs used by current memory...
+
+ tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
+ break;
+ } catch (IOException e) {
+ log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
+ UtilWaitThread.sleep(1 * 1000);
+ }
+ } while (true);
+
+ synchronized (tablet) {
+ t1 = System.currentTimeMillis();
+
+ if (datafileSizes.containsKey(newDatafile)) {
+ log.error("Adding file that is already in set " + newDatafile);
+ }
+
+ if (dfv.getNumEntries() > 0) {
+ datafileSizes.put(newDatafile, dfv);
+ }
+
+ if (absMergeFile != null) {
+ datafileSizes.remove(absMergeFile);
+ }
+
+ unreserveMergingMinorCompactionFile(absMergeFile);
+
+ tablet.flushComplete(flushId);
+
+ t2 = System.currentTimeMillis();
+ }
+
+ // must do this after list of files in memory is updated above
+ removeFilesAfterScan(filesInUseByScans);
+
+ if (absMergeFile != null)
+ log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
+ else
+ log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [memory] -> " + newDatafile);
+ log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, tablet.getExtent().toString()));
+ long splitSize = tablet.getTableConfiguration().getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
+ if (dfv.getSize() > splitSize) {
+ log.debug(String.format("Minor Compaction wrote out file larger than split threshold. split threshold = %,d file size = %,d", splitSize, dfv.getSize()));
+ }
+ }
+
+ public void reserveMajorCompactingFiles(Collection<FileRef> files) {
+ if (majorCompactingFiles.size() != 0)
+ throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
+
+ if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
+ throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
+
+ majorCompactingFiles.addAll(files);
+ }
+
+ public void clearMajorCompactingFile() {
+ majorCompactingFiles.clear();
+ }
+
+ void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+ throws IOException {
+ final KeyExtent extent = tablet.getExtent();
+ long t1, t2;
+
+ if (!extent.isRootTablet()) {
+
+ if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+ log.error("Target map file already exist " + newDatafile, new Exception());
+ throw new IllegalStateException("Target map file already exist " + newDatafile);
+ }
+
+ // rename before putting in metadata table, so files in metadata table should
+ // always exist
+ rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+
+ if (dfv.getNumEntries() == 0) {
+ tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+ }
+ }
+
+ TServerInstance lastLocation = null;
+ synchronized (tablet) {
+
+ t1 = System.currentTimeMillis();
+
+ IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+ tablet.incrementDataSourceDeletions();
+
+ if (extent.isRootTablet()) {
+
+ waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
+
+ try {
+ if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+ throw new IllegalStateException();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+ }
+
+ // mark files as ready for deletion, but
+ // do not delete them until we successfully
+ // rename the compacted map file, in case
+ // the system goes down
+
+ RootFiles.replaceFiles(tablet.getTableConfiguration(), tablet.getTabletServer().getFileSystem(), tablet.getLocation(), oldDatafiles, tmpDatafile, newDatafile);
+ }
+
+ // atomically remove old files and add new file
+ for (FileRef oldDatafile : oldDatafiles) {
+ if (!datafileSizes.containsKey(oldDatafile)) {
+ log.error("file does not exist in set " + oldDatafile);
+ }
+ datafileSizes.remove(oldDatafile);
+ majorCompactingFiles.remove(oldDatafile);
+ }
+
+ if (datafileSizes.containsKey(newDatafile)) {
+ log.error("Adding file that is already in set " + newDatafile);
+ }
+
+ if (dfv.getNumEntries() > 0) {
+ datafileSizes.put(newDatafile, dfv);
+ }
+
+ // could be used by a follow on compaction in a multipass compaction
+ majorCompactingFiles.add(newDatafile);
+
+ tablet.computeNumEntries();
+
+ lastLocation = tablet.resetLastLocation();
+
+ tablet.setLastCompactionID(compactionId);
+ t2 = System.currentTimeMillis();
+ }
+
+ if (!extent.isRootTablet()) {
+ Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
+ if (filesInUseByScans.size() > 0)
+ log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
+ MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
+ tablet.getTabletServer().getClientAddressString(), lastLocation, tablet.getTabletServer().getLock());
+ removeFilesAfterScan(filesInUseByScans);
+ }
+
+ log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
+ log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
+ }
+
+ public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
+ synchronized (tablet) {
+ TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
+ return Collections.unmodifiableSortedMap(copy);
+ }
+ }
+
+ public Set<FileRef> getFiles() {
+ synchronized (tablet) {
+ HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
+ return Collections.unmodifiableSet(files);
+ }
+ }
+
+ public int getNumFiles() {
+ return datafileSizes.size();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java
new file mode 100644
index 0000000..4919be9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.Arrays;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Value;
+
+public class KVEntry extends KeyValue {
+ private static final long serialVersionUID = 1L;
+
+ public KVEntry(Key k, Value v) {
+ super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
+ }
+
+ int numBytes() {
+ return getKey().getSize() + getValue().get().length;
+ }
+
+ int estimateMemoryUsed() {
+ return getKey().getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object
+ }
+}
\ No newline at end of file