You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:55:41 UTC
[02/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java
deleted file mode 100644
index 2c6858d..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-/**
- * Holder for string constants used by tablet server code.
- *
- * NOTE(review): the call sites are not visible in this file; the values look like names of
- * query/scan parameters - confirm against callers before relying on that.
- */
-public class TabletServerConstants {
-
-  // Each constant's value is its own name with the "Name" suffix removed.
-  public static final String IntermediateKeyName = "IntermediateKey";
-  public static final String ColumnSetName = "ColumnSet";
-  public static final String AuthorizationSetName = "AuthorizationSet";
-  public static final String EndKeyName = "EndKey";
-  public static final String MaxResultsName = "MaxResults";
-  public static final String PreviousQueryTypeName = "PreviousQueryType";
-  public static final String PreviousQueryStatusName = "PreviousQueryStatus";
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
deleted file mode 100644
index 15fb7c7..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
-import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;
-import org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy;
-import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
-import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.log4j.Logger;
-
-/**
- * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
- *
- *
- *
- */
-public class TabletServerResourceManager {
-
- // Executor services owned by this manager; all of them are created in the constructor.
- // The "default"/"root"/"MetaData" variants serve metadata and root tablets (see the
- // execute*/add* dispatch methods), the others serve ordinary tablets.
- private ExecutorService minorCompactionThreadPool;
- private ExecutorService majorCompactionThreadPool;
- private ExecutorService rootMajorCompactionThreadPool;
- private ExecutorService defaultMajorCompactionThreadPool;
- private ExecutorService splitThreadPool;
- private ExecutorService defaultSplitThreadPool;
- private ExecutorService defaultMigrationPool;
- private ExecutorService migrationPool;
- private ExecutorService assignmentPool;
- private ExecutorService assignMetaDataPool;
- private ExecutorService readAheadThreadPool;
- private ExecutorService defaultReadAheadThreadPool;
- // Every executor registered through addEs(), keyed by its unique name; close() walks this map.
- private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
-
- // Per-tablet resource managers that have had setTablet() called and are not yet closed.
- private HashSet<TabletResourceManager> tabletResources;
-
- private final VolumeManager fs;
-
- private FileManager fileManager;
-
- // Pluggable policy that decides which tablets should be minor compacted.
- private MemoryManager memoryManager;
-
- // Collects per-tablet memory reports and drives the memory manager (see MemoryManagementFramework).
- private MemoryManagementFramework memMgmt;
-
- // Data and index block caches, sized from TSERV_DATACACHE_SIZE / TSERV_INDEXCACHE_SIZE.
- private final LruBlockCache _dCache;
- private final LruBlockCache _iCache;
- private final ServerConfiguration conf;
-
- private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
-
- /**
-  * Wraps the given executor in a {@link TraceExecutorService} and registers the wrapper under
-  * a unique name.
-  *
-  * @param name
-  *          registry key; must not already be in use
-  * @param tp
-  *          executor to wrap
-  * @return the tracing wrapper that was registered
-  * @throws IllegalArgumentException
-  *           if an executor with the same name was already registered
-  */
- private ExecutorService addEs(String name, ExecutorService tp) {
-   if (threadPools.containsKey(name)) {
-     throw new IllegalArgumentException("Cannot create two executor services with same name " + name);
-   }
-   final ExecutorService traced = new TraceExecutorService(tp);
-   threadPools.put(name, traced);
-   return traced;
- }
-
- /**
-  * Registers a thread pool under the given name and schedules a recurring task that keeps the
-  * pool size in step with the configured value of {@code maxThreads}.
-  *
-  * The task is scheduled on the shared {@link SimpleTimer} with arguments 1000 and 10 * 1000
-  * (presumably initial delay and period in milliseconds - confirm against SimpleTimer).
-  *
-  * @param maxThreads
-  *          property holding the desired pool size
-  * @param name
-  *          unique registry name for the pool
-  * @param tp
-  *          pool whose core and maximum size are kept equal to the configured count
-  * @return the registered (tracing) executor service
-  */
- private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) {
-   ExecutorService result = addEs(name, tp);
-   SimpleTimer.getInstance().schedule(new Runnable() {
-     @Override
-     public void run() {
-       try {
-         int max = conf.getConfiguration().getCount(maxThreads);
-         int current = tp.getMaximumPoolSize();
-         if (current != max) {
-           log.info("Changing " + maxThreads.getKey() + " to " + max);
-           // ThreadPoolExecutor requires core <= maximum at every step: raise the maximum
-           // first when growing, lower the core size first when shrinking. Setting them in a
-           // fixed order makes one of the two directions throw IllegalArgumentException.
-           if (max > current) {
-             tp.setMaximumPoolSize(max);
-             tp.setCorePoolSize(max);
-           } else {
-             tp.setCorePoolSize(max);
-             tp.setMaximumPoolSize(max);
-           }
-         }
-       } catch (Throwable t) {
-         // never let a failure kill the shared timer thread
-         log.error(t, t);
-       }
-     }
-
-   }, 1000, 10 * 1000);
-   return result;
- }
-
- /** Creates and registers a fixed-size pool of {@code max} threads whose threads are named after {@code name}. */
- private ExecutorService createEs(int max, String name) {
-   final ExecutorService fixedPool = Executors.newFixedThreadPool(max, new NamingThreadFactory(name));
-   return addEs(name, fixedPool);
- }
-
- /** Creates and registers a property-sized pool backed by an unbounded {@link LinkedBlockingQueue}. */
- private ExecutorService createEs(Property max, String name) {
-   final BlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<Runnable>();
-   return createEs(max, name, unboundedQueue);
- }
-
- /**
-  * Creates a pool whose core and maximum size both equal the current value of the {@code max}
-  * property, using the supplied work queue, and registers it so that later changes of the
-  * property are applied to the pool.
-  */
- private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
-   final int threadCount = conf.getConfiguration().getCount(max);
-   final NamingThreadFactory factory = new NamingThreadFactory(name);
-   final ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, queue, factory);
-   return addEs(max, name, executor);
- }
-
- /**
-  * Creates and registers a pool with {@code min} core threads, at most {@code max} threads, a
-  * keep-alive of {@code timeout} seconds and an unbounded work queue.
-  */
- private ExecutorService createEs(int min, int max, int timeout, String name) {
-   final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
-   final NamingThreadFactory factory = new NamingThreadFactory(name);
-   final ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, workQueue, factory);
-   return addEs(name, executor);
- }
-
- /**
-  * Builds the resource manager: reads memory and cache settings, creates the block caches,
-  * sanity-checks the configured sizes against the JVM heap, creates all thread pools, the file
-  * manager and the memory manager, and finally starts the memory management threads.
-  *
-  * @param instance
-  *          instance used to obtain the server configuration
-  * @param fs
-  *          volume manager handed to the file manager and to compaction requests
-  * @throws IllegalArgumentException
-  *           if native maps are not in use and the configured map memory plus block cache sizes
-  *           exceed the maximum JVM heap
-  */
- public TabletServerResourceManager(Instance instance, VolumeManager fs) {
-   this.conf = new ServerConfiguration(instance);
-   this.fs = fs;
-   final AccumuloConfiguration acuConf = conf.getConfiguration();
-
-   long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
-   // the heap checks below only apply when the native map is not used
-   boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.loadedNativeLibraries();
-
-   long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
-   long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
-   long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
-
-   _iCache = new LruBlockCache(iCacheSize, blockSize);
-   _dCache = new LruBlockCache(dCacheSize, blockSize);
-
-   Runtime runtime = Runtime.getRuntime();
-   if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) {
-     throw new IllegalArgumentException(String.format(
-         "Maximum tablet server map memory %,d and block cache sizes %,d is too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize,
-         runtime.maxMemory()));
-   }
-   // request a collection so the free-memory estimate below is less distorted by garbage
-   runtime.gc();
-
-   // totalMemory - freeMemory = memory in use
-   // maxMemory - memory in use = max available memory
-   if (!usingNativeMap && maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) {
-     log.warn("In-memory map may not fit into local memory space.");
-   }
-
-   minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor");
-
-   // make this thread pool have a priority queue... and execute tablets with the most
-   // files first!
-   majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue());
-   rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor");
-   defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor");
-
-   splitThreadPool = createEs(1, "splitter");
-   defaultSplitThreadPool = createEs(0, 1, 60, "md splitter");
-
-   defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration");
-   migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration");
-
-   // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
-   // individual tablet servers are already running assignments concurrently... having each individual tablet server run
-   // concurrent assignments would put more load on the metadata table at startup
-   assignmentPool = createEs(1, "tablet assignment");
-
-   assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
-
-   readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
-   defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
-
-   tabletResources = new HashSet<TabletResourceManager>();
-
-   int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
-
-   fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache);
-
-   // the memory manager implementation is configurable; LargestFirstMemoryManager is the fallback
-   memoryManager = Property.createInstanceFromPropertyName(acuConf, Property.TSERV_MEM_MGMT, MemoryManager.class, new LargestFirstMemoryManager());
-   memoryManager.init(conf);
-   // constructing the framework starts its two daemon threads, so this must come last
-   memMgmt = new MemoryManagementFramework();
- }
-
- /**
-  * Immutable-in-practice snapshot of one tablet's memory usage, as handed to the
-  * {@link MemoryManager}. It also keeps a reference to the tablet so a minor compaction can be
-  * started on it.
-  */
- private static class TabletStateImpl implements TabletState, Cloneable {
-
-   private Tablet tablet;
-   private long memTableSize;
-   private long lastCommitTime;
-   private long minorCompactingMemTableSize;
-
-   public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
-     this.tablet = t;
-     this.memTableSize = mts;
-     this.lastCommitTime = lct;
-     this.minorCompactingMemTableSize = mcmts;
-   }
-
-   /** Returns the tablet this snapshot was taken for. */
-   Tablet getTablet() {
-     return tablet;
-   }
-
-   @Override
-   public KeyExtent getExtent() {
-     return tablet.getExtent();
-   }
-
-   @Override
-   public long getLastCommitTime() {
-     return lastCommitTime;
-   }
-
-   @Override
-   public long getMemTableSize() {
-     return memTableSize;
-   }
-
-   @Override
-   public long getMinorCompactingMemTableSize() {
-     return minorCompactingMemTableSize;
-   }
- }
-
- /**
-  * Collects memory usage reports from tablets and acts on them with two daemon threads started
-  * by the constructor: one totals the reported memory and holds or releases commits, the other
-  * periodically asks the configured {@link MemoryManager} which tablets to minor compact.
-  */
- private class MemoryManagementFramework {
-   // most recent report per tablet; a synchronized map, iterated only while holding its monitor
-   private final Map<KeyExtent,TabletStateImpl> tabletReports;
-   // reports queued by tablets, drained by processTabletMemStats()
-   private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
-   private long lastMemCheckTime = System.currentTimeMillis();
-   // value of TSERV_MAXMEM read once at construction
-   private long maxMem;
-
-   MemoryManagementFramework() {
-     tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
-     memUsageReports = new LinkedBlockingQueue<TabletStateImpl>();
-     maxMem = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
-
-     Runnable r1 = new Runnable() {
-       @Override
-       public void run() {
-         processTabletMemStats();
-       }
-     };
-
-     // the guard thread runs slightly above normal priority
-     Thread t1 = new Daemon(new LoggingRunnable(log, r1));
-     t1.setPriority(Thread.NORM_PRIORITY + 1);
-     t1.setName("Accumulo Memory Guard");
-     t1.start();
-
-     Runnable r2 = new Runnable() {
-       @Override
-       public void run() {
-         manageMemory();
-       }
-     };
-
-     Thread t2 = new Daemon(new LoggingRunnable(log, r2));
-     t2.setName("Accumulo Minor Compaction Initiator");
-     t2.start();
-
-   }
-
-   // total computed by the previous memory check; 0 until the first check has run
-   private long lastMemTotal = 0;
-
-   /**
-    * Endless loop of the "Accumulo Memory Guard" thread: waits for a report, drains the queue
-    * into tabletReports and, when commits are held, more than 50 ms have passed since the last
-    * check, or the last total exceeded 90% of maxMem, re-totals the reported memory. Commits are
-    * held while the total is above 95% of maxMem and released otherwise.
-    */
-   private void processTabletMemStats() {
-     while (true) {
-       try {
-
-         // block until at least one report arrives
-         TabletStateImpl report = memUsageReports.take();
-
-         // then take everything else that is already queued without blocking
-         while (report != null) {
-           tabletReports.put(report.getExtent(), report);
-           report = memUsageReports.poll();
-         }
-
-         long delta = System.currentTimeMillis() - lastMemCheckTime;
-         if (holdCommits || delta > 50 || lastMemTotal > 0.90 * maxMem) {
-           lastMemCheckTime = System.currentTimeMillis();
-
-           long totalMemUsed = 0;
-
-           // iteration over a synchronized map must hold the map's monitor
-           synchronized (tabletReports) {
-             for (TabletStateImpl tsi : tabletReports.values()) {
-               totalMemUsed += tsi.getMemTableSize();
-               totalMemUsed += tsi.getMinorCompactingMemTableSize();
-             }
-           }
-
-           if (totalMemUsed > 0.95 * maxMem) {
-             holdAllCommits(true);
-           } else {
-             holdAllCommits(false);
-           }
-
-           lastMemTotal = totalMemUsed;
-         }
-
-       } catch (InterruptedException e) {
-         // an interrupt is logged and the loop continues
-         log.warn(e, e);
-       }
-     }
-   }
-
-   /**
-    * Endless loop of the "Accumulo Minor Compaction Initiator" thread: hands a copy of the
-    * current reports to the memory manager, starts a minor compaction on every tablet it names,
-    * then calls UtilWaitThread.sleep(250) before the next round. Failures in either step are
-    * logged and do not end the loop.
-    */
-   private void manageMemory() {
-     while (true) {
-       MemoryManagementActions mma = null;
-
-       try {
-         ArrayList<TabletState> tablets;
-         // copy under the map's monitor so the memory manager works on a stable list
-         synchronized (tabletReports) {
-           tablets = new ArrayList<TabletState>(tabletReports.values());
-         }
-         mma = memoryManager.getMemoryManagementActions(tablets);
-
-       } catch (Throwable t) {
-         log.error("Memory manager failed " + t.getMessage(), t);
-       }
-
-       try {
-         if (mma != null && mma.tabletsToMinorCompact != null && mma.tabletsToMinorCompact.size() > 0) {
-           for (KeyExtent keyExtent : mma.tabletsToMinorCompact) {
-             TabletStateImpl tabletReport = tabletReports.get(keyExtent);
-
-             if (tabletReport == null) {
-               log.warn("Memory manager asked to compact nonexistant tablet " + keyExtent);
-               continue;
-             }
-
-             if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) {
-               if (tabletReport.getTablet().isClosed()) {
-                 // drop the stale report so the closed tablet is not suggested again
-                 tabletReports.remove(tabletReport.getExtent());
-                 log.debug("Ignoring memory manager recommendation: not minor compacting closed tablet " + keyExtent);
-               } else {
-                 log.info("Ignoring memory manager recommendation: not minor compacting " + keyExtent);
-               }
-             }
-           }
-
-           // log.debug("mma.tabletsToMinorCompact = "+mma.tabletsToMinorCompact);
-         }
-       } catch (Throwable t) {
-         log.error("Minor compactions for memory managment failed", t);
-       }
-
-       UtilWaitThread.sleep(250);
-     }
-   }
-
-   /** Queues a new memory usage report for the given tablet. */
-   public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize) {
-     memUsageReports.add(new TabletStateImpl(tablet, size, lastCommitTime, mincSize));
-   }
-
-   /** Forgets the last report of a tablet that has been closed. */
-   public void tabletClosed(KeyExtent extent) {
-     tabletReports.remove(extent);
-   }
- }
-
- // monitor guarding changes of holdCommits / holdStartTime; waiters are notified on release
- private final Object commitHold = new Object();
- private volatile boolean holdCommits = false;
- private long holdStartTime;
-
- /**
-  * Switches the commit hold on or off. Turning it on records the start time; turning it off
-  * logs how long commits were held and wakes every thread waiting in
-  * {@link #waitUntilCommitsAreEnabled()}. Calls that do not change the state do nothing.
-  *
-  * @param holdAllCommits
-  *          {@code true} to hold commits, {@code false} to release them
-  */
- protected void holdAllCommits(boolean holdAllCommits) {
-   synchronized (commitHold) {
-     if (holdCommits == holdAllCommits) {
-       return;
-     }
-
-     holdCommits = holdAllCommits;
-
-     if (holdAllCommits) {
-       holdStartTime = System.currentTimeMillis();
-     } else {
-       double heldSeconds = (System.currentTimeMillis() - holdStartTime) / 1000.0;
-       log.debug(String.format("Commits held for %6.2f secs", heldSeconds));
-       commitHold.notifyAll();
-     }
-   }
-
- }
-
- /**
-  * Blocks the caller while commits are held. The wait is bounded by the GENERAL_RPC_TIMEOUT
-  * property, measured from the moment this method is entered.
-  *
-  * @throws HoldTimeoutException
-  *           if commits are still held once the timeout has passed
-  */
- void waitUntilCommitsAreEnabled() {
-   if (!holdCommits) {
-     return;
-   }
-
-   final long deadline = System.currentTimeMillis() + conf.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
-   synchronized (commitHold) {
-     while (holdCommits) {
-       if (System.currentTimeMillis() > deadline) {
-         throw new HoldTimeoutException("Commits are held");
-       }
-       try {
-         // wake up at least once a second to re-check the deadline
-         commitHold.wait(1000);
-       } catch (InterruptedException e) {
-         // interrupts are ignored here; the loop re-checks the hold flag and the deadline
-       }
-     }
-   }
- }
-
- /**
-  * Returns how long commits have been held so far.
-  *
-  * @return milliseconds since the current hold started, or 0 when commits are not held
-  */
- public long holdTime() {
-   if (holdCommits) {
-     synchronized (commitHold) {
-       return System.currentTimeMillis() - holdStartTime;
-     }
-   }
-   return 0;
- }
-
- /**
-  * Shuts down every registered thread pool, then waits for each of them to terminate, logging
-  * a progress message for a pool every 60 seconds it is still running. Interrupts are logged
-  * and the wait continues.
-  */
- public void close() {
-   for (ExecutorService pool : threadPools.values()) {
-     pool.shutdown();
-   }
-
-   for (Entry<String,ExecutorService> registered : threadPools.entrySet()) {
-     boolean terminated = false;
-     while (!terminated) {
-       try {
-         terminated = registered.getValue().awaitTermination(60, TimeUnit.SECONDS);
-         if (!terminated) {
-           log.info("Waiting for thread pool " + registered.getKey() + " to shutdown");
-         }
-       } catch (InterruptedException e) {
-         log.warn(e);
-       }
-     }
-   }
- }
-
- /**
-  * Creates a new per-tablet resource manager. It is not tracked by this manager until
-  * {@code setTablet} has been called on it.
-  */
- public synchronized TabletResourceManager createTabletResourceManager() {
-   return new TabletResourceManager();
- }
-
- /** Starts tracking the given per-tablet resource manager. */
- private synchronized void addTabletResource(TabletResourceManager tr) {
-   tabletResources.add(tr);
- }
-
- /** Stops tracking the given per-tablet resource manager. */
- private synchronized void removeTabletResource(TabletResourceManager tr) {
-   tabletResources.remove(tr);
- }
-
- /**
-  * Per-tablet view of the tablet server's shared resources. A tablet uses it to obtain scan
-  * file managers, to report memory usage, to ask whether a major compaction is needed and to
-  * submit compaction work to the shared thread pools.
-  */
- public class TabletResourceManager {
-
-   // used as the start of the idle period when the tablet has never reported a commit
-   private final long creationTime = System.currentTimeMillis();
-
-   private volatile boolean openFilesReserved = false;
-
-   private volatile boolean closed = false;
-
-   private Tablet tablet;
-
-   private AccumuloConfiguration tableConf;
-
-   TabletResourceManager() {}
-
-   /**
-    * Binds this resource manager to its tablet and table configuration and registers it with
-    * the enclosing tablet server resource manager.
-    */
-   void setTablet(Tablet tablet, AccumuloConfiguration tableConf) {
-     this.tablet = tablet;
-     this.tableConf = tableConf;
-     // TabletResourceManager is not really initialized until this
-     // function is called.... so do not make it publicly available
-     // until now
-
-     addTabletResource(this);
-   }
-
-   // BEGIN methods that Tablets call to manage their set of open map files
-
-   /** Records the current time as the last commit time. */
-   public void importedMapFiles() {
-     lastReportedCommitTime = System.currentTimeMillis();
-   }
-
-   /**
-    * Returns a new scan file manager for this tablet's extent.
-    *
-    * @throws IllegalStateException
-    *           if this resource manager has been closed
-    */
-   synchronized ScanFileManager newScanFileManager() {
-     if (closed)
-       throw new IllegalStateException("closed");
-     return fileManager.newScanFileManager(tablet.getExtent());
-   }
-
-   // END methods that Tablets call to manage their set of open map files
-
-   // BEGIN methods that Tablets call to manage memory
-
-   private final AtomicLong lastReportedSize = new AtomicLong();
-   private final AtomicLong lastReportedMincSize = new AtomicLong();
-   private volatile long lastReportedCommitTime = 0;
-
-   /**
-    * Reports this tablet's memory usage to the memory management framework, but only when the
-    * change is significant: the minor-compacting size switched between zero and non-zero, the
-    * total grew by more than 32000, the total shrank, or more than 1000 ms passed since the
-    * last reported commit.
-    *
-    * @param size
-    *          current in-memory map size
-    * @param mincSize
-    *          size of the in-memory map that is being minor compacted
-    */
-   public void updateMemoryUsageStats(long size, long mincSize) {
-
-     // do not want to update stats for every little change,
-     // so only do it under certain circumstances... the reason
-     // for this is that reporting stats acquires a lock, do
-     // not want all tablets locking on the same lock for every
-     // commit
-     long totalSize = size + mincSize;
-     long lrs = lastReportedSize.get();
-     long delta = totalSize - lrs;
-     long lrms = lastReportedMincSize.get();
-     boolean report = false;
-     // the atomic longs are considered independently, when one is set
-     // the other is not set intentionally because this method is not
-     // synchronized... therefore there are not transactional semantics
-     // for reading and writing two variables
-     if ((lrms > 0 && mincSize == 0 || lrms == 0 && mincSize > 0) && lastReportedMincSize.compareAndSet(lrms, mincSize)) {
-       report = true;
-     }
-
-     long currentTime = System.currentTimeMillis();
-     if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000)) && lastReportedSize.compareAndSet(lrs, totalSize)) {
-       // only growth counts as a commit; shrinking or an unchanged size keeps the old commit time
-       if (delta > 0)
-         lastReportedCommitTime = currentTime;
-       report = true;
-     }
-
-     if (report)
-       memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
-   }
-
-   // END methods that Tablets call to manage memory
-
-   // BEGIN methods that Tablets call to make decisions about major compaction
-   // when too many files are open, we may want tablets to compact down
-   // to one map file
-
-   /**
-    * Decides whether the tablet should be major compacted.
-    *
-    * @param tabletFiles
-    *          the tablet's current files
-    * @param reason
-    *          why the compaction is being considered
-    * @return {@code false} when closed, when an IDLE request arrives before the configured idle
-    *         time has passed, or when the strategy fails with an I/O error; {@code true} for
-    *         USER requests; otherwise the answer of the table's compaction strategy
-    */
-   boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
-     if (closed)
-       return false;
-
-     if (reason == MajorCompactionReason.USER)
-       return true;
-
-     if (reason == MajorCompactionReason.IDLE) {
-       long idleTime;
-       if (lastReportedCommitTime == 0) {
-         // no commits, so compute how long the tablet has been assigned to the
-         // tablet server
-         idleTime = System.currentTimeMillis() - creationTime;
-       } else {
-         idleTime = System.currentTimeMillis() - lastReportedCommitTime;
-       }
-
-       if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) {
-         return false;
-       }
-     }
-     CompactionStrategy strategy = Property.createInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class,
-         new DefaultCompactionStrategy());
-     strategy.init(Property.getCompactionStrategyOptions(tableConf));
-     MajorCompactionRequest request = new MajorCompactionRequest(tablet.getExtent(), reason, TabletServerResourceManager.this.fs, tableConf);
-     request.setFiles(tabletFiles);
-     try {
-       return strategy.shouldCompact(request);
-     } catch (IOException ex) {
-       // still answer "no compaction", but leave a trace instead of hiding the failure
-       log.warn("Compaction strategy failed to decide for " + tablet.getExtent() + ", not compacting", ex);
-       return false;
-     }
-   }
-
-   // END methods that Tablets call to make decisions about major compaction
-
-   // tablets call this method to run minor compactions,
-   // this allows us to control how many minor compactions
-   // run concurrently in a tablet server
-   void executeMinorCompaction(final Runnable r) {
-     minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
-   }
-
-   /**
-    * Unregisters this resource manager and tells the memory management framework and the memory
-    * manager that the tablet is closed.
-    *
-    * @throws IOException
-    *           if already closed, or if open files are still reserved
-    */
-   void close() throws IOException {
-     // always obtain locks in same order to avoid deadlock
-     synchronized (TabletServerResourceManager.this) {
-       synchronized (this) {
-         if (closed)
-           throw new IOException("closed");
-         if (openFilesReserved)
-           throw new IOException("tried to close files while open files reserved");
-
-         TabletServerResourceManager.this.removeTabletResource(this);
-
-         memMgmt.tabletClosed(tablet.getExtent());
-         memoryManager.tabletClosed(tablet.getExtent());
-
-         closed = true;
-       }
-     }
-   }
-
-   /** Returns the enclosing tablet server resource manager. */
-   public TabletServerResourceManager getTabletServerResourceManager() {
-     return TabletServerResourceManager.this;
-   }
-
-   /** Submits a major compaction through the enclosing tablet server resource manager. */
-   public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
-     TabletServerResourceManager.this.executeMajorCompaction(tablet, compactionTask);
-   }
-
- }
-
- /**
-  * Runs a split task on the pool that matches the tablet: ordinary tablets use the split pool,
-  * metadata tablets use the metadata split pool, and requests for the root tablet are logged
-  * and dropped.
-  */
- public void executeSplit(KeyExtent tablet, Runnable splitTask) {
-   if (!tablet.isMeta()) {
-     splitThreadPool.execute(splitTask);
-     return;
-   }
-
-   if (tablet.isRootTablet()) {
-     log.warn("Saw request to split root tablet, ignoring");
-     return;
-   }
-
-   defaultSplitThreadPool.execute(splitTask);
- }
-
- /**
-  * Submits a major compaction to the pool reserved for the tablet's kind: root tablet,
-  * metadata tablet, or ordinary tablet.
-  */
- public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
-   final ExecutorService pool;
-   if (tablet.isRootTablet()) {
-     pool = rootMajorCompactionThreadPool;
-   } else if (tablet.isMeta()) {
-     pool = defaultMajorCompactionThreadPool;
-   } else {
-     pool = majorCompactionThreadPool;
-   }
-   pool.execute(compactionTask);
- }
-
- /**
-  * Runs a read-ahead task. Tasks for the root tablet run directly on the calling thread;
-  * metadata and ordinary tablets use their respective read-ahead pools.
-  */
- public void executeReadAhead(KeyExtent tablet, Runnable task) {
-   if (tablet.isRootTablet()) {
-     task.run();
-     return;
-   }
-
-   final ExecutorService pool = tablet.isMeta() ? defaultReadAheadThreadPool : readAheadThreadPool;
-   pool.execute(task);
- }
-
- /** Submits an assignment handler to the assignment pool, which is created with a single thread. */
- public void addAssignment(Runnable assignmentHandler) {
-   assignmentPool.execute(assignmentHandler);
- }
-
- /** Submits an assignment handler to the pool reserved for metadata tablet assignments. */
- public void addMetaDataAssignment(Runnable assignmentHandler) {
-   assignMetaDataPool.execute(assignmentHandler);
- }
-
- /**
-  * Runs a migration handler. Handlers for the root tablet run directly on the calling thread;
-  * metadata and ordinary tablets use their respective migration pools.
-  */
- public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
-   if (tablet.isRootTablet()) {
-     migrationHandler.run();
-     return;
-   }
-
-   final ExecutorService pool = tablet.isMeta() ? defaultMigrationPool : migrationPool;
-   pool.execute(migrationHandler);
- }
-
- /**
-  * Shuts down both split pools and blocks until they have terminated, logging a progress
-  * message for every minute a pool is still running. If the wait is interrupted the interrupt
-  * is logged and waiting starts over.
-  */
- public void stopSplits() {
-   splitThreadPool.shutdown();
-   defaultSplitThreadPool.shutdown();
-   while (true) {
-     try {
-       // splitThreadPool serves ordinary tablets, defaultSplitThreadPool serves metadata
-       // tablets; each message names the pool that is actually being waited on
-       while (!splitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
-         log.info("Waiting for split thread pool to stop");
-       }
-       while (!defaultSplitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
-         log.info("Waiting for metadata split thread pool to stop");
-       }
-       break;
-     } catch (InterruptedException ex) {
-       log.info(ex, ex);
-     }
-   }
- }
-
- /**
-  * Shuts down the assignment pool and blocks until it has terminated, logging a progress
-  * message for every minute it is still running. Interrupts are logged and the wait continues.
-  */
- public void stopNormalAssignments() {
-   assignmentPool.shutdown();
-   boolean terminated = false;
-   while (!terminated) {
-     try {
-       terminated = assignmentPool.awaitTermination(1, TimeUnit.MINUTES);
-       if (!terminated) {
-         log.info("Waiting for assignment thread pool to stop");
-       }
-     } catch (InterruptedException ex) {
-       log.info(ex, ex);
-     }
-   }
- }
-
- /**
-  * Shuts down the metadata assignment pool and blocks until it has terminated, logging a
-  * progress message for every minute it is still running. Interrupts are logged and the wait
-  * continues.
-  */
- public void stopMetadataAssignments() {
-   assignMetaDataPool.shutdown();
-   boolean terminated = false;
-   while (!terminated) {
-     try {
-       terminated = assignMetaDataPool.awaitTermination(1, TimeUnit.MINUTES);
-       if (!terminated) {
-         log.info("Waiting for metadata assignment thread pool to stop");
-       }
-     } catch (InterruptedException ex) {
-       log.info(ex, ex);
-     }
-   }
- }
-
- /** Returns the block cache that was created with the TSERV_INDEXCACHE_SIZE setting. */
- public LruBlockCache getIndexCache() {
-   return _iCache;
- }
-
- /** Returns the block cache that was created with the TSERV_DATACACHE_SIZE setting. */
- public LruBlockCache getDataCache() {
-   return _dCache;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
deleted file mode 100644
index aeacb8d..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-/**
- * Read-only description of a tablet's memory state.
- *
- * NOTE(review): units are not stated in this interface; presumably sizes are in bytes and the
- * commit time is in milliseconds since the epoch - confirm against the implementation and callers.
- */
-public interface TabletState {
-  /** Returns the extent identifying the tablet. */
-  KeyExtent getExtent();
-
-  /** Returns the time of the tablet's last commit. */
-  long getLastCommitTime();
-
-  /** Returns the size of the tablet's in-memory table. */
-  long getMemTableSize();
-
-  /** Returns the size of the in-memory table that is being minor compacted. */
-  long getMinorCompactingMemTableSize();
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java
deleted file mode 100644
index 81a6c64..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import org.apache.accumulo.core.tabletserver.thrift.ActionStats;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-
-/**
- * Accumulates timing and outcome statistics for a tablet's major compactions, minor
- * compactions and splits. This class performs no synchronization of its own.
- */
-public class TabletStatsKeeper {
-
-  /**
-   * Adds every counter of {@code td} to the corresponding counter of {@code summary}.
-   *
-   * @param summary
-   *          statistics to add to; modified in place
-   * @param td
-   *          statistics to add
-   */
-  public static void update(ActionStats summary, ActionStats td) {
-    summary.status += td.status;
-    summary.elapsed += td.elapsed;
-    summary.num += td.num;
-    summary.count += td.count;
-    summary.sumDev += td.sumDev;
-    summary.queueTime += td.queueTime;
-    summary.queueSumDev += td.queueSumDev;
-    summary.fail += td.fail;
-  }
-
-  private ActionStats major = new ActionStats();
-  private ActionStats minor = new ActionStats();
-  private ActionStats split = new ActionStats();
-
-  public enum Operation {
-    MAJOR, SPLIT, MINOR
-  }
-
-  /**
-   * Returns the live statistics object for an operation. Looking the field up on every call
-   * (instead of caching the objects in an array indexed by ordinal) keeps updates pointed at
-   * the current objects after {@link #resetTimes()} has replaced them.
-   */
-  private ActionStats statsFor(Operation operation) {
-    switch (operation) {
-      case MAJOR:
-        return major;
-      case SPLIT:
-        return split;
-      case MINOR:
-        return minor;
-      default:
-        throw new IllegalArgumentException("Unknown operation " + operation);
-    }
-  }
-
-  /**
-   * Records the completion of an operation, including the time it spent queued.
-   *
-   * @param operation
-   *          kind of operation that finished
-   * @param queued
-   *          time the operation was queued, in milliseconds
-   * @param start
-   *          time the operation started, in milliseconds
-   * @param count
-   *          amount to add to the operation's count
-   * @param failed
-   *          whether the operation failed; failures only bump the failure counter
-   */
-  public void updateTime(Operation operation, long queued, long start, long count, boolean failed) {
-    try {
-      ActionStats data = statsFor(operation);
-      if (failed) {
-        data.fail++;
-        data.status--;
-      } else {
-        double t = (System.currentTimeMillis() - start) / 1000.0;
-        double q = (start - queued) / 1000.0;
-
-        data.status--;
-        data.count += count;
-        data.num++;
-        data.elapsed += t;
-        data.queueTime += q;
-        data.sumDev += t * t;
-        data.queueSumDev += q * q;
-        // negative totals mean the accumulated values are no longer usable; start over
-        if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
-          resetTimes();
-      }
-    } catch (Exception e) {
-      resetTimes();
-    }
-
-  }
-
-  /**
-   * Records the completion of an operation without queue time or count.
-   *
-   * @param operation
-   *          kind of operation that finished
-   * @param start
-   *          time the operation started, in milliseconds
-   * @param count
-   *          unused by this overload
-   * @param failed
-   *          whether the operation failed; failures only bump the failure counter
-   */
-  public void updateTime(Operation operation, long start, long count, boolean failed) {
-    try {
-      ActionStats data = statsFor(operation);
-      if (failed) {
-        data.fail++;
-        data.status--;
-      } else {
-        double t = (System.currentTimeMillis() - start) / 1000.0;
-
-        data.status--;
-        data.num++;
-        data.elapsed += t;
-        data.sumDev += t * t;
-
-        if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
-          resetTimes();
-      }
-    } catch (Exception e) {
-      resetTimes();
-    }
-
-  }
-
-  /** Adds the minor compaction statistics of {@code t} to this keeper's. */
-  public void saveMinorTimes(TabletStatsKeeper t) {
-    update(minor, t.minor);
-  }
-
-  /** Adds the major compaction statistics of {@code t} to this keeper's. */
-  public void saveMajorTimes(TabletStatsKeeper t) {
-    update(major, t.major);
-  }
-
-  /** Discards all accumulated statistics by replacing them with fresh, empty objects. */
-  public void resetTimes() {
-    major = new ActionStats();
-    split = new ActionStats();
-    minor = new ActionStats();
-  }
-
-  public void incrementStatusMinor() {
-    minor.status++;
-  }
-
-  public void incrementStatusMajor() {
-    major.status++;
-  }
-
-  public void incrementStatusSplit() {
-    split.status++;
-  }
-
-  /** Returns a {@link TabletStats} carrying the current major, minor and split statistics. */
-  public TabletStats getTabletStats() {
-    return new TabletStats(null, major, minor, split, 0, 0, 0, 0);
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
deleted file mode 100644
index c7cfc59..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.client.admin.TimeType;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.accumulo.server.util.time.RelativeTime;
-
-public abstract class TabletTime {
- public static final char LOGICAL_TIME_ID = 'L';
- public static final char MILLIS_TIME_ID = 'M';
-
- public static char getTimeID(TimeType timeType) {
- switch (timeType) {
- case LOGICAL:
- return LOGICAL_TIME_ID;
- case MILLIS:
- return MILLIS_TIME_ID;
- }
-
- throw new IllegalArgumentException("Unknown time type " + timeType);
- }
-
- abstract void useMaxTimeFromWALog(long time);
-
- abstract String getMetadataValue(long time);
-
- abstract String getMetadataValue();
-
- // abstract long setUpdateTimes(Mutation mutation);
- abstract long setUpdateTimes(List<Mutation> mutations);
-
- abstract long getTime();
-
- abstract long getAndUpdateTime();
-
- protected void setSystemTimes(Mutation mutation, long lastCommitTime) {
- ServerMutation m = (ServerMutation)mutation;
- m.setSystemTimestamp(lastCommitTime);
- }
-
- static TabletTime getInstance(String metadataValue) {
- if (metadataValue.charAt(0) == LOGICAL_TIME_ID) {
- return new LogicalTime(Long.parseLong(metadataValue.substring(1)));
- } else if (metadataValue.charAt(0) == MILLIS_TIME_ID) {
- return new MillisTime(Long.parseLong(metadataValue.substring(1)));
- }
-
- throw new IllegalArgumentException("Time type unknown : " + metadataValue);
-
- }
-
- public static String maxMetadataTime(String mv1, String mv2) {
- if (mv1 == null) {
- checkType(mv2);
- return mv2;
- }
-
- if (mv2 == null) {
- checkType(mv1);
- return mv1;
- }
-
- if (mv1.charAt(0) != mv2.charAt(0)) throw new IllegalArgumentException("Time types differ " + mv1 + " " + mv2);
- checkType(mv1);
-
- long t1 = Long.parseLong(mv1.substring(1));
- long t2 = Long.parseLong(mv2.substring(1));
-
- if (t1 < t2) return mv2;
- else return mv1;
-
- }
-
- private static void checkType(String mv1) {
- if (mv1.charAt(0) != LOGICAL_TIME_ID && mv1.charAt(0) != MILLIS_TIME_ID) throw new IllegalArgumentException("Invalid time type " + mv1);
- }
-
- static class MillisTime extends TabletTime {
-
- private long lastTime;
- private long lastUpdateTime = 0;
-
- public MillisTime(long time) {
- this.lastTime = time;
- }
-
- @Override
- String getMetadataValue(long time) {
- return MILLIS_TIME_ID + "" + time;
- }
-
- @Override
- public String getMetadataValue() {
- return getMetadataValue(lastTime);
- }
-
- @Override
- void useMaxTimeFromWALog(long time) {
- if (time > lastTime)
- lastTime = time;
- }
-
- @Override
- long setUpdateTimes(List<Mutation> mutations) {
-
- long currTime = RelativeTime.currentTimeMillis();
-
- synchronized (this) {
- if (mutations.size() == 0)
- return lastTime;
-
- currTime = updateTime(currTime);
- }
-
- for (Mutation mutation : mutations)
- setSystemTimes(mutation, currTime);
-
- return currTime;
- }
-
- private long updateTime(long currTime) {
- if (currTime < lastTime) {
- if (currTime - lastUpdateTime > 0) {
- // not in same millisecond as last call
- // to this method so move ahead slowly
- lastTime++;
- }
-
- lastUpdateTime = currTime;
-
- currTime = lastTime;
- } else {
- lastTime = currTime;
- }
- return currTime;
- }
-
- @Override
- long getTime() {
- return lastTime;
- }
-
- @Override
- long getAndUpdateTime() {
- long currTime = RelativeTime.currentTimeMillis();
-
- synchronized (this) {
- currTime = updateTime(currTime);
- }
-
- return currTime;
- }
-
- }
-
- static class LogicalTime extends TabletTime {
- AtomicLong nextTime;
-
- private LogicalTime(Long time) {
- this.nextTime = new AtomicLong(time.longValue() + 1);
- }
-
- @Override
- void useMaxTimeFromWALog(long time) {
- time++;
-
- if (this.nextTime.get() < time) {
- this.nextTime.set(time);
- }
- }
-
- @Override
- public String getMetadataValue() {
- return getMetadataValue(getTime());
- }
-
- @Override
- public String getMetadataValue(long time) {
- return LOGICAL_TIME_ID + "" + time;
- }
-
- @Override
- long setUpdateTimes(List<Mutation> mutations) {
- if (mutations.size() == 0)
- return getTime();
-
- long time = nextTime.getAndAdd(mutations.size());
- for (Mutation mutation : mutations)
- setSystemTimes(mutation, time++);
-
- return time - 1;
- }
-
- @Override
- long getTime() {
- return nextTime.get() - 1;
- }
-
- @Override
- long getAndUpdateTime() {
- return nextTime.getAndIncrement();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java
deleted file mode 100644
index b6d63ad..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-
-public class TooManyFilesException extends IOException {
-
- private static final long serialVersionUID = 1L;
-
- public TooManyFilesException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java
deleted file mode 100644
index 9629948..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.util.Random;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.util.FastFormat;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-
-/**
- * Allocates unique names for an accumulo instance. The names are unique for the lifetime of the instance.
- *
- * This is useful for filenames because it makes caching easy.
- *
- */
-
-public class UniqueNameAllocator {
- private long next = 0;
- private long maxAllocated = 0;
- private String nextNamePath;
- private Random rand;
-
- private UniqueNameAllocator() {
- nextNamePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZNEXT_FILE;
- rand = new Random();
- }
-
- public synchronized String getNextName() {
-
- while (next >= maxAllocated) {
- final int allocate = 100 + rand.nextInt(100);
-
- try {
- byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
- public byte[] mutate(byte[] currentValue) throws Exception {
- long l = Long.parseLong(new String(currentValue), Character.MAX_RADIX);
- l += allocate;
- return Long.toString(l, Character.MAX_RADIX).getBytes();
- }
- });
-
- maxAllocated = Long.parseLong(new String(max), Character.MAX_RADIX);
- next = maxAllocated - allocate;
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- return new String(FastFormat.toZeroPaddedString(next++, 7, Character.MAX_RADIX, new byte[0]));
- }
-
- private static UniqueNameAllocator instance = null;
-
- public static synchronized UniqueNameAllocator getInstance() {
- if (instance == null)
- instance = new UniqueNameAllocator();
-
- return instance;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
deleted file mode 100644
index d1e7b90..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.accumulo.server.fs.FileRef;
-
-/**
- * A plan for a compaction: the input files, the files that are *not* inputs to a compaction that should
- * simply be deleted, and the optional parameters used to create the resulting output file.
- */
-public class CompactionPlan {
- public final List<FileRef> inputFiles = new ArrayList<FileRef>();
- public final List<FileRef> deleteFiles = new ArrayList<FileRef>();
- public WriteParameters writeParameters = null;
-
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append(inputFiles.toString());
- if (!deleteFiles.isEmpty()) {
- b.append(" files to be deleted ");
- b.append(deleteFiles);
- if (writeParameters != null) {
- if (writeParameters.getCompressType() != null)
- b.append(" compress type " + writeParameters.getCompressType());
- if (writeParameters.getHdfsBlockSize() != 0)
- b.append(" hdfs block size " + writeParameters.getHdfsBlockSize());
- if (writeParameters.getBlockSize() != 0)
- b.append(" data block size " + writeParameters.getBlockSize());
- if (writeParameters.getIndexBlockSize() != 0)
- b.append(" index block size " + writeParameters.getIndexBlockSize());
- if (writeParameters.getReplication() != 0)
- b.append(" replication " + writeParameters.getReplication());
- }
- }
- return b.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
deleted file mode 100644
index 1fe0537..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * The interface for customizing major compactions.
- * <p>
- * The tablet server has one thread to ask many tablets if they should compact. When the strategy returns true, then tablet is added to the queue of tablets
- * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)} method is called outside the tablets'
- * lock. This gives the strategy the ability to read information that maybe expensive to fetch. Once the gatherInformation returns, the tablet lock is grabbed
- * and the compactionPlan computed. This should *not* do expensive operations, especially not I/O. Note that the number of files may change between calls to
- * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)}.
- * <p>
- * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)} call is going to be different from the one used in the
- * compaction thread.
- */
-public abstract class CompactionStrategy {
-
- /**
- * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
- *
- * @param options
- */
- public void init(Map<String,String> options) {}
-
- /**
- * Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and
- * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
- * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
- *
- */
- public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
-
- /**
- * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call
- * the {@link #getCompactionPlan(MajorCompactionRequest)}.
- *
- * @param request
- * basic details about the tablet
- * @throws IOException
- */
- public void gatherInformation(MajorCompactionRequest request) throws IOException {}
-
- /**
- * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking.
- *
- * @param request
- * basic details about the tablet
- * @return the plan for a major compaction, or null to cancel the compaction.
- * @throws IOException
- */
- abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
deleted file mode 100644
index c088d26..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.fs.FileRef;
-
-public class DefaultCompactionStrategy extends CompactionStrategy {
-
- @Override
- public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
- CompactionPlan plan = getCompactionPlan(request);
- return plan != null && !plan.inputFiles.isEmpty();
- }
-
- @Override
- public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
- CompactionPlan result = new CompactionPlan();
-
- List<FileRef> toCompact = findMapFilesToCompact(request);
- if (toCompact == null || toCompact.isEmpty())
- return result;
- result.inputFiles.addAll(toCompact);
- return result;
- }
-
- private static class CompactionFile {
- public FileRef file;
- public long size;
- public CompactionFile(FileRef file, long size) {
- super();
- this.file = file;
- this.size = size;
- }
- }
-
-
- private List<FileRef> findMapFilesToCompact(MajorCompactionRequest request) {
- MajorCompactionReason reason = request.getReason();
- if (reason == MajorCompactionReason.USER) {
- return new ArrayList<FileRef>(request.getFiles().keySet());
- }
- if (reason == MajorCompactionReason.CHOP) {
- // should not happen, but this is safe
- return new ArrayList<FileRef>(request.getFiles().keySet());
- }
-
- if (request.getFiles().size() <= 1)
- return null;
- TreeSet<CompactionFile> candidateFiles = new TreeSet<CompactionFile>(new Comparator<CompactionFile>() {
- @Override
- public int compare(CompactionFile o1, CompactionFile o2) {
- if (o1 == o2)
- return 0;
- if (o1.size < o2.size)
- return -1;
- if (o1.size > o2.size)
- return 1;
- return o1.file.compareTo(o2.file);
- }
- });
-
- double ratio = Double.parseDouble(request.getTableConfig(Property.TABLE_MAJC_RATIO.getKey()));
- int maxFilesToCompact = Integer.parseInt(request.getTableConfig(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()));
- int maxFilesPerTablet = request.getMaxFilesPerTablet();
-
- for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
- candidateFiles.add(new CompactionFile(entry.getKey(), entry.getValue().getSize()));
- }
-
- long totalSize = 0;
- for (CompactionFile mfi : candidateFiles) {
- totalSize += mfi.size;
- }
-
- List<FileRef> files = new ArrayList<FileRef>();
-
- while (candidateFiles.size() > 1) {
- CompactionFile max = candidateFiles.last();
- if (max.size * ratio <= totalSize) {
- files.clear();
- for (CompactionFile mfi : candidateFiles) {
- files.add(mfi.file);
- if (files.size() >= maxFilesToCompact)
- break;
- }
-
- break;
- }
- totalSize -= max.size;
- candidateFiles.remove(max);
- }
-
- int totalFilesToCompact = 0;
- if (request.getFiles().size() > maxFilesPerTablet)
- totalFilesToCompact = request.getFiles().size() - maxFilesPerTablet + 1;
-
- totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);
-
- if (files.size() < totalFilesToCompact) {
-
- TreeMap<FileRef,Long> tfc = new TreeMap<FileRef,Long>();
- for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
- tfc.put(entry.getKey(), entry.getValue().getSize());
- }
- tfc.keySet().removeAll(files);
-
- // put data in candidateFiles to sort it
- candidateFiles.clear();
- for (Entry<FileRef,Long> entry : tfc.entrySet())
- candidateFiles.add(new CompactionFile(entry.getKey(), entry.getValue()));
-
- for (CompactionFile mfi : candidateFiles) {
- files.add(mfi.file);
- if (files.size() >= totalFilesToCompact)
- break;
- }
- }
-
- return files;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java
deleted file mode 100644
index e7f4033..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-
-public enum MajorCompactionReason {
- // do not change the order, the order of this enum determines the order
- // in which queued major compactions are executed
- USER,
- CHOP,
- NORMAL,
- IDLE
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
deleted file mode 100644
index cadf16d..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Information that can be used to determine how a tablet is to be major compacted, if needed.
- */
-public class MajorCompactionRequest implements Cloneable {
- final private KeyExtent extent;
- final private MajorCompactionReason reason;
- final private VolumeManager volumeManager;
- final private AccumuloConfiguration tableConfig;
- private Map<FileRef,DataFileValue> files;
-
- public MajorCompactionRequest(
- KeyExtent extent,
- MajorCompactionReason reason,
- VolumeManager manager,
- AccumuloConfiguration tabletConfig) {
- this.extent = extent;
- this.reason = reason;
- this.volumeManager = manager;
- this.tableConfig = tabletConfig;
- this.files = Collections.emptyMap();
- }
-
- public MajorCompactionRequest(MajorCompactionRequest mcr) {
- this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig);
- // know this is already unmodifiable, no need to wrap again
- this.files = mcr.files;
- }
-
- public KeyExtent getExtent() {
- return extent;
- }
-
- public MajorCompactionReason getReason() {
- return reason;
- }
-
- public Map<FileRef,DataFileValue> getFiles() {
- return files;
- }
-
- public void setFiles(Map<FileRef,DataFileValue> update) {
- this.files = Collections.unmodifiableMap(update);
- }
-
- public FileStatus[] listStatus(Path path) throws IOException {
- // @TODO verify the file isn't some random file in HDFS
- return volumeManager.listStatus(path);
- }
-
- public FileSKVIterator openReader(FileRef ref) throws IOException {
- // @TODO verify the file isn't some random file in HDFS
- // @TODO ensure these files are always closed?
- FileOperations fileFactory = FileOperations.getInstance();
- FileSystem ns = volumeManager.getFileSystemByPath(ref.path());
- FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), tableConfig);
- return openReader;
- }
-
- public Map<String,String> getTableProperties() {
- return tableConfig.getAllPropertiesWithPrefix(Property.TABLE_PREFIX);
- }
-
- public String getTableConfig(String key) {
- Property property = Property.getPropertyByKey(key);
- if (property == null || property.isSensitive())
- throw new RuntimeException("Unable to access the configuration value " + key);
- return tableConfig.get(property);
- }
-
- public int getMaxFilesPerTablet() {
- return tableConfig.getMaxFilesPerTablet();
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
deleted file mode 100644
index f6c62b5..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/SizeLimitCompactionStrategy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.server.fs.FileRef;
-
-/**
- *
- */
-public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy {
- public static final String SIZE_LIMIT_OPT = "sizeLimit";
-
- private long limit;
-
- @Override
- public void init(Map<String,String> options) {
- limit = AccumuloConfiguration.getMemoryInBytes(options.get(SIZE_LIMIT_OPT));
- }
-
- private MajorCompactionRequest filterFiles(MajorCompactionRequest mcr) {
- Map<FileRef,DataFileValue> filteredFiles = new HashMap<FileRef,DataFileValue>();
- for (Entry<FileRef,DataFileValue> entry : mcr.getFiles().entrySet()) {
- if (entry.getValue().getSize() <= limit) {
- filteredFiles.put(entry.getKey(), entry.getValue());
- }
- }
-
- mcr = new MajorCompactionRequest(mcr);
- mcr.setFiles(filteredFiles);
-
- return mcr;
- }
-
- @Override
- public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
- return super.shouldCompact(filterFiles(request));
- }
-
- @Override
- public void gatherInformation(MajorCompactionRequest request) throws IOException {
- super.gatherInformation(filterFiles(request));
- }
-
- @Override
- public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
- return super.getCompactionPlan(filterFiles(request));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
deleted file mode 100644
index 6cb8254..0000000
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.compaction;
-
-public class WriteParameters {
- private String compressType = null;
- private long hdfsBlockSize = 0;
- private long blockSize = 0;
- private long indexBlockSize = 0;
- private int replication = 0;
-
- public String getCompressType() {
- return compressType;
- }
- public void setCompressType(String compressType) {
- this.compressType = compressType;
- }
- public long getHdfsBlockSize() {
- return hdfsBlockSize;
- }
- public void setHdfsBlockSize(long hdfsBlockSize) {
- this.hdfsBlockSize = hdfsBlockSize;
- }
- public long getBlockSize() {
- return blockSize;
- }
- public void setBlockSize(long blockSize) {
- this.blockSize = blockSize;
- }
- public long getIndexBlockSize() {
- return indexBlockSize;
- }
- public void setIndexBlockSize(long indexBlockSize) {
- this.indexBlockSize = indexBlockSize;
- }
- public int getReplication() {
- return replication;
- }
- public void setReplication(int replication) {
- this.replication = replication;
- }
-}
\ No newline at end of file