Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/10/25 01:04:38 UTC
[1/4] ACCUMULO-391 Use more accurate "InputTableConfig" term
Updated Branches:
refs/heads/master a5cf86094 -> 61353d1e3
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
index f2852f9..15fb7c7 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrat
import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.trace.instrument.TraceExecutorService;
import org.apache.log4j.Logger;
@@ -64,7 +63,7 @@ import org.apache.log4j.Logger;
*
*/
public class TabletServerResourceManager {
-
+
private ExecutorService minorCompactionThreadPool;
private ExecutorService majorCompactionThreadPool;
private ExecutorService rootMajorCompactionThreadPool;
@@ -78,23 +77,23 @@ public class TabletServerResourceManager {
private ExecutorService readAheadThreadPool;
private ExecutorService defaultReadAheadThreadPool;
private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
-
+
private HashSet<TabletResourceManager> tabletResources;
-
+
private final VolumeManager fs;
-
+
private FileManager fileManager;
-
+
private MemoryManager memoryManager;
-
+
private MemoryManagementFramework memMgmt;
-
+
private final LruBlockCache _dCache;
private final LruBlockCache _iCache;
private final ServerConfiguration conf;
-
+
private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
-
+
private ExecutorService addEs(String name, ExecutorService tp) {
if (threadPools.containsKey(name)) {
throw new IllegalArgumentException("Cannot create two executor services with same name " + name);
@@ -103,7 +102,7 @@ public class TabletServerResourceManager {
threadPools.put(name, tp);
return tp;
}
-
+
private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) {
ExecutorService result = addEs(name, tp);
SimpleTimer.getInstance().schedule(new Runnable() {
@@ -120,7 +119,7 @@ public class TabletServerResourceManager {
log.error(t, t);
}
}
-
+
}, 1000, 10 * 1000);
return result;
}
@@ -128,7 +127,7 @@ public class TabletServerResourceManager {
private ExecutorService createEs(int max, String name) {
return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
}
-
+
private ExecutorService createEs(Property max, String name) {
return createEs(max, name, new LinkedBlockingQueue<Runnable>());
}
@@ -142,22 +141,22 @@ public class TabletServerResourceManager {
private ExecutorService createEs(int min, int max, int timeout, String name) {
return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)));
}
-
+
public TabletServerResourceManager(Instance instance, VolumeManager fs) {
this.conf = new ServerConfiguration(instance);
this.fs = fs;
final AccumuloConfiguration acuConf = conf.getConfiguration();
-
+
long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.loadedNativeLibraries();
-
+
long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
-
+
_iCache = new LruBlockCache(iCacheSize, blockSize);
_dCache = new LruBlockCache(dCacheSize, blockSize);
-
+
Runtime runtime = Runtime.getRuntime();
if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) {
throw new IllegalArgumentException(String.format(
@@ -171,177 +170,183 @@ public class TabletServerResourceManager {
if (!usingNativeMap && maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) {
log.warn("In-memory map may not fit into local memory space.");
}
-
+
minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor");
-
+
// make this thread pool have a priority queue... and execute tablets with the most
// files first!
majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue());
rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor");
defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor");
-
+
splitThreadPool = createEs(1, "splitter");
defaultSplitThreadPool = createEs(0, 1, 60, "md splitter");
-
+
defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration");
migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration");
-
+
// not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
// individual tablet servers are already running assignments concurrently... having each individual tablet server run
// concurrent assignments would put more load on the metadata table at startup
assignmentPool = createEs(1, "tablet assignment");
-
+
assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
-
+
readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
-
+
tabletResources = new HashSet<TabletResourceManager>();
-
+
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
-
+
fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache);
-
+
memoryManager = Property.createInstanceFromPropertyName(acuConf, Property.TSERV_MEM_MGMT, MemoryManager.class, new LargestFirstMemoryManager());
memoryManager.init(conf);
memMgmt = new MemoryManagementFramework();
}
-
+
private static class TabletStateImpl implements TabletState, Cloneable {
-
+
private long lct;
private Tablet tablet;
private long mts;
private long mcmts;
-
+
public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
this.tablet = t;
this.mts = mts;
this.lct = lct;
this.mcmts = mcmts;
}
-
+
+ @Override
public KeyExtent getExtent() {
return tablet.getExtent();
}
-
+
Tablet getTablet() {
return tablet;
}
-
+
+ @Override
public long getLastCommitTime() {
return lct;
}
-
+
+ @Override
public long getMemTableSize() {
return mts;
}
-
+
+ @Override
public long getMinorCompactingMemTableSize() {
return mcmts;
}
}
-
+
private class MemoryManagementFramework {
private final Map<KeyExtent,TabletStateImpl> tabletReports;
private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
private long lastMemCheckTime = System.currentTimeMillis();
private long maxMem;
-
+
MemoryManagementFramework() {
tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
memUsageReports = new LinkedBlockingQueue<TabletStateImpl>();
maxMem = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
-
+
Runnable r1 = new Runnable() {
+ @Override
public void run() {
processTabletMemStats();
}
};
-
+
Thread t1 = new Daemon(new LoggingRunnable(log, r1));
t1.setPriority(Thread.NORM_PRIORITY + 1);
t1.setName("Accumulo Memory Guard");
t1.start();
-
+
Runnable r2 = new Runnable() {
+ @Override
public void run() {
manageMemory();
}
};
-
+
Thread t2 = new Daemon(new LoggingRunnable(log, r2));
t2.setName("Accumulo Minor Compaction Initiator");
t2.start();
-
+
}
-
+
private long lastMemTotal = 0;
-
+
private void processTabletMemStats() {
while (true) {
try {
-
+
TabletStateImpl report = memUsageReports.take();
-
+
while (report != null) {
tabletReports.put(report.getExtent(), report);
report = memUsageReports.poll();
}
-
+
long delta = System.currentTimeMillis() - lastMemCheckTime;
if (holdCommits || delta > 50 || lastMemTotal > 0.90 * maxMem) {
lastMemCheckTime = System.currentTimeMillis();
-
+
long totalMemUsed = 0;
-
+
synchronized (tabletReports) {
for (TabletStateImpl tsi : tabletReports.values()) {
totalMemUsed += tsi.getMemTableSize();
totalMemUsed += tsi.getMinorCompactingMemTableSize();
}
}
-
+
if (totalMemUsed > 0.95 * maxMem) {
holdAllCommits(true);
} else {
holdAllCommits(false);
}
-
+
lastMemTotal = totalMemUsed;
}
-
+
} catch (InterruptedException e) {
log.warn(e, e);
}
}
}
-
+
private void manageMemory() {
while (true) {
MemoryManagementActions mma = null;
-
+
try {
ArrayList<TabletState> tablets;
synchronized (tabletReports) {
tablets = new ArrayList<TabletState>(tabletReports.values());
}
mma = memoryManager.getMemoryManagementActions(tablets);
-
+
} catch (Throwable t) {
log.error("Memory manager failed " + t.getMessage(), t);
}
-
+
try {
if (mma != null && mma.tabletsToMinorCompact != null && mma.tabletsToMinorCompact.size() > 0) {
for (KeyExtent keyExtent : mma.tabletsToMinorCompact) {
TabletStateImpl tabletReport = tabletReports.get(keyExtent);
-
+
if (tabletReport == null) {
log.warn("Memory manager asked to compact nonexistant tablet " + keyExtent);
continue;
}
-
+
if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) {
if (tabletReport.getTablet().isClosed()) {
tabletReports.remove(tabletReport.getExtent());
@@ -351,48 +356,48 @@ public class TabletServerResourceManager {
}
}
}
-
+
// log.debug("mma.tabletsToMinorCompact = "+mma.tabletsToMinorCompact);
}
} catch (Throwable t) {
log.error("Minor compactions for memory managment failed", t);
}
-
+
UtilWaitThread.sleep(250);
}
}
-
+
public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize) {
memUsageReports.add(new TabletStateImpl(tablet, size, lastCommitTime, mincSize));
}
-
+
public void tabletClosed(KeyExtent extent) {
tabletReports.remove(extent);
}
}
-
+
private final Object commitHold = new Object();
private volatile boolean holdCommits = false;
private long holdStartTime;
-
+
protected void holdAllCommits(boolean holdAllCommits) {
synchronized (commitHold) {
if (holdCommits != holdAllCommits) {
holdCommits = holdAllCommits;
-
+
if (holdCommits) {
holdStartTime = System.currentTimeMillis();
}
-
+
if (!holdCommits) {
log.debug(String.format("Commits held for %6.2f secs", (System.currentTimeMillis() - holdStartTime) / 1000.0));
commitHold.notifyAll();
}
}
}
-
+
}
-
+
void waitUntilCommitsAreEnabled() {
if (holdCommits) {
long timeout = System.currentTimeMillis() + conf.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
@@ -407,7 +412,7 @@ public class TabletServerResourceManager {
}
}
}
-
+
public long holdTime() {
if (!holdCommits)
return 0;
@@ -415,12 +420,12 @@ public class TabletServerResourceManager {
return System.currentTimeMillis() - holdStartTime;
}
}
-
+
public void close() {
for (ExecutorService executorService : threadPools.values()) {
executorService.shutdown();
}
-
+
for (Entry<String,ExecutorService> entry : threadPools.entrySet()) {
while (true) {
try {
@@ -433,66 +438,66 @@ public class TabletServerResourceManager {
}
}
}
-
+
public synchronized TabletResourceManager createTabletResourceManager() {
TabletResourceManager trm = new TabletResourceManager();
return trm;
}
-
+
synchronized private void addTabletResource(TabletResourceManager tr) {
tabletResources.add(tr);
}
-
+
synchronized private void removeTabletResource(TabletResourceManager tr) {
tabletResources.remove(tr);
}
-
+
public class TabletResourceManager {
-
+
private final long creationTime = System.currentTimeMillis();
-
+
private volatile boolean openFilesReserved = false;
-
+
private volatile boolean closed = false;
-
+
private Tablet tablet;
-
+
private AccumuloConfiguration tableConf;
-
+
TabletResourceManager() {}
-
+
void setTablet(Tablet tablet, AccumuloConfiguration tableConf) {
this.tablet = tablet;
this.tableConf = tableConf;
// TabletResourceManager is not really initialized until this
// function is called.... so do not make it publicly available
// until now
-
+
addTabletResource(this);
}
-
+
// BEGIN methods that Tablets call to manage their set of open map files
-
+
public void importedMapFiles() {
lastReportedCommitTime = System.currentTimeMillis();
}
-
+
synchronized ScanFileManager newScanFileManager() {
if (closed)
throw new IllegalStateException("closed");
return fileManager.newScanFileManager(tablet.getExtent());
}
-
+
// END methods that Tablets call to manage their set of open map files
-
+
// BEGIN methods that Tablets call to manage memory
-
+
private AtomicLong lastReportedSize = new AtomicLong();
private AtomicLong lastReportedMincSize = new AtomicLong();
private volatile long lastReportedCommitTime = 0;
-
+
public void updateMemoryUsageStats(long size, long mincSize) {
-
+
// do not want to update stats for every little change,
// so only do it under certain circumstances... the reason
// for this is that reporting stats acquires a lock, do
@@ -510,32 +515,32 @@ public class TabletServerResourceManager {
if ((lrms > 0 && mincSize == 0 || lrms == 0 && mincSize > 0) && lastReportedMincSize.compareAndSet(lrms, mincSize)) {
report = true;
}
-
+
long currentTime = System.currentTimeMillis();
if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000)) && lastReportedSize.compareAndSet(lrs, totalSize)) {
if (delta > 0)
lastReportedCommitTime = currentTime;
report = true;
}
-
+
if (report)
memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
}
-
+
// END methods that Tablets call to manage memory
-
+
// BEGIN methods that Tablets call to make decisions about major compaction
// when too many files are open, we may want tablets to compact down
// to one map file
boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
if (closed)
return false;// throw new IOException("closed");
-
+
// int threshold;
-
+
if (reason == MajorCompactionReason.USER)
return true;
-
+
if (reason == MajorCompactionReason.IDLE) {
// threshold = 1;
long idleTime;
@@ -546,12 +551,13 @@ public class TabletServerResourceManager {
} else {
idleTime = System.currentTimeMillis() - lastReportedCommitTime;
}
-
+
if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) {
return false;
}
}
- CompactionStrategy strategy = Property.createInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, new DefaultCompactionStrategy());
+ CompactionStrategy strategy = Property.createInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class,
+ new DefaultCompactionStrategy());
strategy.init(Property.getCompactionStrategyOptions(tableConf));
MajorCompactionRequest request = new MajorCompactionRequest(tablet.getExtent(), reason, TabletServerResourceManager.this.fs, tableConf);
request.setFiles(tabletFiles);
@@ -561,16 +567,16 @@ public class TabletServerResourceManager {
return false;
}
}
-
+
// END methods that Tablets call to make decisions about major compaction
-
+
// tablets call this method to run minor compactions,
// this allows us to control how many minor compactions
// run concurrently in a tablet server
void executeMinorCompaction(final Runnable r) {
minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
}
-
+
void close() throws IOException {
// always obtain locks in same order to avoid deadlock
synchronized (TabletServerResourceManager.this) {
@@ -579,27 +585,27 @@ public class TabletServerResourceManager {
throw new IOException("closed");
if (openFilesReserved)
throw new IOException("tired to close files while open files reserved");
-
+
TabletServerResourceManager.this.removeTabletResource(this);
-
+
memMgmt.tabletClosed(tablet.getExtent());
memoryManager.tabletClosed(tablet.getExtent());
-
+
closed = true;
}
}
}
-
+
public TabletServerResourceManager getTabletServerResourceManager() {
return TabletServerResourceManager.this;
}
-
+
public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
TabletServerResourceManager.this.executeMajorCompaction(tablet, compactionTask);
}
-
+
}
-
+
public void executeSplit(KeyExtent tablet, Runnable splitTask) {
if (tablet.isMeta()) {
if (tablet.isRootTablet()) {
@@ -611,7 +617,7 @@ public class TabletServerResourceManager {
splitThreadPool.execute(splitTask);
}
}
-
+
public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
if (tablet.isRootTablet()) {
rootMajorCompactionThreadPool.execute(compactionTask);
@@ -621,7 +627,7 @@ public class TabletServerResourceManager {
majorCompactionThreadPool.execute(compactionTask);
}
}
-
+
public void executeReadAhead(KeyExtent tablet, Runnable task) {
if (tablet.isRootTablet()) {
task.run();
@@ -631,15 +637,15 @@ public class TabletServerResourceManager {
readAheadThreadPool.execute(task);
}
}
-
+
public void addAssignment(Runnable assignmentHandler) {
assignmentPool.execute(assignmentHandler);
}
-
+
public void addMetaDataAssignment(Runnable assignmentHandler) {
assignMetaDataPool.execute(assignmentHandler);
}
-
+
public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
if (tablet.isRootTablet()) {
migrationHandler.run();
@@ -649,7 +655,7 @@ public class TabletServerResourceManager {
migrationPool.execute(migrationHandler);
}
}
-
+
public void stopSplits() {
splitThreadPool.shutdown();
defaultSplitThreadPool.shutdown();
@@ -667,7 +673,7 @@ public class TabletServerResourceManager {
}
}
}
-
+
public void stopNormalAssignments() {
assignmentPool.shutdown();
while (true) {
@@ -681,7 +687,7 @@ public class TabletServerResourceManager {
}
}
}
-
+
public void stopMetadataAssignments() {
assignMetaDataPool.shutdown();
while (true) {
@@ -695,13 +701,13 @@ public class TabletServerResourceManager {
}
}
}
-
+
public LruBlockCache getIndexCache() {
return _iCache;
}
-
+
public LruBlockCache getDataCache() {
return _dCache;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
index 10f41cd..1fe0537 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
@@ -23,43 +23,41 @@ import java.util.Map;
* The interface for customizing major compactions.
* <p>
* The tablet server has one thread to ask many tablets if they should compact. When the strategy returns true, then tablet is added to the queue of tablets
- * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)} method is called outside the tablets' lock. This gives the strategy the
- * ability to read information that maybe expensive to fetch. Once the gatherInformation returns, the tablet lock is grabbed and the compactionPlan computed.
- * This should *not* do expensive operations, especially not I/O. Note that the number of files may change between calls to {@link #gatherInformation(MajorCompactionRequest)} and
- * {@link #getCompactionPlan(MajorCompactionRequest)}.
+ * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)} method is called outside the tablets'
+ * lock. This gives the strategy the ability to read information that maybe expensive to fetch. Once the gatherInformation returns, the tablet lock is grabbed
+ * and the compactionPlan computed. This should *not* do expensive operations, especially not I/O. Note that the number of files may change between calls to
+ * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)}.
* <p>
- * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)} call is going to be different from the one used in the compaction thread.
+ * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)} call is going to be different from the one used in the
+ * compaction thread.
*/
public abstract class CompactionStrategy {
-
+
/**
- * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
+ * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
*
* @param options
*/
public void init(Map<String,String> options) {}
-
+
/**
* Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and
* {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
* {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
*
- * @param request
- * @return
- * @throws IOException
*/
public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
-
+
/**
- * Called prior to obtaining the tablet lock, useful for examining metadata or indexes.
- * State collected during this method will be available during the call the {@link #getCompactionPlan(MajorCompactionRequest)}.
+ * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call
+ * the {@link #getCompactionPlan(MajorCompactionRequest)}.
*
* @param request
* basic details about the tablet
* @throws IOException
*/
public void gatherInformation(MajorCompactionRequest request) throws IOException {}
-
+
/**
* Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking.
*
@@ -69,5 +67,5 @@ public abstract class CompactionStrategy {
* @throws IOException
*/
abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException;
-
+
}
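
[Editor's note: the javadoc above describes the CompactionStrategy lifecycle (shouldCompact under the tablet lock, gatherInformation outside it, getCompactionPlan back under the lock). Below is a minimal, purely illustrative sketch of a subclass following that contract; it is not part of this commit. The getFiles() accessor on MajorCompactionRequest and the inputFiles list on CompactionPlan are assumptions, as is the "threshold" option key.]

    import java.io.IOException;
    import java.util.Map;

    // Hypothetical strategy: compact whenever a tablet accumulates too many files.
    public class FileCountCompactionStrategy extends CompactionStrategy {

      private int threshold = 10; // default, overridden via init() options

      @Override
      public void init(Map<String,String> options) {
        // "threshold" is an assumed option name pulled from table.compaction.major.strategy.opts
        String v = options.get("threshold");
        if (v != null)
          threshold = Integer.parseInt(v);
      }

      @Override
      public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
        // cheap check: no I/O, just count the tablet's files (getFiles() assumed)
        return request.getFiles().size() >= threshold;
      }

      @Override
      public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
        // compact every file into one; inputFiles field on CompactionPlan assumed
        CompactionPlan plan = new CompactionPlan();
        plan.inputFiles.addAll(request.getFiles().keySet());
        return plan;
      }
    }
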
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
index 29e1d23..7873df4 100644
--- a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -62,7 +62,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -146,7 +145,7 @@ public class TableOperationsIT {
assertEquals(tableName, diskUsages.get(0).getTables().first());
String newTable = makeTableName();
-
+
// clone table
connector.tableOperations().clone(tableName, newTable, false, null, null);
@@ -182,36 +181,36 @@ public class TableOperationsIT {
assertEquals(DefaultKeySizeConstraint.class.getName(), props.get(Property.TABLE_CONSTRAINT_PREFIX.toString() + "1"));
connector.tableOperations().delete(tableName);
}
-
+
@Test(timeout = 30 * 1000)
public void createMergeClonedTable() throws Exception {
String originalTable = makeTableName();
TableOperations tops = connector.tableOperations();
-
+
TreeSet<Text> splits = Sets.newTreeSet(Arrays.asList(new Text("a"), new Text("b"), new Text("c"), new Text("d")));
-
+
tops.create(originalTable);
tops.addSplits(originalTable, splits);
-
+
BatchWriter bw = connector.createBatchWriter(originalTable, new BatchWriterConfig());
for (Text row : splits) {
Mutation m = new Mutation(row);
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
- m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i+j));
+ m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i + j));
}
}
-
+
bw.addMutation(m);
}
-
+
bw.close();
-
+
String clonedTable = makeTableName();
-
+
tops.clone(originalTable, clonedTable, true, null, null);
tops.merge(clonedTable, null, new Text("b"));
-
+
Map<String,Integer> rowCounts = Maps.newHashMap();
Scanner s = connector.createScanner(clonedTable, new Authorizations());
for (Entry<Key,Value> entry : s) {
@@ -219,22 +218,22 @@ public class TableOperationsIT {
String row = key.getRow().toString();
String cf = key.getColumnFamily().toString(), cq = key.getColumnQualifier().toString();
String value = entry.getValue().toString();
-
+
if (rowCounts.containsKey(row)) {
rowCounts.put(row, rowCounts.get(row) + 1);
} else {
rowCounts.put(row, 1);
}
-
+
Assert.assertEquals(Integer.parseInt(cf) + Integer.parseInt(cq), Integer.parseInt(value));
}
-
+
Collection<Text> clonedSplits = tops.listSplits(clonedTable);
Set<Text> expectedSplits = Sets.newHashSet(new Text("b"), new Text("c"), new Text("d"));
for (Text clonedSplit : clonedSplits) {
Assert.assertTrue("Encountered unexpected split on the cloned table: " + clonedSplit, expectedSplits.remove(clonedSplit));
}
-
+
Assert.assertTrue("Did not find all expected splits on the cloned table: " + expectedSplits, expectedSplits.isEmpty());
}
[2/4] ACCUMULO-391 Use more accurate "InputTableConfig" term
Posted by ct...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index a4009ad..bd31c15 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
-import java.lang.reflect.Field;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -217,8 +216,6 @@ import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -236,21 +233,21 @@ enum ScanRunState {
public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
private static final Logger log = Logger.getLogger(TabletServer.class);
-
+
private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
private static long lastMemorySize = 0;
private static long gcTimeIncreasedCount;
-
+
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
-
+
private TabletServerLogger logger;
-
+
protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
-
+
private ServerConfiguration serverConfig;
private LogSorter logSorter = null;
-
+
public TabletServer(ServerConfiguration conf, VolumeManager fs) {
super();
this.serverConfig = conf;
@@ -272,36 +269,36 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}, 5000, 5000);
}
-
+
private synchronized static void logGCInfo(AccumuloConfiguration conf) {
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
Runtime rt = Runtime.getRuntime();
-
+
StringBuilder sb = new StringBuilder("gc");
-
+
boolean sawChange = false;
-
+
long maxIncreaseInCollectionTime = 0;
-
+
for (GarbageCollectorMXBean gcBean : gcmBeans) {
Long prevTime = prevGcTime.get(gcBean.getName());
long pt = 0;
if (prevTime != null) {
pt = prevTime;
}
-
+
long time = gcBean.getCollectionTime();
-
+
if (time - pt != 0) {
sawChange = true;
}
-
+
long increaseInCollectionTime = time - pt;
sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
prevGcTime.put(gcBean.getName(), time);
}
-
+
long mem = rt.freeMemory();
if (maxIncreaseInCollectionTime == 0) {
gcTimeIncreasedCount = 0;
@@ -312,90 +309,90 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
gcTimeIncreasedCount = 0;
}
}
-
+
if (mem > lastMemorySize) {
sawChange = true;
}
-
+
String sign = "+";
if (mem - lastMemorySize <= 0) {
sign = "";
}
-
+
sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
-
+
if (sawChange) {
log.debug(sb.toString());
}
-
+
final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
if (maxIncreaseInCollectionTime > keepAliveTimeout) {
Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
}
-
+
lastMemorySize = mem;
}
-
+
private TabletStatsKeeper statsKeeper;
-
+
private static class Session {
long lastAccessTime;
long startTime;
String user;
String client = TServerUtils.clientAddress.get();
public boolean reserved;
-
+
public void cleanup() {}
}
-
+
private static class SessionManager {
-
+
SecureRandom random;
Map<Long,Session> sessions;
long maxIdle;
-
+
SessionManager(AccumuloConfiguration conf) {
random = new SecureRandom();
sessions = new HashMap<Long,Session>();
-
+
maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
+
Runnable r = new Runnable() {
@Override
public void run() {
sweep(maxIdle);
}
};
-
+
SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
}
-
+
synchronized long createSession(Session session, boolean reserve) {
long sid = random.nextLong();
-
+
while (sessions.containsKey(sid)) {
sid = random.nextLong();
}
-
+
sessions.put(sid, session);
-
+
session.reserved = reserve;
-
+
session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
+
return sid;
}
-
+
long getMaxIdleTime() {
return maxIdle;
}
-
+
/**
* while a session is reserved, it cannot be canceled or removed
*
* @param sessionId
*/
-
+
synchronized Session reserveSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -403,11 +400,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new IllegalStateException();
session.reserved = true;
}
-
+
return session;
-
+
}
-
+
synchronized Session reserveSession(long sessionId, boolean wait) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -418,16 +415,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException();
}
}
-
+
if (session.reserved)
throw new IllegalStateException();
session.reserved = true;
}
-
+
return session;
-
+
}
-
+
synchronized void unreserveSession(Session session) {
if (!session.reserved)
throw new IllegalStateException();
@@ -435,24 +432,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
session.reserved = false;
session.lastAccessTime = System.currentTimeMillis();
}
-
+
synchronized void unreserveSession(long sessionId) {
Session session = getSession(sessionId);
if (session != null)
unreserveSession(session);
}
-
+
synchronized Session getSession(long sessionId) {
Session session = sessions.get(sessionId);
if (session != null)
session.lastAccessTime = System.currentTimeMillis();
return session;
}
-
+
Session removeSession(long sessionId) {
return removeSession(sessionId, false);
}
-
+
Session removeSession(long sessionId, boolean unreserve) {
Session session = null;
synchronized (this) {
@@ -460,14 +457,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (unreserve && session != null)
unreserveSession(session);
}
-
+
// do clean up out side of lock..
if (session != null)
session.cleanup();
-
+
return session;
}
-
+
private void sweep(long maxIdle) {
ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
synchronized (this) {
@@ -481,13 +478,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
// do clean up outside of lock
for (Session session : sessionsToCleanup) {
session.cleanup();
}
}
-
+
synchronized void removeIfNotAccessed(final long sessionId, long delay) {
Session session = sessions.get(sessionId);
if (session != null) {
@@ -503,26 +500,26 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionToCleanup = session2;
}
}
-
+
// call clean up outside of lock
if (sessionToCleanup != null)
sessionToCleanup.cleanup();
}
};
-
+
SimpleTimer.getInstance().schedule(r, delay);
}
}
-
+
public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
for (Entry<Long,Session> entry : sessions.entrySet()) {
-
+
Session session = entry.getValue();
@SuppressWarnings("rawtypes")
ScanTask nbt = null;
String tableID = null;
-
+
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
nbt = ss.nextBatchTask;
@@ -532,40 +529,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
nbt = mss.lookupTask;
tableID = mss.threadPoolExtent.getTableId().toString();
}
-
+
if (nbt == null)
continue;
-
+
ScanRunState srs = nbt.getScanRunState();
-
+
if (nbt == null || srs == ScanRunState.FINISHED)
continue;
-
+
MapCounter<ScanRunState> stateCounts = counts.get(tableID);
if (stateCounts == null) {
stateCounts = new MapCounter<ScanRunState>();
counts.put(tableID, stateCounts);
}
-
+
stateCounts.increment(srs, 1);
}
-
+
return counts;
}
-
+
public synchronized List<ActiveScan> getActiveScans() {
-
+
ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
+
long ct = System.currentTimeMillis();
-
+
for (Entry<Long,Session> entry : sessions.entrySet()) {
Session session = entry.getValue();
if (session instanceof ScanSession) {
ScanSession ss = (ScanSession) session;
-
+
ScanState state = ScanState.RUNNING;
-
+
ScanTask<ScanBatch> nbt = ss.nextBatchTask;
if (nbt == null) {
state = ScanState.IDLE;
@@ -583,15 +580,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
break;
}
}
-
+
activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
-
+
} else if (session instanceof MultiScanSession) {
MultiScanSession mss = (MultiScanSession) session;
-
+
ScanState state = ScanState.RUNNING;
-
+
ScanTask<MultiScanResult> nbt = mss.lookupTask;
if (nbt == null) {
state = ScanState.IDLE;
@@ -609,43 +606,43 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
break;
}
}
-
+
activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
.getAuthorizationsBB()));
}
}
-
+
return activeScans;
}
}
-
+
static class TservConstraintEnv implements Environment {
-
+
private TCredentials credentials;
private SecurityOperation security;
private Authorizations auths;
private KeyExtent ke;
-
+
TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
this.security = secOp;
this.credentials = credentials;
}
-
+
void setExtent(KeyExtent ke) {
this.ke = ke;
}
-
+
@Override
public KeyExtent getExtent() {
return ke;
}
-
+
@Override
public String getUser() {
return credentials.getPrincipal();
}
-
+
@Override
public Authorizations getAuthorizations() {
if (auths == null)
@@ -656,119 +653,119 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return auths;
}
-
+
}
-
+
private abstract class ScanTask<T> implements RunnableFuture<T> {
-
+
protected AtomicBoolean interruptFlag;
protected ArrayBlockingQueue<Object> resultQueue;
protected AtomicInteger state;
protected AtomicReference<ScanRunState> runState;
-
+
private static final int INITIAL = 1;
private static final int ADDED = 2;
private static final int CANCELED = 3;
-
+
ScanTask() {
interruptFlag = new AtomicBoolean(false);
runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
state = new AtomicInteger(INITIAL);
resultQueue = new ArrayBlockingQueue<Object>(1);
}
-
+
protected void addResult(Object o) {
if (state.compareAndSet(INITIAL, ADDED))
resultQueue.add(o);
else if (state.get() == ADDED)
throw new IllegalStateException("Tried to add more than one result");
}
-
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!mayInterruptIfRunning)
throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-
+
if (state.get() == CANCELED)
return true;
-
+
if (state.compareAndSet(INITIAL, CANCELED)) {
interruptFlag.set(true);
resultQueue = null;
return true;
}
-
+
return false;
}
-
+
@Override
public T get() throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
-
+
@SuppressWarnings("unchecked")
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
+
ArrayBlockingQueue<Object> localRQ = resultQueue;
-
+
if (state.get() == CANCELED)
throw new CancellationException();
-
+
if (localRQ == null && state.get() == ADDED)
throw new IllegalStateException("Tried to get result twice");
-
+
Object r = localRQ.poll(timeout, unit);
-
+
// could have been canceled while waiting
if (state.get() == CANCELED) {
if (r != null)
throw new IllegalStateException("Nothing should have been added when in canceled state");
-
+
throw new CancellationException();
}
-
+
if (r == null)
throw new TimeoutException();
-
+
// make this method stop working now that something is being
// returned
resultQueue = null;
-
+
if (r instanceof Throwable)
throw new ExecutionException((Throwable) r);
-
+
return (T) r;
}
-
+
@Override
public boolean isCancelled() {
return state.get() == CANCELED;
}
-
+
@Override
public boolean isDone() {
return runState.get().equals(ScanRunState.FINISHED);
}
-
+
public ScanRunState getScanRunState() {
return runState.get();
}
-
+
}
-
+
private static class ConditionalSession extends Session {
public TCredentials credentials;
public Authorizations auths;
public String tableId;
public AtomicBoolean interruptFlag;
-
+
@Override
public void cleanup() {
interruptFlag.set(true);
}
}
-
+
private static class UpdateSession extends Session {
public Tablet currentTablet;
public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
@@ -786,7 +783,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public long queuedMutationSize = 0;
TservConstraintEnv cenv = null;
}
-
+
private static class ScanSession extends Session {
public KeyExtent extent;
public HashSet<Column> columnSet;
@@ -800,7 +797,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
public AtomicBoolean interruptFlag;
public Scanner scanner;
public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
-
+
@Override
public void cleanup() {
try {
@@ -811,32 +808,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
scanner.close();
}
}
-
+
}
-
+
private static class MultiScanSession extends Session {
HashSet<Column> columnSet;
Map<KeyExtent,List<Range>> queries;
public List<IterInfo> ssiList;
public Map<String,Map<String,String>> ssio;
public Authorizations auths;
-
+
// stats
int numRanges;
int numTablets;
int numEntries;
long totalLookupTime;
-
+
public volatile ScanTask<MultiScanResult> lookupTask;
public KeyExtent threadPoolExtent;
-
+
@Override
public void cleanup() {
if (lookupTask != null)
lookupTask.cancel(true);
}
}
-
+
/**
* This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
* are monotonically increasing.
@@ -845,38 +842,38 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
static class WriteTracker {
private static AtomicLong operationCounter = new AtomicLong(1);
private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
+
WriteTracker() {
for (TabletType ttype : TabletType.values()) {
inProgressWrites.put(ttype, new TreeSet<Long>());
}
}
-
+
synchronized long startWrite(TabletType ttype) {
long operationId = operationCounter.getAndIncrement();
inProgressWrites.get(ttype).add(operationId);
return operationId;
}
-
+
synchronized void finishWrite(long operationId) {
if (operationId == -1)
return;
-
+
boolean removed = false;
-
+
for (TabletType ttype : TabletType.values()) {
removed = inProgressWrites.get(ttype).remove(operationId);
if (removed)
break;
}
-
+
if (!removed) {
throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
}
-
+
this.notifyAll();
}
-
+
synchronized void waitForWrites(TabletType ttype) {
long operationId = operationCounter.getAndIncrement();
while (inProgressWrites.get(ttype).floor(operationId) != null) {
@@ -887,40 +884,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
public long startWrite(Set<Tablet> keySet) {
if (keySet.size() == 0)
return -1;
-
+
ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
+
for (Tablet tablet : keySet)
extents.add(tablet.getExtent());
-
+
return startWrite(TabletType.type(extents));
}
}
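
[Editor's note: the WriteTracker comment above describes waiting for writes that started before a read, keyed by monotonically increasing operation ids. A standalone sketch of the same pattern, independent of the Accumulo classes, is shown here for illustration; it is not part of the diff.]

    import java.util.TreeSet;
    import java.util.concurrent.atomic.AtomicLong;

    class SimpleWriteTracker {
      private final AtomicLong operationCounter = new AtomicLong(1);
      private final TreeSet<Long> inProgress = new TreeSet<Long>();

      synchronized long startWrite() {
        long id = operationCounter.getAndIncrement();
        inProgress.add(id);
        return id;
      }

      synchronized void finishWrite(long id) {
        inProgress.remove(id);
        notifyAll(); // wake readers blocked in waitForWrites()
      }

      synchronized void waitForWrites() throws InterruptedException {
        long id = operationCounter.getAndIncrement();
        // floor(id) is the largest in-progress id <= id; null means every write
        // that started before this call has finished
        while (inProgress.floor(id) != null) {
          wait();
        }
      }
    }
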
-
+
public AccumuloConfiguration getSystemConfiguration() {
return serverConfig.getConfiguration();
}
-
+
TransactionWatcher watcher = new TransactionWatcher();
-
+
private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-
+
SessionManager sessionManager;
-
+
AccumuloConfiguration acuConf = getSystemConfiguration();
-
+
TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
-
+
TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
-
+
WriteTracker writeTracker = new WriteTracker();
-
+
private RowLocks rowLocks = new RowLocks();
-
+
ThriftClientHandler() {
super(instance, watcher);
log.debug(ThriftClientHandler.class.getName() + " created");
@@ -933,16 +930,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error("Exception registering MBean with MBean Server", e);
}
}
-
+
@Override
public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
throws ThriftSecurityException {
-
+
if (!security.canPerformSystemActions(credentials))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-
+
for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
TKeyExtent tke = entry.getKey();
Map<String,MapFileInfo> fileMap = entry.getValue();
@@ -953,9 +950,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
path = ns.makeQualified(path);
fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
}
-
+
Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-
+
if (importTablet == null) {
failures.add(tke);
} else {
@@ -969,46 +966,46 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return failures;
}
-
+
private class NextBatchTask extends ScanTask<ScanBatch> {
-
+
private long scanID;
-
+
NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
this.scanID = scanID;
this.interruptFlag = interruptFlag;
-
+
if (interruptFlag.get())
cancel(true);
}
-
+
@Override
public void run() {
-
+
final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
if (isCancelled() || scanSession == null)
return;
-
+
runState.set(ScanRunState.RUNNING);
-
+
Thread.currentThread().setName(
"User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
-
+
Tablet tablet = onlineTablets.get(scanSession.extent);
-
+
if (tablet == null) {
addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
return;
}
-
+
long t1 = System.currentTimeMillis();
ScanBatch batch = scanSession.scanner.read();
long t2 = System.currentTimeMillis();
scanSession.nbTimes.addStat(t2 - t1);
-
+
// there should only be one thing on the queue at a time, so
// it should be ok to call add()
// instead of put()... if add() fails because queue is at
@@ -1031,53 +1028,53 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
runState.set(ScanRunState.FINISHED);
Thread.currentThread().setName(oldThreadName);
}
-
+
}
}
-
+
private class LookupTask extends ScanTask<MultiScanResult> {
-
+
private long scanID;
-
+
LookupTask(long scanID) {
this.scanID = scanID;
}
-
+
@Override
public void run() {
MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
-
+
try {
if (isCancelled() || session == null)
return;
-
+
TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-
+
runState.set(ScanRunState.RUNNING);
Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
-
+
long bytesAdded = 0;
long maxScanTime = 4000;
-
+
long startTime = System.currentTimeMillis();
-
+
ArrayList<KVEntry> results = new ArrayList<KVEntry>();
Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
KeyExtent partScan = null;
Key partNextKey = null;
boolean partNextKeyInclusive = false;
-
+
Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
-
+
// check the time so that the read ahead thread is not monopolized
while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
Entry<KeyExtent,List<Range>> entry = iter.next();
-
+
iter.remove();
-
+
// check that tablet server is serving requested tablet
Tablet tablet = onlineTablets.get(entry.getKey());
if (tablet == null) {
@@ -1086,32 +1083,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
Thread.currentThread().setName(
"Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
-
+
LookupResult lookupResult;
try {
-
+
// do the following check to avoid a race condition
// between setting false below and the task being
// canceled
if (isCancelled())
interruptFlag.set(true);
-
+
lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
session.ssio, interruptFlag);
-
+
// if the tablet was closed it it possible that the
// interrupt flag was set.... do not want it set for
// the next
// lookup
interruptFlag.set(false);
-
+
} catch (IOException e) {
log.warn("lookup failed for tablet " + entry.getKey(), e);
throw new RuntimeException(e);
}
-
+
bytesAdded += lookupResult.bytesAdded;
-
+
if (lookupResult.unfinishedRanges.size() > 0) {
if (lookupResult.closed) {
failures.put(entry.getKey(), lookupResult.unfinishedRanges);
@@ -1125,11 +1122,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
fullScans.add(entry.getKey());
}
}
-
+
long finishTime = System.currentTimeMillis();
session.totalLookupTime += (finishTime - startTime);
session.numEntries += results.size();
-
+
// convert everything to thrift before adding result
List<TKeyValue> retResults = new ArrayList<TKeyValue>();
for (KVEntry entry : results)
@@ -1158,7 +1155,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
@@ -1167,14 +1164,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
Authorizations userauths = null;
if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
userauths = security.getUserAuthorizations(credentials);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-
+
KeyExtent extent = new KeyExtent(textent);
-
+
// wait for any writes that are in flight.. this done to ensure
// consistency across client restarts... assume a client writes
// to accumulo and dies while waiting for a confirmation from
@@ -1187,11 +1184,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// !METADATA table
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(extent));
-
+
Tablet tablet = onlineTablets.get(extent);
if (tablet == null)
throw new NotServingTabletException(textent);
-
+
ScanSession scanSession = new ScanSession();
scanSession.user = credentials.getPrincipal();
scanSession.extent = new KeyExtent(extent);
@@ -1201,16 +1198,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
scanSession.auths = new Authorizations(authorizations);
scanSession.interruptFlag = new AtomicBoolean();
scanSession.readaheadThreshold = readaheadThreshold;
-
+
for (TColumn tcolumn : columns) {
scanSession.columnSet.add(new Column(tcolumn));
}
-
+
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
scanSession.interruptFlag);
-
+
long sid = sessionManager.createSession(scanSession, true);
-
+
ScanResult scanResult;
try {
scanResult = continueScan(tinfo, sid, scanSession);
@@ -1220,10 +1217,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
sessionManager.unreserveSession(sid);
}
-
+
return new InitialScan(sid, scanResult);
}
-
+
@Override
public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
@@ -1231,22 +1228,22 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (scanSession == null) {
throw new NoSuchScanIDException();
}
-
+
try {
return continueScan(tinfo, scanID, scanSession);
} finally {
sessionManager.unreserveSession(scanSession);
}
}
-
+
private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-
+
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
-
+
ScanBatch bresult;
try {
bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
@@ -1276,32 +1273,32 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Failed to get next batch", t);
throw new RuntimeException(t);
}
-
+
ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
-
+
scanSession.entriesReturned += scanResult.results.size();
-
+
scanSession.batchCount++;
-
+
if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
// start reading next batch while current batch is transmitted
// to client
scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
-
+
if (!scanResult.more)
closeScan(tinfo, scanID);
-
+
return scanResult;
}
-
+
@Override
public void closeScan(TInfo tinfo, long scanID) {
ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
-
+
log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId()
.toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
if (scanMetrics.isEnabled()) {
@@ -1310,7 +1307,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
@@ -1319,29 +1316,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
for (TKeyExtent keyExtent : tbatch.keySet()) {
tables.add(new String(keyExtent.getTable()));
}
-
+
if (tables.size() != 1)
throw new IllegalArgumentException("Cannot batch scan over multiple tables");
-
+
// check if user has permission to the tables
Authorizations userauths = null;
for (String table : tables)
if (!security.canScan(credentials, table, tbatch, tcolumns, ssiList, ssio, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
userauths = security.getUserAuthorizations(credentials);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
- Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(), new Translator.ListTranslator<TRange,Range>(new TRangeTranslator()));
-
+ Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(), new Translator.ListTranslator<TRange,Range>(
+ new TRangeTranslator()));
+
// This is used to determine which thread pool to use
KeyExtent threadPoolExtent = batch.keySet().iterator().next();
-
+
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
-
+
MultiScanSession mss = new MultiScanSession();
mss.user = credentials.getPrincipal();
mss.queries = batch;
@@ -1349,19 +1347,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
mss.ssiList = ssiList;
mss.ssio = ssio;
mss.auths = new Authorizations(authorizations);
-
+
mss.numTablets = batch.size();
for (List<Range> ranges : batch.values()) {
mss.numRanges += ranges.size();
}
-
+
for (TColumn tcolumn : tcolumns)
mss.columnSet.add(new Column(tcolumn));
-
+
mss.threadPoolExtent = threadPoolExtent;
-
+
long sid = sessionManager.createSession(mss, true);
-
+
MultiScanResult result;
try {
result = continueMultiScan(tinfo, sid, mss);
@@ -1371,33 +1369,33 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
sessionManager.unreserveSession(sid);
}
-
+
return new InitialMultiScan(sid, result);
}
-
+
@Override
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
-
+
MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
-
+
if (session == null) {
throw new NoSuchScanIDException();
}
-
+
try {
return continueMultiScan(tinfo, scanID, session);
} finally {
sessionManager.unreserveSession(session);
}
}
-
+
private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
-
+
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(scanID);
resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
}
-
+
try {
MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
session.lookupTask = null;
@@ -1415,37 +1413,37 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
throw new RuntimeException(t);
}
}
-
+
@Override
public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
}
-
+
long t2 = System.currentTimeMillis();
log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
}
-
+
@Override
public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
// Make sure user is real
-
+
security.authenticateUser(credentials, credentials);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
-
+
UpdateSession us = new UpdateSession();
us.violations = new Violations();
us.credentials = credentials;
us.cenv = new TservConstraintEnv(security, us.credentials);
-
+
long sid = sessionManager.createSession(us, false);
-
+
return sid;
}
-
+
private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
long t1 = System.currentTimeMillis();
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
@@ -1454,7 +1452,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// if there were previous failures, then do not accept additional writes
return;
}
-
+
try {
// if user has no permission to write to this table, add it to
// the failures list
@@ -1493,18 +1491,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
return;
}
}
-
+
@Override
public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
if (us == null) {
throw new RuntimeException("No Such SessionID");
}
-
+
try {
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
setUpdateTablet(us, keyExtent);
-
+
if (us.currentTablet != null) {
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
@@ -1519,34 +1517,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionManager.unreserveSession(us);
}
}
-
+
private void flush(UpdateSession us) {
-
+
int mutationCount = 0;
Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
Throwable error = null;
-
+
long pt1 = System.currentTimeMillis();
-
+
boolean containsMetadataTablet = false;
for (Tablet tablet : us.queuedMutations.keySet())
if (tablet.getExtent().isMeta())
containsMetadataTablet = true;
-
+
if (!containsMetadataTablet && us.queuedMutations.size() > 0)
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
Span prep = Trace.start("prep");
try {
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
-
+
Tablet tablet = entry.getKey();
List<Mutation> mutations = entry.getValue();
if (mutations.size() > 0) {
try {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
-
+
CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
if (commitSession == null) {
if (us.currentTablet == tablet) {
@@ -1557,12 +1555,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sendables.put(commitSession, mutations);
mutationCount += mutations.size();
}
-
+
} catch (TConstraintViolationException e) {
us.violations.add(e.getViolations());
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
-
+
if (e.getNonViolators().size() > 0) {
// only log and commit mutations if there were some
// that did not
@@ -1571,9 +1569,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// expects
sendables.put(e.getCommitSession(), e.getNonViolators());
}
-
+
mutationCount += mutations.size();
-
+
} catch (HoldTimeoutException t) {
error = t;
log.debug("Giving up on mutations due to a long memory hold time");
@@ -1588,11 +1586,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
prep.stop();
}
-
+
long pt2 = System.currentTimeMillis();
us.prepareTimes.addStat(pt2 - pt1);
updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
-
+
if (error != null) {
for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
e.getKey().abortCommit(e.getValue());
@@ -1605,9 +1603,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
while (true) {
try {
long t1 = System.currentTimeMillis();
-
+
logger.logManyTablets(sendables);
-
+
long t2 = System.currentTimeMillis();
us.walogTimes.addStat(t2 - t1);
updateWalogWriteTime((t2 - t1));
@@ -1624,18 +1622,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
wal.stop();
}
-
+
Span commit = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue();
-
+
commitSession.commit(mutations);
-
+
Tablet tablet = commitSession.getTablet();
-
+
if (tablet == us.currentTablet) {
// because constraint violations may filter out some
// mutations, for proper
@@ -1650,7 +1648,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
us.flushTime += (t2 - pt1);
us.commitTimes.addStat(t2 - t1);
-
+
updateAvgCommitTime(t2 - t1, sendables.size());
} finally {
commit.stop();
@@ -1669,7 +1667,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time);
}
-
+
private void updateAvgCommitTime(long time, int size) {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) ((time) / (double) size));
@@ -1679,25 +1677,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) ((time) / (double) size));
}
-
+
@Override
public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
if (us == null) {
throw new NoSuchScanIDException();
}
-
+
// clients may or may not see data from an update session while
// it is in progress; however, when the update session is closed
// we want to ensure that reads wait for the write to finish
long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
+
try {
flush(us);
} finally {
writeTracker.finishWrite(opid);
}
-
+
log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
(System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
@@ -1714,15 +1712,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
KeyExtent first = us.authFailures.keySet().iterator().next();
log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
}
-
+
return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
us.authFailures, Translator.KET));
}
-
+
@Override
public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
ConstraintViolationException, ThriftSecurityException {
-
+
if (!security.canWrite(credentials, new String(tkeyExtent.getTable())))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
@@ -1730,16 +1728,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
if (!keyExtent.isMeta())
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-
+
long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-
+
try {
Mutation mutation = new ServerMutation(tmutation);
List<Mutation> mutations = Collections.singletonList(mutation);
-
+
Span prep = Trace.start("prep");
CommitSession cs;
try {
@@ -1750,7 +1748,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (cs == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
while (true) {
try {
Span wal = Trace.start("wal");
@@ -1764,7 +1762,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn(ex, ex);
}
}
-
+
Span commit = Trace.start("commit");
try {
cs.commit(mutations);
@@ -1777,69 +1775,69 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
writeTracker.finishWrite(opid);
}
}
-
+
private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
List<String> symbols) throws IOException {
Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
-
+
CompressedIterators compressedIters = new CompressedIterators(symbols);
-
+
while (iter.hasNext()) {
Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
Tablet tablet = onlineTablets.get(entry.getKey());
-
+
if (tablet == null || tablet.isClosed()) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
iter.remove();
} else {
List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
-
+
for (ServerConditionalMutation scm : entry.getValue()) {
if (checkCondition(results, cs, compressedIters, tablet, scm))
okMutations.add(scm);
}
-
+
entry.setValue(okMutations);
}
-
+
}
}
-
+
boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
ServerConditionalMutation scm) throws IOException {
boolean add = true;
-
+
Set<Column> emptyCols = Collections.emptySet();
-
+
for (TCondition tc : scm.getConditions()) {
-
+
Range range;
if (tc.hasTimestamp)
range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
else
range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
-
+
IterConfig ic = compressedIters.decompress(tc.iterators);
-
+
Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
-
+
try {
ScanBatch batch = scanner.read();
-
+
Value val = null;
-
+
for (KVEntry entry2 : batch.results) {
val = entry2.getValue();
break;
}
-
+
if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
add = false;
break;
}
-
+
} catch (TabletClosedException e) {
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
add = false;
@@ -1856,14 +1854,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return add;
}
-
+
private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
-
+
Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
-
+
boolean sessionCanceled = sess.interruptFlag.get();
-
+
Span prepSpan = Trace.start("prep");
try {
long t1 = System.currentTimeMillis();
@@ -1874,13 +1872,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
} else {
try {
-
+
@SuppressWarnings("unchecked")
List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
if (mutations.size() > 0) {
-
+
CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
-
+
if (cs == null) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
@@ -1896,19 +1894,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
for (Mutation m : e.getNonViolators())
results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
}
-
+
for (Mutation m : e.getViolators())
results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
}
}
}
-
+
long t2 = System.currentTimeMillis();
updateAvgPrepTime(t2 - t1, es.size());
} finally {
prepSpan.stop();
}
-
+
Span walSpan = Trace.start("wal");
try {
while (true && sendables.size() > 0) {
@@ -1930,14 +1928,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
walSpan.stop();
}
-
+
Span commitSpan = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue();
-
+
commitSession.commit(mutations);
}
long t2 = System.currentTimeMillis();
@@ -1945,19 +1943,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
commitSpan.stop();
}
-
+
}
-
+
private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
ArrayList<TCMResult> results, List<String> symbols) throws IOException {
// sort each list of mutations; this is done to avoid deadlock, and doing seeks in order is more efficient and detects duplicate rows.
ConditionalMutationSet.sortConditionalMutations(updates);
-
+
Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
-
+
// can not process two mutations for the same row, because one will not see what the other writes
ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
-
+
// get as many locks as possible w/o blocking... defer any rows that are locked
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
try {
@@ -1967,7 +1965,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} finally {
checkSpan.stop();
}
-
+
Span updateSpan = Trace.start("apply conditional mutations");
try {
writeConditionalMutations(updates, results, cs);
@@ -1979,61 +1977,61 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return deferred;
}
-
+
@Override
public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
throws ThriftSecurityException, TException {
-
+
Authorizations userauths = null;
if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
userauths = security.getUserAuthorizations(credentials);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-
+
ConditionalSession cs = new ConditionalSession();
cs.auths = new Authorizations(authorizations);
cs.credentials = credentials;
cs.tableId = tableID;
cs.interruptFlag = new AtomicBoolean();
-
+
long sid = sessionManager.createSession(cs, false);
return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
}
-
+
@Override
public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
throws NoSuchScanIDException, TException {
-
+
ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
-
+
if (cs == null || cs.interruptFlag.get())
throw new NoSuchScanIDException();
-
+
if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID))
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
Text tid = new Text(cs.tableId);
long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
-
+
try {
Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
-
+
for (KeyExtent ke : updates.keySet())
if (!ke.getTableId().equals(tid))
throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId());
-
+
ArrayList<TCMResult> results = new ArrayList<TCMResult>();
-
+
Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
-
+
while (deferred.size() > 0) {
deferred = conditionalUpdate(cs, deferred, results, symbols);
}
-
+
return results;
} catch (IOException ioe) {
throw new TException(ioe);
@@ -2042,41 +2040,41 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
sessionManager.unreserveSession(sessID);
}
}
-
+
@Override
public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
// this method should wait for any running conditional update to complete
// after this method returns a conditional update should not be able to start
-
+
ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
if (cs != null)
cs.interruptFlag.set(true);
-
+
cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
if (cs != null)
sessionManager.removeSession(sessID, true);
}
-
+
@Override
public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
sessionManager.removeSession(sessID, false);
}
-
+
@Override
public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
ThriftSecurityException {
-
+
String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
if (!security.canSplitTablet(credentials, tableId))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-
+
Tablet tablet = onlineTablets.get(keyExtent);
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
-
+
if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
try {
if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
@@ -2088,12 +2086,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
return getStats(sessionManager.getActiveScansPerTable());
}
-
+
@Override
public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
@@ -2118,9 +2116,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return result;
}
-
+
private ZooCache masterLockCache = new ZooCache();
-
+
private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException {
boolean fatal = false;
try {
@@ -2146,12 +2144,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
});
}
}
-
+
if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
log.warn("Got " + request + " message from master before lock acquired, ignoring...");
throw new RuntimeException("Lock not acquired");
}
-
+
if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
Halt.halt(1, new Runnable() {
@Override
@@ -2161,10 +2159,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
});
}
-
+
if (lock != null) {
ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
-
+
try {
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
// maybe the cache is out of date and a new master holds the
@@ -2180,38 +2178,38 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
-
+
try {
checkPermission(credentials, lock, "loadTablet");
} catch (ThriftSecurityException e) {
log.error(e, e);
throw new RuntimeException(e);
}
-
+
final KeyExtent extent = new KeyExtent(textent);
-
+
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
-
+
// checking if this exact tablet is in any of the sets
// below is not a strong enough check
// when splits and fix splits are occurring
-
+
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
-
+
Set<KeyExtent> all = new HashSet<KeyExtent>();
all.addAll(unopenedOverlapping);
all.addAll(openingOverlapping);
all.addAll(onlineOverlapping);
-
+
if (!all.isEmpty()) {
-
+
// ignore any tablets that have recently split, for error logging
for (KeyExtent e2 : onlineOverlapping) {
Tablet tablet = onlineTablets.get(e2);
@@ -2219,25 +2217,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
all.remove(e2);
}
}
-
+
// ignore self, for error logging
all.remove(extent);
-
+
if (all.size() > 0) {
log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping
+ " " + all);
}
return;
}
-
+
unopenedTablets.add(extent);
}
}
}
-
+
// add the assignment job to the appropriate queue
log.info("Loading tablet " + extent);
-
+
final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
// Root tablet assignment must take place immediately
if (extent.isRootTablet()) {
@@ -2250,7 +2248,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} else {
log.info("Root tablet failed to load");
}
-
+
}
}.start();
} else {
@@ -2261,7 +2259,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) {
try {
@@ -2270,12 +2268,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent extent = new KeyExtent(textent);
-
+
resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save)));
}
-
+
@Override
public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) {
try {
@@ -2284,19 +2282,19 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
ArrayList<Tablet> tabletsToFlush = new ArrayList<Tablet>();
-
+
KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
-
+
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToFlush.add(tablet);
}
-
+
Long flushID = null;
-
+
for (Tablet tablet : tabletsToFlush) {
if (flushID == null) {
// read the flush id once from zookeeper instead of reading
@@ -2312,7 +2310,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
tablet.flush(flushID);
}
}
-
+
@Override
public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
try {
@@ -2321,7 +2319,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
Tablet tablet = onlineTablets.get(new KeyExtent(textent));
if (tablet != null) {
log.info("Flushing " + tablet.getExtent());
@@ -2332,12 +2330,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException {
-
+
checkPermission(credentials, lock, "halt");
-
+
Halt.halt(0, new Runnable() {
@Override
public void run() {
@@ -2352,7 +2350,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
});
}
-
+
@Override
public void fastHalt(TInfo info, TCredentials credentials, String lock) {
try {
@@ -2361,12 +2359,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.warn("Error halting", e);
}
}
-
+
@Override
public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
return statsKeeper.getTabletStats();
}
-
+
@Override
public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
try {
@@ -2375,10 +2373,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
return sessionManager.getActiveScans();
}
-
+
@Override
public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
try {
@@ -2387,15 +2385,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent ke = new KeyExtent(textent);
-
+
Tablet tablet = onlineTablets.get(ke);
if (tablet != null) {
tablet.chopFiles();
}
}
-
+
@Override
public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
try {
@@ -2404,18 +2402,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
-
+
ArrayList<Tablet> tabletsToCompact = new ArrayList<Tablet>();
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToCompact.add(tablet);
}
-
+
Long compactionId = null;
-
+
for (Tablet tablet : tabletsToCompact) {
// all for the same table id, so only need to read
// compaction id once
@@ -2428,9 +2426,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
tablet.compactAll(compactionId);
}
-
+
}
-
+
@Override
public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
String myname = getClientAddressString();
@@ -2455,7 +2453,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
try {
Path source = new Path(filename);
if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
@@ -2485,7 +2483,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
@Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
try {
@@ -2494,25 +2492,25 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error(e, e);
throw new RuntimeException(e);
}
-
+
List<CompactionInfo> compactions = Compactor.getRunningCompactions();
List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size());
-
+
for (CompactionInfo compactionInfo : compactions) {
ret.add(compactionInfo.toThrift());
}
-
+
return ret;
}
}
-
+
private class SplitRunner implements Runnable {
private Tablet tablet;
-
+
public SplitRunner(Tablet tablet) {
this.tablet = tablet;
}
-
+
@Override
public void run() {
if (majorCompactorDisabled) {
@@ -2520,21 +2518,21 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// initiated exit
return;
}
-
+
splitTablet(tablet);
}
}
-
+
boolean isMajorCompactionDisabled() {
return majorCompactorDisabled;
}
-
+
void executeSplit(Tablet tablet) {
resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet)));
}
-
+
private class MajorCompactor implements Runnable {
-
+
public MajorCompactor(AccumuloConfiguration config) {
CompactionWatcher.startWatching(config);
}
@@ -2544,40 +2542,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
while (!majorCompactorDisabled) {
try {
UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY));
-
+
TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<KeyExtent,Tablet>();
-
+
synchronized (onlineTablets) {
copyOnlineTablets.putAll(onlineTablets); // avoid
// concurrent
// modification
}
-
+
int numMajorCompactionsInProgress = 0;
-
+
Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
-
+
// bail early now if we're shutting down
while (iter.hasNext() && !majorCompactorDisabled) {
-
+
Entry<KeyExtent,Tablet> entry = iter.next();
-
+
Tablet tablet = entry.getValue();
-
+
// if we need to split AND compact, we need a good way
// to decide what to do
if (tablet.needsSplit()) {
executeSplit(tablet);
continue;
}
-
+
int maxLogEntriesPerTablet = getTableConfiguration(tablet.getExtent()).getCount(Property.TABLE_MINC_LOGS_MAX);
-
+
if (tablet.getLogCount() >= maxLogEntriesPerTablet) {
log.debug("Initiating minor compaction for " + tablet.getExtent() + " because it has " + tablet.getLogCount() + " write ahead logs");
tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM);
}
-
+
synchronized (tablet) {
if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.majorCompactionQueued() || tablet.majorCompactionRunning()) {
numMajorCompactionsInProgress++;
@@ -2585,18 +2583,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
int idleCompactionsToStart = Math.max(1, getSystemConfiguration().getCount(Property.TSERV_MAJC_MAXCONCURRENT) / 2);
-
+
if (numMajorCompactionsInProgress < idleCompactionsToStart) {
// system is not major compacting, can schedule some
// idle compactions
iter = copyOnlineTablets.entrySet().iterator();
-
+
while (iter.hasNext() && !majorCompactorDisabled && numMajorCompactionsInProgress < idleCompactionsToStart) {
Entry<KeyExtent,Tablet> entry = iter.next();
Tablet tablet = entry.getValue();
-
+
if (tablet.initiateMajorCompaction(MajorCompactionReason.IDLE)) {
numMajorCompactionsInProgress++;
}
@@ -2609,10 +2607,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
private void splitTablet(Tablet tablet) {
try {
-
+
TreeMap<KeyExtent,SplitInfo> tabletInfo = splitTablet(tablet, null);
if (tabletInfo == null) {
// either split or compact not both
@@ -2628,34 +2626,34 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.error("Unknown error on split: " + e, e);
}
}
-
+
private TreeMap<KeyExtent,SplitInfo> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException {
long t1 = System.currentTimeMillis();
-
+
TreeMap<KeyExtent,SplitInfo> tabletInfo = tablet.split(splitPoint);
if (tabletInfo == null) {
return null;
}
-
+
log.info("Starting split: " + tablet.getExtent());
statsKeeper.incrementStatusSplit();
long start = System.currentTimeMillis();
-
+
Tablet[] newTablets = new Tablet[2];
-
+
Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry();
newTablets[0] = new Tablet(TabletServer.this, new Text(first.getValue().dir), first.getKey(), resourceManager.createTabletResourceManager(),
first.getValue().datafiles, first.getValue().time, first.getValue().initFlushID, first.getValue().initCompactID);
-
+
Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry();
newTablets[1] = new Tablet(TabletServer.this, new Text(last.getValue().dir), last.getKey(), resourceManager.createTabletResourceManager(),
last.getValue().datafiles, last.getValue().time, last.getValue().initFlushID, last.getValue().initCompactID);
-
+
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMinorTimes(tablet.timer);
statsKeeper.saveMajorTimes(tablet.timer);
-
+
// lose the reference to the old tablet and open two new ones
synchronized (onlineTablets) {
onlineTablets.remove(tablet.getExtent());
@@ -2665,40 +2663,40 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
// tell the master
enqueueMasterMessage(new SplitReportMessage(tablet.getExtent(), newTablets[0].getExtent(), new Text("/" + newTablets[0].getLocation().getName()),
newTablets[1].getExtent(), new Text("/" + newTablets[1].getLocation().getName())));
-
+
statsKeeper.updateTime(Operation.SPLIT, start, 0, false);
long t2 = System.currentTimeMillis();
log.info("Tablet split: " + tablet.getExtent() + " size0 " + newTablets[0].estimateTabletSize() + " size1 " + newTablets[1].estimateTabletSize() + " time "
+ (t2 - t1) + "ms");
-
+
return tabletInfo;
}
-
+
public long lastPingTime = System.currentTimeMillis();
public Socket currentMaster;
-
+
// a queue to hold messages that are to be sent back to the master
private BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
-
+
// add a message for the main thread to send back to the master
void enqueueMasterMessage(MasterMessage m) {
masterMessages.addLast(m);
}
-
+
private class UnloadTabletHandler implements Runnable {
private KeyExtent extent;
private boolean saveState;
-
+
public UnloadTabletHandler(KeyExtent extent, boolean saveState) {
this.extent = extent;
this.saveState = saveState;
}
-
+
@Override
public void run() {
-
+
Tablet t = null;
-
+
synchronized (unopenedTablets) {
if (unopenedTablets.contains(extent)) {
unopenedTablets.remove(extent);
@@ -2718,7 +2716,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
t = onlineTablets.get(extent);
}
}
-
+
if (t == null) {
// Tablet has probably been recently unloaded: repeated master
// unload request is crossing the successful unloaded message
@@ -2728,11 +2726,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return;
}
-
+
try {
t.close(saveState);
} catch (Throwable e) {
-
+
if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
log.debug("Failed to unload tablet " + extent + "... it was alread closing or closed : " + e.getMessage());
} else {
@@ -2741,12 +2739,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
return;
}
-
+
// stop serving tablet - client will get not serving tablet
// exceptions
recentlyUnloadedCache.put(extent, System.currentTimeMillis());
onlineTablets.remove(extent);
-
+
try {
TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId());
TabletLocationState tls = null;
@@ -2764,37 +2762,37 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
} catch (InterruptedException e) {
log.warn("Interrupted while getting our zookeeper session information", e);
}
-
+
// tell the master how it went
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
-
+
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMinorTimes(t.timer);
statsKeeper.saveMajorTimes(t.timer);
-
+
log.info("unloaded " + extent);
-
+
}
}
-
+
private class AssignmentHandler implements Runnable {
private KeyExtent extent;
private int retryAttempt = 0;
-
+
public AssignmentHandler(KeyExtent extent) {
this.extent = extent;
}
-
+
public AssignmentHandler(KeyExtent extent, int retryAttempt) {
this(extent);
this.retryAttempt = retryAttempt;
}
-
+
@Override
public void run() {
log.info(clientAddress + ": got assignment from master: " + extent);
-
+
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
@@ -2803,23 +2801,23 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
-
+
if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent))
return;
-
+
if (!unopenedTablets.contains(extent) || unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0 || onlineOverlapping.size() > 0) {
throw new IllegalStateException("overlaps assigned " + extent + " " + !unopenedTablets.contains(extent) + " " + unopenedOverlapping + " "
+ openingOverlapping + " " + onlineOverlapping);
}
}
-
+
unopenedTablets.remove(extent);
openingTablets.add(extent);
}
}
-
+
log.debug("Loading extent: " + extent);
-
+
// check Metadata table before accepting assignment
Text locationToOpen = null;
SortedMap<Key,Value> tabletsKeyValues = new TreeMap<Key,Value>();
@@ -2849,7 +2847,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
throw new RuntimeException(e);
}
-
+
if (locationToOpen == null) {
log.debug("Reporting tablet " + extent + " assignment failure: unable to verify Tablet Information");
synchronized (openingTablets) {
@@ -2859,13 +2857,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
return;
}
-
+
Tablet tablet = null;
boolean successful = false;
-
+
try {
TabletResourceManager trm = resourceManager.createTabletResourceManager();
-
+
// this opens the tablet file and fills in the endKey in the
// extent
tablet = new Tablet(TabletServer.this, locationToOpen, extent, trm, tabletsKeyValues);
@@ -2882,10 +2880,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow(MinorCompactionReason.SYSTEM)) {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
-
+
Assignment assignment = new Assignment(extent, getTabletSession());
TabletStateStore.setLocation(assignment);
-
+
synchronized (openingTablets) {
synchronized (onlineTablets) {
openingTablets.remove(extent);
@@ -2903,7 +2901,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
String table = extent.getTableId().toString();
ProblemReports.getInstance().report(new ProblemReport(table, TABLET_LOAD, extent.getUUID().toString(), getClientAddressString(), e));
}
-
+
if (!successful) {
synchronized (unopenedTablets) {
synchronized (openingTablets) {
@@ -2937,50 +2935,50 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}
-
+
private VolumeManager fs;
private Instance instance;
-
+
private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@SuppressWarnings("unchecked")
private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
-
+
private Thread majorCompactorThread;
-
+
// used for stopping the server and MasterListener thread
private volatile boolean serverStopRequested = false;
-
+
private HostAndPort clientAddress;
-
+
private TabletServerResourceManager resourceManager;
private SecurityOperation security;
private volatile boolean majorCompactorDisabled = false;
-
+
private volatile boolean shutdownComplete = false;
-
+
private ZooLock tabletServerLock;
-
+
private TServer server;
-
+
private DistributedWorkQueue bulkFailedCopyQ;
-
+
private String lockID;
-
+
private static final String METRICS_PREFIX = "tserver";
-
+
private static ObjectName OBJECT_NAME = null;
-
+
static AtomicLong seekCount = new AtomicLong(0);
-
+
public TabletStatsKeeper getStatsKeeper() {
return statsKeeper;
}
-
+
public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
-
+
long now = RelativeTime.currentTimeMillis();
List<String> logSet = new ArrayList<String>();
for (DfsLogger log : logs)
@@ -2994,15 +2992,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
entry.logSet = logSet;
MetadataTableUtil.addLogEntry(SystemCredentials.get(), entry, getLock());
}
-
- private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException {
+
+ private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName)
+ throws UnknownHostException {
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Prop
<TRUNCATED>
[3/4] ACCUMULO-391 Use more accurate "InputTableConfig" term
Posted by ct...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 670a764..19d107b 100644
--- a/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -44,7 +44,6 @@ import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.security.handler.Authenticator;
import org.apache.accumulo.server.security.handler.Authorizor;
import org.apache.accumulo.server.security.handler.PermissionHandler;
@@ -60,21 +59,21 @@ import org.apache.log4j.Logger;
*/
public class SecurityOperation {
private static final Logger log = Logger.getLogger(SecurityOperationsImpl.class);
-
+
protected Authorizor authorizor;
protected Authenticator authenticator;
protected PermissionHandler permHandle;
private static String rootUserName = null;
private final ZooCache zooCache;
private final String ZKUserPath;
-
+
protected static SecurityOperation instance;
-
+
public static synchronized SecurityOperation getInstance() {
String instanceId = HdfsZooInstance.getInstance().getInstanceID();
return getInstance(instanceId, false);
}
-
+
public static synchronized SecurityOperation getInstance(String instanceId, boolean initialize) {
if (instance == null) {
instance = new SecurityOperation(getAuthorizor(instanceId, initialize), getAuthenticator(instanceId, initialize), getPermHandler(instanceId, initialize),
@@ -82,48 +81,51 @@ public class SecurityOperation {
}
return instance;
}
-
+
protected static Authorizor getAuthorizor(String instanceId, boolean initialize) {
- Authorizor toRet = Property.createInstanceFromPropertyName(ServerConfiguration.getSiteConfiguration(), Property.INSTANCE_SECURITY_AUTHORIZOR, Authorizor.class, ZKAuthorizor.getInstance());
+ Authorizor toRet = Property.createInstanceFromPropertyName(ServerConfiguration.getSiteConfiguration(), Property.INSTANCE_SECURITY_AUTHORIZOR,
+ Authorizor.class, ZKAuthorizor.getInstance());
toRet.initialize(instanceId, initialize);
return toRet;
}
-
+
protected static Authenticator getAuthenticator(String instanceId, boolean initialize) {
- Authenticator toRet = Property.createInstanceFromPropertyName(ServerConfiguration.getSiteConfiguration(), Property.INSTANCE_SECURITY_AUTHENTICATOR, Authenticator.class, ZKAuthenticator.getInstance());
+ Authenticator toRet = Property.createInstanceFromPropertyName(ServerConfiguration.getSiteConfiguration(), Property.INSTANCE_SECURITY_AUTHENTICATOR,
+ Authenticator.class, ZKAuthenticator.getInstance());
toRet.initialize(instanceId, initialize);
return toRet;
}
-
+
protected static PermissionHandler getPermHandler(String instanceId, boolean initialize) {
- PermissionHandler toRet = Property.createInstanceFromPropertyName(ServerConfiguration.getSiteConfiguration(), Property.INSTANCE_SECURITY_PERMISSION_HANDLER, PermissionHandler.class, ZKPermHandler.getInstance());
+ PermissionHandler toRet = Property.createInstanceFromPropertyName(ServerConfiguration.getSiteConfiguration(),
+ Property.INSTANCE_SECURITY_PERMISSION_HANDLER, PermissionHandler.class, ZKPermHandler.getInstance());
toRet.initialize(instanceId, initialize);
return toRet;
}
-
+
protected SecurityOperation(String instanceId) {
ZKUserPath = Constants.ZROOT + "/" + instanceId + "/users";
zooCache = new ZooCache();
}
-
+
public SecurityOperation(Authorizor author, Authenticator authent, PermissionHandler pm, String instanceId) {
this(instanceId);
authorizor = author;
authenticator = authent;
permHandle = pm;
-
+
if (!authorizor.validSecurityHandlers(authenticator, pm) || !authenticator.validSecurityHandlers(authorizor, pm)
|| !permHandle.validSecurityHandlers(authent, author))
throw new RuntimeException(authorizor + ", " + authenticator + ", and " + pm
+ " do not play nice with eachother. Please choose authentication and authorization mechanisms that are compatible with one another.");
}
-
+
public void initializeSecurity(TCredentials credentials, String rootPrincipal, byte[] token) throws AccumuloSecurityException, ThriftSecurityException {
authenticate(credentials);
-
+
if (!isSystemUser(credentials))
throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
authenticator.initializeSecurity(credentials, rootPrincipal, token);
authorizor.initializeSecurity(credentials, rootPrincipal);
permHandle.initializeSecurity(credentials, rootPrincipal);
@@ -134,21 +136,21 @@ public class SecurityOperation {
throw new RuntimeException(e);
}
}
-
+
public synchronized String getRootUsername() {
if (rootUserName == null)
rootUserName = new String(zooCache.get(ZKUserPath));
return rootUserName;
}
-
+
public boolean isSystemUser(TCredentials credentials) {
return SystemCredentials.get().getToken().getClass().getName().equals(credentials.getTokenClassName());
}
-
+
private void authenticate(TCredentials credentials) throws ThriftSecurityException {
if (!credentials.getInstanceId().equals(HdfsZooInstance.getInstance().getInstanceID()))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.INVALID_INSTANCEID);
-
+
if (isSystemUser(credentials)) {
authenticateSystemUser(credentials);
} else {
@@ -163,19 +165,19 @@ public class SecurityOperation {
}
}
}
-
+
private void authenticateSystemUser(TCredentials credentials) throws ThriftSecurityException {
if (SystemCredentials.get().getToken().equals(credentials.getToken()))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);
}
-
+
public boolean canAskAboutUser(TCredentials credentials, String user) throws ThriftSecurityException {
// Authentication done in canPerformSystemActions
if (!(canPerformSystemActions(credentials) || credentials.getPrincipal().equals(user)))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
return true;
}
-
+
public boolean authenticateUser(TCredentials credentials, TCredentials toAuth) throws ThriftSecurityException {
canAskAboutUser(credentials, toAuth.getPrincipal());
// User is already authenticated from canAskAboutUser
@@ -188,7 +190,7 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
private AuthenticationToken reassembleToken(TCredentials toAuth) throws AccumuloSecurityException {
String tokenClass = toAuth.getTokenClassName();
if (authenticator.validTokenClass(tokenClass)) {
@@ -196,22 +198,22 @@ public class SecurityOperation {
}
throw new AccumuloSecurityException(toAuth.getPrincipal(), SecurityErrorCode.INVALID_TOKEN);
}
-
+
public Authorizations getUserAuthorizations(TCredentials credentials, String user) throws ThriftSecurityException {
authenticate(credentials);
-
+
targetUserExists(user);
-
+
if (!credentials.getPrincipal().equals(user) && !hasSystemPermission(credentials, SystemPermission.SYSTEM, false))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
try {
return authorizor.getCachedUserAuthorizations(user);
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
}
-
+
public Authorizations getUserAuthorizations(TCredentials credentials) throws ThriftSecurityException {
// system user doesn't need record-level authorizations for the tables it reads
if (isSystemUser(credentials)) {
@@ -220,7 +222,7 @@ public class SecurityOperation {
}
return getUserAuthorizations(credentials, credentials.getPrincipal());
}
-
+
/**
* Checks if a user has a system permission
*
@@ -231,7 +233,7 @@ public class SecurityOperation {
return true;
return _hasSystemPermission(credentials.getPrincipal(), permission, useCached);
}
-
+
/**
* Checks if a user has a system permission<br/>
* This cannot check if a system user has permission.
@@ -241,9 +243,9 @@ public class SecurityOperation {
private boolean _hasSystemPermission(String user, SystemPermission permission, boolean useCached) throws ThriftSecurityException {
if (user.equals(getRootUsername()))
return true;
-
+
targetUserExists(user);
-
+
try {
if (useCached)
return permHandle.hasCachedSystemPermission(user, permission);
@@ -252,7 +254,7 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
/**
* Checks if a user has a table permission
*
@@ -263,7 +265,7 @@ public class SecurityOperation {
return true;
return _hasTablePermission(credentials.getPrincipal(), table, permission, useCached);
}
-
+
/**
* Checks if a user has a table permission<br/>
* This cannot check if a system user has permission.
@@ -272,10 +274,10 @@ public class SecurityOperation {
*/
protected boolean _hasTablePermission(String user, String table, TablePermission permission, boolean useCached) throws ThriftSecurityException {
targetUserExists(user);
-
+
if ((table.equals(MetadataTable.ID) || table.equals(RootTable.ID)) && permission.equals(TablePermission.READ))
return true;
-
+
try {
if (useCached)
return permHandle.hasCachedTablePermission(user, table, permission);
@@ -286,7 +288,7 @@ public class SecurityOperation {
throw new ThriftSecurityException(user, SecurityErrorCode.TABLE_DOESNT_EXIST);
}
}
-
+
// some people just aren't allowed to ask about other users; here are those who can ask
private boolean canAskAboutOtherUsers(TCredentials credentials, String user) throws ThriftSecurityException {
authenticate(credentials);
@@ -294,7 +296,7 @@ public class SecurityOperation {
|| hasSystemPermission(credentials, SystemPermission.CREATE_USER, false) || hasSystemPermission(credentials, SystemPermission.ALTER_USER, false)
|| hasSystemPermission(credentials, SystemPermission.DROP_USER, false);
}
-
+
private void targetUserExists(String user) throws ThriftSecurityException {
if (user.equals(getRootUsername()))
return;
@@ -305,40 +307,40 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
public boolean canScan(TCredentials credentials, String table) throws ThriftSecurityException {
authenticate(credentials);
return hasTablePermission(credentials, table, TablePermission.READ, true);
}
-
+
public boolean canScan(TCredentials credentials, String table, TRange range, List<TColumn> columns, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations) throws ThriftSecurityException {
return canScan(credentials, table);
}
-
+
public boolean canScan(TCredentials credentials, String table, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations) throws ThriftSecurityException {
return canScan(credentials, table);
}
-
+
public boolean canWrite(TCredentials credentials, String table) throws ThriftSecurityException {
authenticate(credentials);
return hasTablePermission(credentials, table, TablePermission.WRITE, true);
}
-
+
public boolean canConditionallyUpdate(TCredentials credentials, String tableID, List<ByteBuffer> authorizations) throws ThriftSecurityException {
-
+
authenticate(credentials);
-
+
return hasTablePermission(credentials, tableID, TablePermission.WRITE, true) && hasTablePermission(credentials, tableID, TablePermission.READ, true);
}
-
+
public boolean canSplitTablet(TCredentials credentials, String table) throws ThriftSecurityException {
authenticate(credentials);
return hasSystemPermission(credentials, SystemPermission.ALTER_TABLE, false) || hasSystemPermission(credentials, SystemPermission.SYSTEM, false)
|| hasTablePermission(credentials, table, TablePermission.ALTER_TABLE, false);
}
-
+
/**
* This is the check to perform any system action. This includes tserver's loading of a tablet, shutting the system down, or altering system properties.
*/
@@ -346,95 +348,95 @@ public class SecurityOperation {
authenticate(credentials);
return hasSystemPermission(credentials, SystemPermission.SYSTEM, false);
}
-
+
public boolean canFlush(TCredentials c, String tableId) throws ThriftSecurityException {
authenticate(c);
return hasTablePermission(c, tableId, TablePermission.WRITE, false) || hasTablePermission(c, tableId, TablePermission.ALTER_TABLE, false);
}
-
+
public boolean canAlterTable(TCredentials c, String tableId) throws ThriftSecurityException {
authenticate(c);
return hasTablePermission(c, tableId, TablePermission.ALTER_TABLE, false) || hasSystemPermission(c, SystemPermission.ALTER_TABLE, false);
}
-
+
public boolean canCreateTable(TCredentials c, String tableName) throws ThriftSecurityException {
return canCreateTable(c);
}
-
+
public boolean canCreateTable(TCredentials c) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.CREATE_TABLE, false);
}
-
+
public boolean canRenameTable(TCredentials c, String tableId, String oldTableName, String newTableName) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.ALTER_TABLE, false) || hasTablePermission(c, tableId, TablePermission.ALTER_TABLE, false);
}
-
+
public boolean canCloneTable(TCredentials c, String tableId, String tableName) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.CREATE_TABLE, false) && hasTablePermission(c, tableId, TablePermission.READ, false);
}
-
+
public boolean canDeleteTable(TCredentials c, String tableId) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.DROP_TABLE, false) || hasTablePermission(c, tableId, TablePermission.DROP_TABLE, false);
}
-
+
public boolean canOnlineOfflineTable(TCredentials c, String tableId, TableOperation op) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.SYSTEM, false) || hasSystemPermission(c, SystemPermission.ALTER_TABLE, false)
|| hasTablePermission(c, tableId, TablePermission.ALTER_TABLE, false);
}
-
+
public boolean canMerge(TCredentials c, String tableId) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.SYSTEM, false) || hasSystemPermission(c, SystemPermission.ALTER_TABLE, false)
|| hasTablePermission(c, tableId, TablePermission.ALTER_TABLE, false);
}
-
+
public boolean canDeleteRange(TCredentials c, String tableId, String tableName, Text startRow, Text endRow) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.SYSTEM, false) || hasTablePermission(c, tableId, TablePermission.WRITE, false);
}
-
+
public boolean canBulkImport(TCredentials c, String tableId, String tableName, String dir, String failDir) throws ThriftSecurityException {
return canBulkImport(c, tableId);
}
-
+
public boolean canBulkImport(TCredentials c, String tableId) throws ThriftSecurityException {
authenticate(c);
return hasTablePermission(c, tableId, TablePermission.BULK_IMPORT, false);
}
-
+
public boolean canCompact(TCredentials c, String tableId) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.ALTER_TABLE, false) || hasTablePermission(c, tableId, TablePermission.ALTER_TABLE, false)
|| hasTablePermission(c, tableId, TablePermission.WRITE, false);
}
-
+
public boolean canChangeAuthorizations(TCredentials c, String user) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.ALTER_USER, false);
}
-
+
public boolean canChangePassword(TCredentials c, String user) throws ThriftSecurityException {
authenticate(c);
return c.getPrincipal().equals(user) || hasSystemPermission(c, SystemPermission.ALTER_USER, false);
}
-
+
public boolean canCreateUser(TCredentials c, String user) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.CREATE_USER, false);
}
-
+
public boolean canDropUser(TCredentials c, String user) throws ThriftSecurityException {
authenticate(c);
if (user.equals(getRootUsername()))
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
return hasSystemPermission(c, SystemPermission.DROP_USER, false);
}
-
+
public boolean canGrantSystem(TCredentials c, String user, SystemPermission sysPerm) throws ThriftSecurityException {
authenticate(c);
// can't grant GRANT
@@ -442,36 +444,36 @@ public class SecurityOperation {
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.GRANT_INVALID);
return hasSystemPermission(c, SystemPermission.GRANT, false);
}
-
+
public boolean canGrantTable(TCredentials c, String user, String table) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.ALTER_TABLE, false) || hasTablePermission(c, table, TablePermission.GRANT, false);
}
-
+
public boolean canRevokeSystem(TCredentials c, String user, SystemPermission sysPerm) throws ThriftSecurityException {
authenticate(c);
// can't modify root user
if (user.equals(getRootUsername()))
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
// can't revoke GRANT
if (sysPerm.equals(SystemPermission.GRANT))
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.GRANT_INVALID);
-
+
return hasSystemPermission(c, SystemPermission.GRANT, false);
}
-
+
public boolean canRevokeTable(TCredentials c, String user, String table) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermission(c, SystemPermission.ALTER_TABLE, false) || hasTablePermission(c, table, TablePermission.GRANT, false);
}
-
+
public void changeAuthorizations(TCredentials credentials, String user, Authorizations authorizations) throws ThriftSecurityException {
if (!canChangeAuthorizations(credentials, user))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
targetUserExists(user);
-
+
try {
authorizor.changeAuthorizations(user, authorizations);
log.info("Changed authorizations for user " + user + " at the request of user " + credentials.getPrincipal());
@@ -479,7 +481,7 @@ public class SecurityOperation {
throw ase.asThriftException();
}
}
-
+
public void changePassword(TCredentials credentials, Credentials toChange) throws ThriftSecurityException {
if (!canChangePassword(credentials, toChange.getPrincipal()))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
@@ -491,7 +493,7 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
public void createUser(TCredentials credentials, Credentials newUser, Authorizations authorizations) throws ThriftSecurityException {
if (!canCreateUser(credentials, newUser.getPrincipal()))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
@@ -507,7 +509,7 @@ public class SecurityOperation {
throw ase.asThriftException();
}
}
-
+
public void dropUser(TCredentials credentials, String user) throws ThriftSecurityException {
if (!canDropUser(credentials, user))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
@@ -520,13 +522,13 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
public void grantSystemPermission(TCredentials credentials, String user, SystemPermission permissionById) throws ThriftSecurityException {
if (!canGrantSystem(credentials, user, permissionById))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
targetUserExists(user);
-
+
try {
permHandle.grantSystemPermission(user, permissionById);
log.info("Granted system permission " + permissionById + " for user " + user + " at the request of user " + credentials.getPrincipal());
@@ -534,13 +536,13 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
public void grantTablePermission(TCredentials c, String user, String tableId, TablePermission permission) throws ThriftSecurityException {
if (!canGrantTable(c, user, tableId))
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
targetUserExists(user);
-
+
try {
permHandle.grantTablePermission(user, tableId, permission);
log.info("Granted table permission " + permission + " for user " + user + " on the table " + tableId + " at the request of user " + c.getPrincipal());
@@ -550,51 +552,51 @@ public class SecurityOperation {
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.TABLE_DOESNT_EXIST);
}
}
-
+
public void revokeSystemPermission(TCredentials credentials, String user, SystemPermission permission) throws ThriftSecurityException {
if (!canRevokeSystem(credentials, user, permission))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
targetUserExists(user);
-
+
try {
permHandle.revokeSystemPermission(user, permission);
log.info("Revoked system permission " + permission + " for user " + user + " at the request of user " + credentials.getPrincipal());
-
+
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
}
}
-
+
public void revokeTablePermission(TCredentials c, String user, String tableId, TablePermission permission) throws ThriftSecurityException {
if (!canRevokeTable(c, user, tableId))
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-
+
targetUserExists(user);
-
+
try {
permHandle.revokeTablePermission(user, tableId, permission);
log.info("Revoked table permission " + permission + " for user " + user + " on the table " + tableId + " at the request of user " + c.getPrincipal());
-
+
} catch (AccumuloSecurityException e) {
throw e.asThriftException();
} catch (TableNotFoundException e) {
throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.TABLE_DOESNT_EXIST);
}
}
-
+
public boolean hasSystemPermission(TCredentials credentials, String user, SystemPermission permissionById) throws ThriftSecurityException {
if (!canAskAboutOtherUsers(credentials, user))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
return _hasSystemPermission(user, permissionById, false);
}
-
+
public boolean hasTablePermission(TCredentials credentials, String user, String tableId, TablePermission permissionById) throws ThriftSecurityException {
if (!canAskAboutOtherUsers(credentials, user))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
return _hasTablePermission(user, tableId, permissionById, false);
}
-
+
public Set<String> listUsers(TCredentials credentials) throws ThriftSecurityException {
authenticate(credentials);
try {
@@ -603,7 +605,7 @@ public class SecurityOperation {
throw e.asThriftException();
}
}
-
+
public void deleteTable(TCredentials credentials, String tableId) throws ThriftSecurityException {
if (!canDeleteTable(credentials, tableId))
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
@@ -616,12 +618,12 @@ public class SecurityOperation {
throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.TABLE_DOESNT_EXIST);
}
}
-
+
public boolean canExport(TCredentials credentials, String tableId, String tableName, String exportDir) throws ThriftSecurityException {
authenticate(credentials);
return hasTablePermission(credentials, tableId, TablePermission.READ, false);
}
-
+
public boolean canImport(TCredentials credentials, String tableName, String importDir) throws ThriftSecurityException {
authenticate(credentials);
return hasSystemPermission(credentials, SystemPermission.CREATE_TABLE, false);
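
Taken together, the methods in this excerpt follow one pattern: authenticate the caller, consult a can*() predicate built from hasSystemPermission/hasTablePermission, and turn a false answer into a ThriftSecurityException with PERMISSION_DENIED before anything is changed. A hypothetical method (not part of this patch), written in the same style and assuming the surrounding class's helpers, would look like:

    public void onlineTable(TCredentials credentials, String tableId) throws ThriftSecurityException {
      // Guard first: SYSTEM, system-level ALTER_TABLE, or table-level ALTER_TABLE may change table state.
      if (!canOnlineOfflineTable(credentials, tableId, TableOperation.ONLINE))
        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
      // ... proceed with the table state change only after the permission check passes ...
    }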
[4/4] git commit: ACCUMULO-391 Use more accurate "InputTableConfig" term
Posted by ct...@apache.org.
ACCUMULO-391 Use more accurate "InputTableConfig" term
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/61353d1e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/61353d1e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/61353d1e
Branch: refs/heads/master
Commit: 61353d1e3f838f566aa7006e19e0af1ccd02d18a
Parents: a5cf860
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Oct 24 19:03:45 2013 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Oct 24 19:03:45 2013 -0400
----------------------------------------------------------------------
.../core/client/mapred/AbstractInputFormat.java | 29 +-
.../mapred/AccumuloMultiTableInputFormat.java | 10 +-
.../client/mapreduce/AbstractInputFormat.java | 24 +-
.../AccumuloMultiTableInputFormat.java | 10 +-
.../core/client/mapreduce/BatchScanConfig.java | 367 ------
.../core/client/mapreduce/InputTableConfig.java | 370 ++++++
.../mapreduce/lib/util/InputConfigurator.java | 87 +-
.../AccumuloMultiTableInputFormatTest.java | 24 +-
.../AccumuloMultiTableInputFormatTest.java | 22 +-
.../core/conf/TableQueryConfigTest.java | 20 +-
.../accumulo_user_manual/chapters/analytics.tex | 8 +-
.../server/monitor/servlets/trace/Basic.java | 15 +-
.../server/security/SecurityOperation.java | 198 +--
.../server/tabletserver/TabletServer.java | 1223 +++++++++---------
.../TabletServerResourceManager.java | 252 ++--
.../compaction/CompactionStrategy.java | 30 +-
.../apache/accumulo/test/TableOperationsIT.java | 31 +-
17 files changed, 1362 insertions(+), 1358 deletions(-)
----------------------------------------------------------------------
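
For code written against the old names, the update is mechanical: the BatchScanConfig class becomes InputTableConfig, and the setBatchScanConfigs/getBatchScanConfigs accessors become setInputTableConfigs/getInputTableConfigs, as the diffs below show. A minimal before/after sketch at a configuration call site (the table name and the ranges variable are illustrative, not from this patch):

    // Before this commit:
    //   Map<String,BatchScanConfig> configs = new HashMap<String,BatchScanConfig>();
    //   configs.put("events", new BatchScanConfig().setRanges(ranges));
    //   AccumuloMultiTableInputFormat.setBatchScanConfigs(job, configs);

    // After this commit:
    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
    configs.put("events", new InputTableConfig().setRanges(ranges));
    AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
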
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index eaf99cb..856936e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -39,7 +39,7 @@ import org.apache.accumulo.core.client.impl.OfflineScanner;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
-import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -292,19 +292,19 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
}
/**
- * Fetches all {@link BatchScanConfig}s that have been set on the given Hadoop job.
+ * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job.
*
* @param job
* the Hadoop job instance to be configured
- * @return the {@link BatchScanConfig} objects set on the job
+ * @return the {@link InputTableConfig} objects set on the job
* @since 1.6.0
*/
- public static Map<String,BatchScanConfig> getBatchScanConfigs(JobConf job) {
- return InputConfigurator.getBatchScanConfigs(CLASS, job);
+ public static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) {
+ return InputConfigurator.getInputTableConfigs(CLASS, job);
}
/**
- * Fetches a {@link org.apache.accumulo.core.client.mapreduce.BatchScanConfig} that has been set on the configuration for a specific table.
+ * Fetches an {@link InputTableConfig} that has been set on the configuration for a specific table.
*
* <p>
* null is returned in the event that the table doesn't exist.
@@ -313,11 +313,11 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
* the Hadoop job instance to be configured
* @param tableName
* the table name for which to grab the config object
- * @return the {@link org.apache.accumulo.core.client.mapreduce.BatchScanConfig} for the given table
+ * @return the {@link InputTableConfig} for the given table
* @since 1.6.0
*/
- public static BatchScanConfig getBatchScanConfig(JobConf job, String tableName) {
- return InputConfigurator.getBatchScanConfig(CLASS, job, tableName);
+ public static InputTableConfig getInputTableConfigs(JobConf job, String tableName) {
+ return InputConfigurator.getInputTableConfig(CLASS, job, tableName);
}
/**
@@ -362,8 +362,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
// in case the table name changed, we can still use the previous name for terms of configuration,
// but the scanner will use the table id resolved at job setup time
- BatchScanConfig tableConfig = getBatchScanConfig(job, split.getTableName());
-
+ InputTableConfig tableConfig = getInputTableConfigs(job, split.getTableName());
try {
log.debug("Creating connector with user: " + principal);
@@ -432,7 +431,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
Instance instance = getInstance(job);
Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job));
-
+
return InputConfigurator.binOffline(tableId, ranges, instance, conn);
}
@@ -445,10 +444,10 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> {
validateOptions(job);
LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
- Map<String,BatchScanConfig> tableConfigs = getBatchScanConfigs(job);
- for (Map.Entry<String,BatchScanConfig> tableConfigEntry : tableConfigs.entrySet()) {
+ Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job);
+ for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
String tableName = tableConfigEntry.getKey();
- BatchScanConfig tableConfig = tableConfigEntry.getValue();
+ InputTableConfig tableConfig = tableConfigEntry.getValue();
boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
String tableId = null;
List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
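
In the getSplits() logic above, ranges are only merged when auto-adjust is enabled. As a quick illustration of what that merge does (all values made up), Range.mergeOverlapping collapses overlapping ranges into a minimal covering set:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.accumulo.core.data.Range;

    public class MergeOverlappingExample {
      public static void main(String[] args) {
        List<Range> input = Arrays.asList(new Range("a", "f"), new Range("d", "m"), new Range("x", "z"));
        // With auto-adjust enabled, overlapping input ranges are merged first,
        // so [a,f] and [d,m] collapse into a single range covering a through m.
        List<Range> merged = Range.mergeOverlapping(input);
        System.out.println(merged.size()); // 2
      }
    }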
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
index ed51866..61838db 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.core.client.mapred;
import java.io.IOException;
import java.util.Map;
-import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapred.Reporter;
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)}
* <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
- * <li>{@link AccumuloMultiTableInputFormat#setBatchScanConfigs(org.apache.hadoop.mapred.JobConf, java.util.Map)}
+ * <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(org.apache.hadoop.mapred.JobConf, java.util.Map)}
* </ul>
*
* Other static methods are optional.
@@ -49,7 +49,7 @@ import org.apache.hadoop.mapred.Reporter;
public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value> {
/**
- * Sets the {@link BatchScanConfig} objects on the given Hadoop configuration
+ * Sets the {@link InputTableConfig} objects on the given Hadoop configuration
*
* @param job
* the Hadoop job instance to be configured
@@ -57,8 +57,8 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value
* the table query configs to be set on the configuration.
* @since 1.6.0
*/
- public static void setBatchScanConfigs(JobConf job, Map<String,BatchScanConfig> configs) {
- InputConfigurator.setBatchScanConfigs(CLASS, job, configs);
+ public static void setInputTableConfigs(JobConf job, Map<String,InputTableConfig> configs) {
+ InputConfigurator.setInputTableConfigs(CLASS, job, configs);
}
@Override
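
For the mapred API shown here, the per-table configs are attached to a JobConf and can be read back from it later. A minimal sketch, assuming the job is otherwise configured with connector and instance information (the table names and range are illustrative):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.accumulo.core.client.mapred.AccumuloMultiTableInputFormat;
    import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.mapred.JobConf;

    public class MapredMultiTableSketch {
      public static void configure(JobConf job) {
        Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
        configs.put("table1", new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "k"))));
        configs.put("table2", new InputTableConfig().setAutoAdjustRanges(false));
        AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);

        // The same map is recoverable from the JobConf, keyed by table name.
        Map<String,InputTableConfig> readBack = AccumuloMultiTableInputFormat.getInputTableConfigs(job);
        assert readBack.containsKey("table1") && readBack.containsKey("table2");
      }
    }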
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index d426caf..626a785 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -273,19 +273,19 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
}
/**
- * Fetches all {@link BatchScanConfig}s that have been set on the given job.
+ * Fetches all {@link InputTableConfig}s that have been set on the given job.
*
* @param context
* the Hadoop job instance to be configured
- * @return the {@link BatchScanConfig} objects for the job
+ * @return the {@link InputTableConfig} objects for the job
* @since 1.6.0
*/
- protected static Map<String,BatchScanConfig> getBatchScanConfigs(JobContext context) {
- return InputConfigurator.getBatchScanConfigs(CLASS, getConfiguration(context));
+ protected static Map<String,InputTableConfig> getInputTableConfigs(JobContext context) {
+ return InputConfigurator.getInputTableConfigs(CLASS, getConfiguration(context));
}
/**
- * Fetches a {@link BatchScanConfig} that has been set on the configuration for a specific table.
+ * Fetches an {@link InputTableConfig} that has been set on the configuration for a specific table.
*
* <p>
* null is returned in the event that the table doesn't exist.
@@ -294,11 +294,11 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
* the Hadoop job instance to be configured
* @param tableName
* the table name for which to grab the config object
- * @return the {@link BatchScanConfig} for the given table
+ * @return the {@link InputTableConfig} for the given table
* @since 1.6.0
*/
- protected static BatchScanConfig getBatchScanConfig(JobContext context, String tableName) {
- return InputConfigurator.getBatchScanConfig(CLASS, getConfiguration(context), tableName);
+ protected static InputTableConfig getInputTableConfig(JobContext context, String tableName) {
+ return InputConfigurator.getInputTableConfig(CLASS, getConfiguration(context), tableName);
}
/**
@@ -377,7 +377,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
// in case the table name changed, we can still use the previous name for terms of configuration,
// but the scanner will use the table id resolved at job setup time
- BatchScanConfig tableConfig = getBatchScanConfig(attempt, split.getTableName());
+ InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
try {
@@ -471,11 +471,11 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> {
validateOptions(context);
LinkedList<InputSplit> splits = new LinkedList<InputSplit>();
- Map<String,BatchScanConfig> tableConfigs = getBatchScanConfigs(context);
- for (Map.Entry<String,BatchScanConfig> tableConfigEntry : tableConfigs.entrySet()) {
+ Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
+ for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
String tableName = tableConfigEntry.getKey();
- BatchScanConfig tableConfig = tableConfigEntry.getValue();
+ InputTableConfig tableConfig = tableConfigEntry.getValue();
boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
String tableId = null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
index 06bcd01..bd15447 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* <li>{@link AccumuloMultiTableInputFormat#setConnectorInfo(Job, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)}
* <li>{@link AccumuloMultiTableInputFormat#setScanAuthorizations(Job, org.apache.accumulo.core.security.Authorizations)}
* <li>{@link AccumuloMultiTableInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
- * <li>{@link AccumuloMultiTableInputFormat#setBatchScanConfigs(org.apache.hadoop.mapreduce.Job, Map<String,BatchScanConfig>)}
+ * <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(Job, Map)}
* </ul>
*
* Other static methods are optional.
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value> {
/**
- * Sets the {@link BatchScanConfig} objects on the given Hadoop configuration
+ * Sets the {@link InputTableConfig} objects on the given Hadoop configuration
*
* @param job
* the Hadoop job instance to be configured
@@ -59,9 +59,9 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value
* the table query configs to be set on the configuration.
* @since 1.6.0
*/
- public static void setBatchScanConfigs(Job job, Map<String,BatchScanConfig> configs) {
+ public static void setInputTableConfigs(Job job, Map<String,InputTableConfig> configs) {
checkNotNull(configs);
- InputConfigurator.setBatchScanConfigs(CLASS, getConfiguration(job), configs);
+ InputConfigurator.setInputTableConfigs(CLASS, getConfiguration(job), configs);
}
@Override
@@ -84,7 +84,7 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value
@Override
protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName) {
- List<IteratorSetting> iterators = getBatchScanConfig(context, tableName).getIterators();
+ List<IteratorSetting> iterators = getInputTableConfig(context, tableName).getIterators();
for (IteratorSetting setting : iterators) {
scanner.addScanIterator(setting);
}
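
Putting the required calls from the class javadoc together, a driver for the mapreduce API might be wired up as below. This is a sketch, not code from this patch: the instance name, ZooKeeper hosts, user, password, authorization, and table names are all placeholders.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat;
    import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.mapreduce.Job;

    public class MultiTableJobSketch {
      public static void configure(Job job) throws Exception {
        job.setInputFormatClass(AccumuloMultiTableInputFormat.class);
        AccumuloMultiTableInputFormat.setZooKeeperInstance(job, "myInstance", "zkhost:2181");
        AccumuloMultiTableInputFormat.setConnectorInfo(job, "mapreduceUser", new PasswordToken("secret"));
        AccumuloMultiTableInputFormat.setScanAuthorizations(job, new Authorizations("public"));

        // One InputTableConfig per input table, keyed by table name; per-table
        // iterators set here are applied by setupIterators() for that table's scanner.
        Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
        configs.put("events", new InputTableConfig().setRanges(Collections.singletonList(new Range("2013-10", "2013-11"))));
        configs.put("users", new InputTableConfig());
        AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
      }
    }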
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
deleted file mode 100644
index 470b460..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/BatchScanConfig.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * This class to holds a batch scan configuration for a table. It contains all the properties needed to specify how rows should be returned from the table.
- */
-public class BatchScanConfig implements Writable {
-
- private List<IteratorSetting> iterators;
- private List<Range> ranges;
- private Collection<Pair<Text,Text>> columns;
-
- private boolean autoAdjustRanges = true;
- private boolean useLocalIterators = false;
- private boolean useIsolatedScanners = false;
- private boolean offlineScan = false;
-
- public BatchScanConfig() {}
-
- /**
- * Creates a batch scan config object out of a previously serialized batch scan config object.
- * @param input
- * the data input of the serialized batch scan config
- * @throws IOException
- */
- public BatchScanConfig(DataInput input) throws IOException {
- readFields(input);
- }
-
- /**
- * Sets the input ranges to scan for all tables associated with this job. This will be added to any per-table ranges that have been set using
- *
- * @param ranges
- * the ranges that will be mapped over
- * @since 1.6.0
- */
- public BatchScanConfig setRanges(List<Range> ranges) {
- this.ranges = ranges;
- return this;
- }
-
- /**
- * Returns the ranges to be queried in the configuration
- */
- public List<Range> getRanges() {
- return ranges != null ? ranges : new ArrayList<Range>();
- }
-
- /**
- * Restricts the columns that will be mapped over for this job for the default input table.
- *
- * @param columns
- * a pair of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
- * selected. An empty set is the default and is equivalent to scanning the all columns.
- * @since 1.6.0
- */
- public BatchScanConfig fetchColumns(Collection<Pair<Text,Text>> columns) {
- this.columns = columns;
- return this;
- }
-
- /**
- * Returns the columns to be fetched for this configuration
- */
- public Collection<Pair<Text,Text>> getFetchedColumns() {
- return columns != null ? columns : new HashSet<Pair<Text,Text>>();
- }
-
- /**
- * Set iterators on to be used in the query.
- *
- * @param iterators
- * the configurations for the iterators
- * @since 1.6.0
- */
- public BatchScanConfig setIterators(List<IteratorSetting> iterators) {
- this.iterators = iterators;
- return this;
- }
-
- /**
- * Returns the iterators to be set on this configuration
- */
- public List<IteratorSetting> getIterators() {
- return iterators != null ? iterators : new ArrayList<IteratorSetting>();
- }
-
- /**
- * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
- * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled. *
- *
- * <p>
- * By default, this feature is <b>enabled</b>.
- *
- * @param autoAdjustRanges
- * the feature is enabled if true, disabled otherwise
- * @see #setRanges(java.util.List)
- * @since 1.6.0
- */
- public BatchScanConfig setAutoAdjustRanges(boolean autoAdjustRanges) {
- this.autoAdjustRanges = autoAdjustRanges;
- return this;
- }
-
- /**
- * Determines whether a configuration has auto-adjust ranges enabled.
- *
- * @return false if the feature is disabled, true otherwise
- * @since 1.6.0
- * @see #setAutoAdjustRanges(boolean)
- */
- public boolean shouldAutoAdjustRanges() {
- return autoAdjustRanges;
- }
-
- /**
- * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack
- * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be
- * available on the classpath for the task.
- *
- * <p>
- * By default, this feature is <b>disabled</b>.
- *
- * @param useLocalIterators
- * the feature is enabled if true, disabled otherwise
- * @since 1.6.0
- */
- public BatchScanConfig setUseLocalIterators(boolean useLocalIterators) {
- this.useLocalIterators = useLocalIterators;
- return this;
- }
-
- /**
- * Determines whether a configuration uses local iterators.
- *
- * @return true if the feature is enabled, false otherwise
- * @since 1.6.0
- * @see #setUseLocalIterators(boolean)
- */
- public boolean shouldUseLocalIterators() {
- return useLocalIterators;
- }
-
- /**
- * <p>
- * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
- * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
- * fail.
- *
- * <p>
- * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
- *
- * <p>
- * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
- * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
- *
- * <p>
- * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
- * reduce over the data many times, it may be better to the compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
- * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
- *
- * <p>
- * There are two possible advantages to reading a tables file directly out of HDFS. First, you may see better read performance. Second, it will support
- * speculative execution better. When reading an online table speculative execution can put more load on an already slow tablet server.
- *
- * <p>
- * By default, this feature is <b>disabled</b>.
- *
- * @param offlineScan
- * the feature is enabled if true, disabled otherwise
- * @since 1.6.0
- */
- public BatchScanConfig setOfflineScan(boolean offlineScan) {
- this.offlineScan = offlineScan;
- return this;
- }
-
- /**
- * Determines whether a configuration has the offline table scan feature enabled.
- *
- * @return true if the feature is enabled, false otherwise
- * @since 1.6.0
- * @see #setOfflineScan(boolean)
- */
- public boolean isOfflineScan() {
- return offlineScan;
- }
-
- /**
- * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job.
- *
- * <p>
- * By default, this feature is <b>disabled</b>.
- *
- * @param useIsolatedScanners
- * the feature is enabled if true, disabled otherwise
- * @since 1.6.0
- */
- public BatchScanConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
- this.useIsolatedScanners = useIsolatedScanners;
- return this;
- }
-
- /**
- * Determines whether a configuration has isolation enabled.
- *
- * @return true if the feature is enabled, false otherwise
- * @since 1.6.0
- * @see #setUseIsolatedScanners(boolean)
- */
- public boolean shouldUseIsolatedScanners() {
- return useIsolatedScanners;
- }
-
- /**
- * Writes the state for the current object out to the specified {@see DataOutput}
- * @param dataOutput
- * the output for which to write the object's state
- * @throws IOException
- */
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- if (iterators != null) {
- dataOutput.writeInt(iterators.size());
- for (IteratorSetting setting : iterators)
- setting.write(dataOutput);
- } else {
- dataOutput.writeInt(0);
- }
- if (ranges != null) {
- dataOutput.writeInt(ranges.size());
- for (Range range : ranges)
- range.write(dataOutput);
- } else {
- dataOutput.writeInt(0);
- }
- if (columns != null) {
- dataOutput.writeInt(columns.size());
- for (Pair<Text,Text> column : columns) {
- if (column.getSecond() == null) {
- dataOutput.writeInt(1);
- column.getFirst().write(dataOutput);
- } else {
- dataOutput.writeInt(2);
- column.getFirst().write(dataOutput);
- column.getSecond().write(dataOutput);
- }
- }
- } else {
- dataOutput.writeInt(0);
- }
- dataOutput.writeBoolean(autoAdjustRanges);
- dataOutput.writeBoolean(useLocalIterators);
- dataOutput.writeBoolean(useIsolatedScanners);
- }
-
- /**
- * Reads the fields in the {@see DataInput} into the current object
- * @param dataInput
- * the input fields to read into the current object
- * @throws IOException
- */
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- // load iterators
- long iterSize = dataInput.readInt();
- if (iterSize > 0)
- iterators = new ArrayList<IteratorSetting>();
- for (int i = 0; i < iterSize; i++)
- iterators.add(new IteratorSetting(dataInput));
- // load ranges
- long rangeSize = dataInput.readInt();
- if (rangeSize > 0)
- ranges = new ArrayList<Range>();
- for (int i = 0; i < rangeSize; i++) {
- Range range = new Range();
- range.readFields(dataInput);
- ranges.add(range);
- }
- // load columns
- long columnSize = dataInput.readInt();
- if (columnSize > 0)
- columns = new HashSet<Pair<Text,Text>>();
- for (int i = 0; i < columnSize; i++) {
- long numPairs = dataInput.readInt();
- Text colFam = new Text();
- colFam.readFields(dataInput);
- if (numPairs == 1) {
- columns.add(new Pair<Text,Text>(colFam, null));
- } else if (numPairs == 2) {
- Text colQual = new Text();
- colQual.readFields(dataInput);
- columns.add(new Pair<Text,Text>(colFam, colQual));
- }
- }
- autoAdjustRanges = dataInput.readBoolean();
- useLocalIterators = dataInput.readBoolean();
- useIsolatedScanners = dataInput.readBoolean();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- BatchScanConfig that = (BatchScanConfig) o;
-
- if (autoAdjustRanges != that.autoAdjustRanges)
- return false;
- if (offlineScan != that.offlineScan)
- return false;
- if (useIsolatedScanners != that.useIsolatedScanners)
- return false;
- if (useLocalIterators != that.useLocalIterators)
- return false;
- if (columns != null ? !columns.equals(that.columns) : that.columns != null)
- return false;
- if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null)
- return false;
- if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null)
- return false;
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = 31 * (iterators != null ? iterators.hashCode() : 0);
- result = 31 * result + (ranges != null ? ranges.hashCode() : 0);
- result = 31 * result + (columns != null ? columns.hashCode() : 0);
- result = 31 * result + (autoAdjustRanges ? 1 : 0);
- result = 31 * result + (useLocalIterators ? 1 : 0);
- result = 31 * result + (useIsolatedScanners ? 1 : 0);
- result = 31 * result + (offlineScan ? 1 : 0);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java
new file mode 100644
index 0000000..808bd7c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputTableConfig.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class holds a batch scan configuration for a table. It contains all the properties needed to specify how rows should be returned from the table.
+ */
+public class InputTableConfig implements Writable {
+
+ private List<IteratorSetting> iterators;
+ private List<Range> ranges;
+ private Collection<Pair<Text,Text>> columns;
+
+ private boolean autoAdjustRanges = true;
+ private boolean useLocalIterators = false;
+ private boolean useIsolatedScanners = false;
+ private boolean offlineScan = false;
+
+ public InputTableConfig() {}
+
+ /**
+ * Creates a batch scan config object out of a previously serialized batch scan config object.
+ *
+ * @param input
+ * the data input of the serialized batch scan config
+ * @throws IOException
+ */
+ public InputTableConfig(DataInput input) throws IOException {
+ readFields(input);
+ }
+
+ /**
+ * Sets the input ranges to scan for the table associated with this configuration.
+ *
+ * @param ranges
+ * the ranges that will be mapped over
+ * @since 1.6.0
+ */
+ public InputTableConfig setRanges(List<Range> ranges) {
+ this.ranges = ranges;
+ return this;
+ }
+
+ /**
+ * Returns the ranges to be queried in the configuration
+ */
+ public List<Range> getRanges() {
+ return ranges != null ? ranges : new ArrayList<Range>();
+ }
+
+ /**
+ * Restricts the columns that will be mapped over for this job for the default input table.
+ *
+ * @param columns
+ * pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+ * selected. An empty set is the default and is equivalent to scanning all columns.
+ * @since 1.6.0
+ */
+ public InputTableConfig fetchColumns(Collection<Pair<Text,Text>> columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ /**
+ * Returns the columns to be fetched for this configuration
+ */
+ public Collection<Pair<Text,Text>> getFetchedColumns() {
+ return columns != null ? columns : new HashSet<Pair<Text,Text>>();
+ }
+
+ /**
+ * Sets the iterators to be used in the query.
+ *
+ * @param iterators
+ * the configurations for the iterators
+ * @since 1.6.0
+ */
+ public InputTableConfig setIterators(List<IteratorSetting> iterators) {
+ this.iterators = iterators;
+ return this;
+ }
+
+ /**
+ * Returns the iterators to be set on this configuration
+ */
+ public List<IteratorSetting> getIterators() {
+ return iterators != null ? iterators : new ArrayList<IteratorSetting>();
+ }
+
+ /**
+ * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
+ * Disabling this feature will cause exactly one Map task to be created for each specified range. The default setting is enabled.
+ *
+ * <p>
+ * By default, this feature is <b>enabled</b>.
+ *
+ * @param autoAdjustRanges
+ * the feature is enabled if true, disabled otherwise
+ * @see #setRanges(java.util.List)
+ * @since 1.6.0
+ */
+ public InputTableConfig setAutoAdjustRanges(boolean autoAdjustRanges) {
+ this.autoAdjustRanges = autoAdjustRanges;
+ return this;
+ }
+
+ /**
+ * Determines whether a configuration has auto-adjust ranges enabled.
+ *
+ * @return false if the feature is disabled, true otherwise
+ * @since 1.6.0
+ * @see #setAutoAdjustRanges(boolean)
+ */
+ public boolean shouldAutoAdjustRanges() {
+ return autoAdjustRanges;
+ }
+
+ /**
+ * Controls the use of the {@link org.apache.accumulo.core.client.ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack
+ * to be constructed within the Map task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be
+ * available on the classpath for the task.
+ *
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ *
+ * @param useLocalIterators
+ * the feature is enabled if true, disabled otherwise
+ * @since 1.6.0
+ */
+ public InputTableConfig setUseLocalIterators(boolean useLocalIterators) {
+ this.useLocalIterators = useLocalIterators;
+ return this;
+ }
+
+ /**
+ * Determines whether a configuration uses local iterators.
+ *
+ * @return true if the feature is enabled, false otherwise
+ * @since 1.6.0
+ * @see #setUseLocalIterators(boolean)
+ */
+ public boolean shouldUseLocalIterators() {
+ return useLocalIterators;
+ }
+
+ /**
+ * <p>
+ * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
+ * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
+ * fail.
+ *
+ * <p>
+ * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
+ *
+ * <p>
+ * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
+ * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
+ *
+ * <p>
+ * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+ * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+ * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+ *
+ * <p>
+ * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+ * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
+ *
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ *
+ * @param offlineScan
+ * the feature is enabled if true, disabled otherwise
+ * @since 1.6.0
+ */
+ public InputTableConfig setOfflineScan(boolean offlineScan) {
+ this.offlineScan = offlineScan;
+ return this;
+ }
+
+ /**
+ * Determines whether a configuration has the offline table scan feature enabled.
+ *
+ * @return true if the feature is enabled, false otherwise
+ * @since 1.6.0
+ * @see #setOfflineScan(boolean)
+ */
+ public boolean isOfflineScan() {
+ return offlineScan;
+ }
+
+ /**
+ * Controls the use of the {@link org.apache.accumulo.core.client.IsolatedScanner} in this job.
+ *
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ *
+ * @param useIsolatedScanners
+ * the feature is enabled if true, disabled otherwise
+ * @since 1.6.0
+ */
+ public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
+ this.useIsolatedScanners = useIsolatedScanners;
+ return this;
+ }
+
+ /**
+ * Determines whether a configuration has isolation enabled.
+ *
+ * @return true if the feature is enabled, false otherwise
+ * @since 1.6.0
+ * @see #setUseIsolatedScanners(boolean)
+ */
+ public boolean shouldUseIsolatedScanners() {
+ return useIsolatedScanners;
+ }
+
+ /**
+ * Writes the state for the current object out to the specified {@link DataOutput}
+ *
+ * @param dataOutput
+ * the output for which to write the object's state
+ * @throws IOException
+ */
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ if (iterators != null) {
+ dataOutput.writeInt(iterators.size());
+ for (IteratorSetting setting : iterators)
+ setting.write(dataOutput);
+ } else {
+ dataOutput.writeInt(0);
+ }
+ if (ranges != null) {
+ dataOutput.writeInt(ranges.size());
+ for (Range range : ranges)
+ range.write(dataOutput);
+ } else {
+ dataOutput.writeInt(0);
+ }
+ if (columns != null) {
+ dataOutput.writeInt(columns.size());
+ for (Pair<Text,Text> column : columns) {
+ if (column.getSecond() == null) {
+ dataOutput.writeInt(1);
+ column.getFirst().write(dataOutput);
+ } else {
+ dataOutput.writeInt(2);
+ column.getFirst().write(dataOutput);
+ column.getSecond().write(dataOutput);
+ }
+ }
+ } else {
+ dataOutput.writeInt(0);
+ }
+ dataOutput.writeBoolean(autoAdjustRanges);
+ dataOutput.writeBoolean(useLocalIterators);
+ dataOutput.writeBoolean(useIsolatedScanners);
+ }
+
+ /**
+ * Reads the fields in the {@link DataInput} into the current object
+ *
+ * @param dataInput
+ * the input fields to read into the current object
+ * @throws IOException
+ */
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ // load iterators
+ long iterSize = dataInput.readInt();
+ if (iterSize > 0)
+ iterators = new ArrayList<IteratorSetting>();
+ for (int i = 0; i < iterSize; i++)
+ iterators.add(new IteratorSetting(dataInput));
+ // load ranges
+ long rangeSize = dataInput.readInt();
+ if (rangeSize > 0)
+ ranges = new ArrayList<Range>();
+ for (int i = 0; i < rangeSize; i++) {
+ Range range = new Range();
+ range.readFields(dataInput);
+ ranges.add(range);
+ }
+ // load columns
+ long columnSize = dataInput.readInt();
+ if (columnSize > 0)
+ columns = new HashSet<Pair<Text,Text>>();
+ for (int i = 0; i < columnSize; i++) {
+ long numPairs = dataInput.readInt();
+ Text colFam = new Text();
+ colFam.readFields(dataInput);
+ if (numPairs == 1) {
+ columns.add(new Pair<Text,Text>(colFam, null));
+ } else if (numPairs == 2) {
+ Text colQual = new Text();
+ colQual.readFields(dataInput);
+ columns.add(new Pair<Text,Text>(colFam, colQual));
+ }
+ }
+ autoAdjustRanges = dataInput.readBoolean();
+ useLocalIterators = dataInput.readBoolean();
+ useIsolatedScanners = dataInput.readBoolean();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ InputTableConfig that = (InputTableConfig) o;
+
+ if (autoAdjustRanges != that.autoAdjustRanges)
+ return false;
+ if (offlineScan != that.offlineScan)
+ return false;
+ if (useIsolatedScanners != that.useIsolatedScanners)
+ return false;
+ if (useLocalIterators != that.useLocalIterators)
+ return false;
+ if (columns != null ? !columns.equals(that.columns) : that.columns != null)
+ return false;
+ if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null)
+ return false;
+ if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null)
+ return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 31 * (iterators != null ? iterators.hashCode() : 0);
+ result = 31 * result + (ranges != null ? ranges.hashCode() : 0);
+ result = 31 * result + (columns != null ? columns.hashCode() : 0);
+ result = 31 * result + (autoAdjustRanges ? 1 : 0);
+ result = 31 * result + (useLocalIterators ? 1 : 0);
+ result = 31 * result + (useIsolatedScanners ? 1 : 0);
+ result = 31 * result + (offlineScan ? 1 : 0);
+ return result;
+ }
+}
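
Since the new class keeps the fluent setters and the Writable contract of its predecessor, a quick sketch of building one and round-tripping it through write()/readFields() looks like this (the iterator class name and all values are made up; note that, as written above, write()/readFields() cover the iterators, ranges, columns, and three boolean flags, and do not carry the offlineScan flag):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.util.Pair;
    import org.apache.hadoop.io.Text;

    public class InputTableConfigRoundTrip {
      public static void main(String[] args) throws Exception {
        InputTableConfig config = new InputTableConfig()
            .setRanges(Collections.singletonList(new Range("row_a", "row_m")))
            .setIterators(Arrays.asList(new IteratorSetting(50, "myFilter", "org.example.MyFilter")))
            .fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("cf"), new Text("cq"))))
            .setAutoAdjustRanges(false)
            .setUseIsolatedScanners(true);

        // Serialize with write() and rebuild with the DataInput constructor, the same
        // mechanism the input formats use to stash configs in the job configuration.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        config.write(new DataOutputStream(baos));
        InputTableConfig copy = new InputTableConfig(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

        System.out.println(copy.getRanges().size());          // 1
        System.out.println(copy.shouldAutoAdjustRanges());     // false
        System.out.println(copy.shouldUseIsolatedScanners());  // true
      }
    }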
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
index 4aeffca..11a1619 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
@@ -46,7 +46,7 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
-import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mock.MockTabletLocator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.Key;
@@ -495,12 +495,12 @@ public class InputConfigurator extends ConfiguratorBase {
* @param conf
* the Hadoop configuration object to configure
* @param configs
- * an array of {@link BatchScanConfig} objects to associate with the job
+ * an array of {@link InputTableConfig} objects to associate with the job
* @since 1.6.0
*/
- public static void setBatchScanConfigs(Class<?> implementingClass, Configuration conf, Map<String,BatchScanConfig> configs) {
+ public static void setInputTableConfigs(Class<?> implementingClass, Configuration conf, Map<String,InputTableConfig> configs) {
MapWritable mapWritable = new MapWritable();
- for (Map.Entry<String,BatchScanConfig> tableConfig : configs.entrySet())
+ for (Map.Entry<String,InputTableConfig> tableConfig : configs.entrySet())
mapWritable.put(new Text(tableConfig.getKey()), tableConfig.getValue());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -515,7 +515,7 @@ public class InputConfigurator extends ConfiguratorBase {
}
/**
- * Returns all {@link BatchScanConfig} objects associated with this job.
+ * Returns all {@link InputTableConfig} objects associated with this job.
*
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
@@ -524,10 +524,10 @@ public class InputConfigurator extends ConfiguratorBase {
* @return all of the table query configs for the job
* @since 1.6.0
*/
- public static Map<String,BatchScanConfig> getBatchScanConfigs(Class<?> implementingClass, Configuration conf) {
- Map<String,BatchScanConfig> configs = new HashMap<String,BatchScanConfig>();
- Map.Entry<String, BatchScanConfig> defaultConfig = getDefaultBatchScanConfig(implementingClass, conf);
- if(defaultConfig != null)
+ public static Map<String,InputTableConfig> getInputTableConfigs(Class<?> implementingClass, Configuration conf) {
+ Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
+ Map.Entry<String,InputTableConfig> defaultConfig = getDefaultInputTableConfig(implementingClass, conf);
+ if (defaultConfig != null)
configs.put(defaultConfig.getKey(), defaultConfig.getValue());
String configString = conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS));
MapWritable mapWritable = new MapWritable();
@@ -542,13 +542,13 @@ public class InputConfigurator extends ConfiguratorBase {
}
}
for (Map.Entry<Writable,Writable> entry : mapWritable.entrySet())
- configs.put(((Text) entry.getKey()).toString(), (BatchScanConfig) entry.getValue());
+ configs.put(((Text) entry.getKey()).toString(), (InputTableConfig) entry.getValue());
return configs;
}
/**
- * Returns the {@link BatchScanConfig} for the given table
+ * Returns the {@link InputTableConfig} for the given table
*
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
@@ -559,8 +559,8 @@ public class InputConfigurator extends ConfiguratorBase {
* @return the table query config for the given table name (if it exists) and null if it does not
* @since 1.6.0
*/
- public static BatchScanConfig getBatchScanConfig(Class<?> implementingClass, Configuration conf, String tableName) {
- Map<String,BatchScanConfig> queryConfigs = getBatchScanConfigs(implementingClass, conf);
+ public static InputTableConfig getInputTableConfig(Class<?> implementingClass, Configuration conf, String tableName) {
+ Map<String,InputTableConfig> queryConfigs = getInputTableConfigs(implementingClass, conf);
return queryConfigs.get(tableName);
}
@@ -599,8 +599,8 @@ public class InputConfigurator extends ConfiguratorBase {
* @since 1.5.0
*/
public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
-
- Map<String, BatchScanConfig> batchScanConfigs = getBatchScanConfigs(implementingClass, conf);
+
+ Map<String,InputTableConfig> inputTableConfigs = getInputTableConfigs(implementingClass, conf);
if (!isConnectorInfoSet(implementingClass, conf))
throw new IOException("Input info has not been set.");
String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
@@ -613,16 +613,16 @@ public class InputConfigurator extends ConfiguratorBase {
Connector c = getInstance(implementingClass, conf).getConnector(principal, token);
if (!c.securityOperations().authenticateUser(principal, token))
throw new IOException("Unable to authenticate user");
-
- if(getBatchScanConfigs(implementingClass, conf).size() == 0)
+
+ if (getInputTableConfigs(implementingClass, conf).size() == 0)
throw new IOException("No table set.");
-
- for (Map.Entry<String, BatchScanConfig> tableConfig : batchScanConfigs.entrySet()) {
+
+ for (Map.Entry<String,InputTableConfig> tableConfig : inputTableConfigs.entrySet()) {
if (!c.securityOperations().hasTablePermission(getPrincipal(implementingClass, conf), tableConfig.getKey(), TablePermission.READ))
throw new IOException("Unable to access table");
}
- for (Map.Entry<String, BatchScanConfig> tableConfigEntry : batchScanConfigs.entrySet()) {
- BatchScanConfig tableConfig = tableConfigEntry.getValue();
+ for (Map.Entry<String,InputTableConfig> tableConfigEntry : inputTableConfigs.entrySet()) {
+ InputTableConfig tableConfig = tableConfigEntry.getValue();
if (!tableConfig.shouldUseLocalIterators()) {
if (tableConfig.getIterators() != null) {
for (IteratorSetting iter : tableConfig.getIterators()) {
@@ -642,7 +642,7 @@ public class InputConfigurator extends ConfiguratorBase {
}
/**
- * Returns the {@link org.apache.accumulo.core.client.mapreduce.BatchScanConfig} for the configuration based on the properties set using the single-table
+ * Returns the {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} for the configuration based on the properties set using the single-table
* input methods.
*
* @param implementingClass
@@ -650,13 +650,12 @@ public class InputConfigurator extends ConfiguratorBase {
* @param conf
* the Hadoop instance for which to retrieve the configuration
* @return the config object built from the single input table properties set on the job
- * @throws IOException
* @since 1.6.0
*/
- protected static Map.Entry<String, BatchScanConfig> getDefaultBatchScanConfig(Class<?> implementingClass, Configuration conf) {
+ protected static Map.Entry<String,InputTableConfig> getDefaultInputTableConfig(Class<?> implementingClass, Configuration conf) {
String tableName = getInputTableName(implementingClass, conf);
if (tableName != null) {
- BatchScanConfig queryConfig = new BatchScanConfig();
+ InputTableConfig queryConfig = new InputTableConfig();
List<IteratorSetting> itrs = getIterators(implementingClass, conf);
if (itrs != null)
queryConfig.setIterators(itrs);
@@ -682,22 +681,22 @@ public class InputConfigurator extends ConfiguratorBase {
public static Map<String,Map<KeyExtent,List<Range>>> binOffline(String tableId, List<Range> ranges, Instance instance, Connector conn)
throws AccumuloException, TableNotFoundException {
Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
+
if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
Tables.clearCache(instance);
if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
throw new AccumuloException("Table is online tableId:" + tableId + " cannot scan table in offline mode ");
}
}
-
+
for (Range range : ranges) {
Text startRow;
-
+
if (range.getStartKey() != null)
startRow = range.getStartKey().getRow();
else
startRow = new Text();
-
+
Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false);
Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
@@ -705,7 +704,7 @@ public class InputConfigurator extends ConfiguratorBase {
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
scanner.setRange(metadataRange);
-
+
RowIterator rowIter = new RowIterator(scanner);
KeyExtent lastExtent = null;
while (rowIter.hasNext()) {
@@ -713,58 +712,58 @@ public class InputConfigurator extends ConfiguratorBase {
String last = "";
KeyExtent extent = null;
String location = null;
-
+
while (row.hasNext()) {
Map.Entry<Key,Value> entry = row.next();
Key key = entry.getKey();
-
+
if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
last = entry.getValue().toString();
}
-
+
if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
|| key.getColumnFamily().equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
location = entry.getValue().toString();
}
-
+
if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
extent = new KeyExtent(key.getRow(), entry.getValue());
}
-
+
}
-
+
if (location != null)
return null;
-
+
if (!extent.getTableId().toString().equals(tableId)) {
throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
}
-
+
if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
}
-
+
Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
if (tabletRanges == null) {
tabletRanges = new HashMap<KeyExtent,List<Range>>();
binnedRanges.put(last, tabletRanges);
}
-
+
List<Range> rangeList = tabletRanges.get(extent);
if (rangeList == null) {
rangeList = new ArrayList<Range>();
tabletRanges.put(extent, rangeList);
}
-
+
rangeList.add(range);
-
+
if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
break;
}
-
+
lastExtent = extent;
}
-
+
}
return binnedRanges;
}
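A hedged sketch, not part of this diff, of consuming the structure binOffline builds above: last-known tserver location -> tablet extent -> the ranges binned to that tablet. A null return signals that some tablet still had a current or future location, i.e. the table was not yet fully offline, so callers typically retry. The class and method names here are illustrative.

import java.util.List;
import java.util.Map;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;

public class BinnedRangeWalker {
  // Walks location -> extent -> ranges; null means "retry until offline".
  static void dump(Map<String,Map<KeyExtent,List<Range>>> binned) {
    if (binned == null) {
      System.out.println("table not fully offline yet; caller should retry");
      return;
    }
    for (Map.Entry<String,Map<KeyExtent,List<Range>>> loc : binned.entrySet())
      for (Map.Entry<KeyExtent,List<Range>> tablet : loc.getValue().entrySet())
        System.out.println(loc.getKey() + " " + tablet.getKey() + " : " + tablet.getValue().size() + " range(s)");
  }
}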
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
index 97f8d72..68f88cb 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormatTest.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
@@ -113,14 +113,14 @@ public class AccumuloMultiTableInputFormatTest {
AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME);
- BatchScanConfig tableConfig1 = new BatchScanConfig();
- BatchScanConfig tableConfig2 = new BatchScanConfig();
+ InputTableConfig tableConfig1 = new InputTableConfig();
+ InputTableConfig tableConfig2 = new InputTableConfig();
- Map<String,BatchScanConfig> configMap = new HashMap<String,BatchScanConfig>();
+ Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
configMap.put(table1, tableConfig1);
configMap.put(table2, tableConfig2);
- AccumuloMultiTableInputFormat.setBatchScanConfigs(job, configMap);
+ AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Key.class);
@@ -162,27 +162,27 @@ public class AccumuloMultiTableInputFormatTest {
}
/**
- * Verify {@link org.apache.accumulo.core.client.mapreduce.BatchScanConfig} objects get correctly serialized in the JobContext.
+ * Verify {@link org.apache.accumulo.core.client.mapreduce.InputTableConfig} objects get correctly serialized in the JobContext.
*/
@Test
public void testTableQueryConfigSerialization() throws IOException {
JobConf job = new JobConf();
- BatchScanConfig table1 = new BatchScanConfig().setRanges(Collections.singletonList(new Range("a", "b")))
+ InputTableConfig table1 = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b")))
.fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
.setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
- BatchScanConfig table2 = new BatchScanConfig().setRanges(Collections.singletonList(new Range("a", "b")))
+ InputTableConfig table2 = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b")))
.fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
.setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
- Map<String,BatchScanConfig> configMap = new HashMap<String,BatchScanConfig>();
+ Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
configMap.put(TEST_TABLE_1, table1);
configMap.put(TEST_TABLE_2, table2);
- AccumuloMultiTableInputFormat.setBatchScanConfigs(job, configMap);
+ AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
- assertEquals(table1, AccumuloMultiTableInputFormat.getBatchScanConfig(job, TEST_TABLE_1));
- assertEquals(table2, AccumuloMultiTableInputFormat.getBatchScanConfig(job, TEST_TABLE_2));
+ assertEquals(table1, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_1));
+ assertEquals(table2, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_2));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
index 27149eb..9951367 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormatTest.java
@@ -105,14 +105,14 @@ public class AccumuloMultiTableInputFormatTest {
AccumuloMultiTableInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
- BatchScanConfig tableConfig1 = new BatchScanConfig();
- BatchScanConfig tableConfig2 = new BatchScanConfig();
+ InputTableConfig tableConfig1 = new InputTableConfig();
+ InputTableConfig tableConfig2 = new InputTableConfig();
- Map<String,BatchScanConfig> configMap = new HashMap<String,BatchScanConfig>();
+ Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
configMap.put(table1, tableConfig1);
configMap.put(table2, tableConfig2);
- AccumuloMultiTableInputFormat.setBatchScanConfigs(job, configMap);
+ AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
AccumuloMultiTableInputFormat.setMockInstance(job, INSTANCE_NAME);
job.setMapperClass(TestMapper.class);
@@ -160,25 +160,25 @@ public class AccumuloMultiTableInputFormatTest {
}
/**
- * Verify {@link BatchScanConfig} objects get correctly serialized in the JobContext.
+ * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
*/
@Test
- public void testBatchScanConfigSerialization() throws IOException {
+ public void testInputTableConfigSerialization() throws IOException {
Job job = new Job();
- BatchScanConfig tableConfig = new BatchScanConfig().setRanges(Collections.singletonList(new Range("a", "b")))
+ InputTableConfig tableConfig = new InputTableConfig().setRanges(Collections.singletonList(new Range("a", "b")))
.fetchColumns(Collections.singleton(new Pair<Text,Text>(new Text("CF1"), new Text("CQ1"))))
.setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
- Map<String,BatchScanConfig> configMap = new HashMap<String,BatchScanConfig>();
+ Map<String,InputTableConfig> configMap = new HashMap<String,InputTableConfig>();
configMap.put(TEST_TABLE_1, tableConfig);
configMap.put(TEST_TABLE_2, tableConfig);
- AccumuloMultiTableInputFormat.setBatchScanConfigs(job, configMap);
+ AccumuloMultiTableInputFormat.setInputTableConfigs(job, configMap);
- assertEquals(tableConfig, AccumuloMultiTableInputFormat.getBatchScanConfig(job, TEST_TABLE_1));
- assertEquals(tableConfig, AccumuloMultiTableInputFormat.getBatchScanConfig(job, TEST_TABLE_2));
+ assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_1));
+ assertEquals(tableConfig, AccumuloMultiTableInputFormat.getInputTableConfig(job, TEST_TABLE_2));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
index a2b0db0..65845f3 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.BatchScanConfig;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;
@@ -38,17 +38,17 @@ import org.junit.Test;
public class TableQueryConfigTest {
- private BatchScanConfig tableQueryConfig;
+ private InputTableConfig tableQueryConfig;
@Before
public void setUp() {
- tableQueryConfig = new BatchScanConfig();
+ tableQueryConfig = new InputTableConfig();
}
@Test
public void testSerialization_OnlyTable() throws IOException {
byte[] serialized = serialize(tableQueryConfig);
- BatchScanConfig actualConfig = deserialize(serialized);
+ InputTableConfig actualConfig = deserialize(serialized);
assertEquals(tableQueryConfig, actualConfig);
}
@@ -61,7 +61,7 @@ public class TableQueryConfigTest {
tableQueryConfig.setRanges(ranges);
byte[] serialized = serialize(tableQueryConfig);
- BatchScanConfig actualConfig = deserialize(serialized);
+ InputTableConfig actualConfig = deserialize(serialized);
assertEquals(ranges, actualConfig.getRanges());
}
@@ -74,7 +74,7 @@ public class TableQueryConfigTest {
tableQueryConfig.fetchColumns(columns);
byte[] serialized = serialize(tableQueryConfig);
- BatchScanConfig actualConfig = deserialize(serialized);
+ InputTableConfig actualConfig = deserialize(serialized);
assertEquals(actualConfig.getFetchedColumns(), columns);
}
@@ -86,21 +86,21 @@ public class TableQueryConfigTest {
settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
tableQueryConfig.setIterators(settings);
byte[] serialized = serialize(tableQueryConfig);
- BatchScanConfig actualConfig = deserialize(serialized);
+ InputTableConfig actualConfig = deserialize(serialized);
assertEquals(actualConfig.getIterators(), settings);
}
- private byte[] serialize(BatchScanConfig tableQueryConfig) throws IOException {
+ private byte[] serialize(InputTableConfig tableQueryConfig) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
tableQueryConfig.write(new DataOutputStream(baos));
baos.close();
return baos.toByteArray();
}
- private BatchScanConfig deserialize(byte[] bytes) throws IOException {
+ private InputTableConfig deserialize(byte[] bytes) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- BatchScanConfig actualConfig = new BatchScanConfig(new DataInputStream(bais));
+ InputTableConfig actualConfig = new InputTableConfig(new DataInputStream(bais));
bais.close();
return actualConfig;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/docs/src/main/latex/accumulo_user_manual/chapters/analytics.tex
----------------------------------------------------------------------
diff --git a/docs/src/main/latex/accumulo_user_manual/chapters/analytics.tex b/docs/src/main/latex/accumulo_user_manual/chapters/analytics.tex
index 7bbd177..fc50d4a 100644
--- a/docs/src/main/latex/accumulo_user_manual/chapters/analytics.tex
+++ b/docs/src/main/latex/accumulo_user_manual/chapters/analytics.tex
@@ -133,8 +133,8 @@ used for each table.
\small
\begin{verbatim}
-BatchScanConfig tableOneConfig = new BatchScanConfig();
-BatchScanConfig tableTwoConfig = new BatchScanConfig();
+InputTableConfig tableOneConfig = new InputTableConfig();
+InputTableConfig tableTwoConfig = new InputTableConfig();
\end{verbatim}
\normalsize
@@ -142,10 +142,10 @@ To set the configuration objects on the job:
\small
\begin{verbatim}
-Map<String, BatchScanConfig> configs = new HashMap<String,BatchScanConfig>();
+Map<String, InputTableConfig> configs = new HashMap<String,InputTableConfig>();
configs.put("table1", tableOneConfig);
configs.put("table2", tableTwoConfig);
-AccumuloMultiTableInputFormat.setBatchScanConfigs(job, configs);
+AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
\end{verbatim}
\normalsize
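Expanding the manual snippet above into a fuller, hedged sketch: per-table options such as ranges and iterators are set on each InputTableConfig before the map is registered on the job, mirroring the usage in the tests earlier in this diff. The table names, ranges, and iterator class below are illustrative only.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.mapreduce.Job;

public class MultiTableJobSetup {
  public static void configure(Job job) {
    // Restrict table1 to a range and attach an iterator.
    InputTableConfig tableOneConfig = new InputTableConfig()
        .setRanges(Collections.singletonList(new Range("a", "z")))
        .setIterators(Collections.singletonList(new IteratorSetting(50, "iter1", "iterclass1")));
    // table2 uses the defaults (full-table scan).
    InputTableConfig tableTwoConfig = new InputTableConfig();

    Map<String,InputTableConfig> configs = new HashMap<String,InputTableConfig>();
    configs.put("table1", tableOneConfig);
    configs.put("table2", tableTwoConfig);
    AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
  }
}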
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61353d1e/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/Basic.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/Basic.java b/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/Basic.java
index 2faa9b1..69f2ace 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/Basic.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/Basic.java
@@ -36,12 +36,11 @@ import org.apache.accumulo.core.trace.TraceFormatter;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.monitor.Monitor;
import org.apache.accumulo.server.monitor.servlets.BasicServlet;
-import org.apache.accumulo.start.classloader.AccumuloClassLoader;
abstract class Basic extends BasicServlet {
-
+
private static final long serialVersionUID = 1L;
-
+
public static String getStringParameter(HttpServletRequest req, String name, String defaultValue) {
String result = req.getParameter(name);
if (result == null) {
@@ -49,7 +48,7 @@ abstract class Basic extends BasicServlet {
}
return result;
}
-
+
public static int getIntParameter(HttpServletRequest req, String name, int defaultMinutes) {
String valueString = req.getParameter(name);
if (valueString == null)
@@ -62,11 +61,11 @@ abstract class Basic extends BasicServlet {
}
return result;
}
-
+
public static String dateString(long millis) {
return TraceFormatter.formatDate(new Date(millis));
}
-
+
protected Scanner getScanner(StringBuilder sb) throws AccumuloException, AccumuloSecurityException {
AccumuloConfiguration conf = Monitor.getSystemConfiguration();
String principal = conf.get(Property.TRACE_USER);
@@ -81,12 +80,12 @@ abstract class Basic extends BasicServlet {
for (Entry<String,String> entry : loginMap.entrySet()) {
props.put(entry.getKey().substring(prefixLength), entry.getValue());
}
-
+
AuthenticationToken token = Property.createInstanceFromPropertyName(conf, Property.TRACE_TOKEN_TYPE, AuthenticationToken.class, new PasswordToken());
token.init(props);
at = token;
}
-
+
String table = conf.get(Property.TRACE_TABLE);
try {
Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at);