You are viewing a plain-text version of this content. The canonical link for it ("here" in the original) was not preserved in this plain-text copy.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/01/28 03:01:23 UTC
[4/4] accumulo git commit: Merge branch '1.6' into 1.7
Merge branch '1.6' into 1.7
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d4882a15
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d4882a15
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d4882a15
Branch: refs/heads/1.7
Commit: d4882a15fd85482855783e3a56babc14d31bb5bb
Parents: 0faf8b9 21d2f61
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jan 27 19:57:22 2016 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Wed Jan 27 19:57:22 2016 -0500
----------------------------------------------------------------------
.../core/client/impl/CompressedIterators.java | 14 +-
.../core/client/impl/ConditionalWriterImpl.java | 44 ++++-
.../accumulo/core/iterators/IteratorUtil.java | 93 ++++++-----
.../client/impl/ConditionalComparatorTest.java | 53 ++++++
.../tserver/ConditionCheckerContext.java | 164 +++++++++++++++++++
.../apache/accumulo/tserver/TabletServer.java | 76 ++-------
.../accumulo/tserver/tablet/ScanDataSource.java | 23 ++-
.../apache/accumulo/tserver/tablet/Tablet.java | 18 ++
.../accumulo/test/ConditionalWriterIT.java | 137 +++++++++++++++-
9 files changed, 507 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 24040e6,9030d77..c7756ad
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@@ -17,11 -17,11 +17,13 @@@
package org.apache.accumulo.core.client.impl;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
+ import java.util.Arrays;
import java.util.Collections;
+ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@@ -69,14 -69,13 +71,15 @@@ import org.apache.accumulo.core.securit
import org.apache.accumulo.core.security.VisibilityParseException;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.LoggingRunnable;
+ import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
@@@ -379,12 -373,13 +382,12 @@@ class ConditionalWriterImpl implements
}
}
- ConditionalWriterImpl(Instance instance, Credentials credentials, String tableId, ConditionalWriterConfig config) {
- this.instance = instance;
- this.credentials = credentials;
+ ConditionalWriterImpl(ClientContext context, String tableId, ConditionalWriterConfig config) {
+ this.context = context;
this.auths = config.getAuthorizations();
this.ve = new VisibilityEvaluator(config.getAuthorizations());
- this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads());
+ this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new NamingThreadFactory(this.getClass().getSimpleName()));
- this.locator = TabletLocator.getLocator(instance, new Text(tableId));
+ this.locator = TabletLocator.getLocator(context, new Text(tableId));
this.serverQueues = new HashMap<String,ServerQueue>();
this.tableId = tableId;
this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 031d13f,6f76d77..c739e56
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@@ -124,22 -109,27 +125,27 @@@ public class IteratorUtil
return props;
}
- public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration conf) {
- List<IterInfo> iters = new ArrayList<IterInfo>();
- parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), conf);
+ public static void mergeIteratorConfig(List<IterInfo> destList, Map<String,Map<String,String>> destOpts, List<IterInfo> tableIters,
+ Map<String,Map<String,String>> tableOpts, List<IterInfo> ssi, Map<String,Map<String,String>> ssio) {
+ destList.addAll(tableIters);
+ destList.addAll(ssi);
+ Collections.sort(destList, new IterInfoComparator());
- int max = 0;
-
- for (IterInfo iterInfo : iters) {
- if (iterInfo.priority > max)
- max = iterInfo.priority;
+ Set<Entry<String,Map<String,String>>> es = tableOpts.entrySet();
+ for (Entry<String,Map<String,String>> entry : es) {
+ if (entry.getValue() == null) {
+ destOpts.put(entry.getKey(), null);
+ } else {
- destOpts.put(entry.getKey(), new HashMap<String,String>(entry.getValue()));
++ destOpts.put(entry.getKey(), new HashMap<>(entry.getValue()));
+ }
}
- return max;
+ IteratorUtil.mergeOptions(ssio, destOpts);
+
}
- protected static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
+ public static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
- final Property scopeProperty = IteratorScope.getProperty(scope);
+ final Property scopeProperty = getProperty(scope);
final String scopePropertyKey = scopeProperty.getKey();
for (Entry<String,String> entry : conf.getAllPropertiesWithPrefix(scopeProperty).entrySet()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
index 0000000,2e34f38..39aa684
mode 000000,100644..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
@@@ -1,0 -1,164 +1,164 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.accumulo.tserver;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+
+ import org.apache.accumulo.core.client.impl.CompressedIterators;
+ import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.ArrayByteSequence;
+ import org.apache.accumulo.core.data.ByteSequence;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.data.thrift.IterInfo;
+ import org.apache.accumulo.core.data.thrift.TCMResult;
+ import org.apache.accumulo.core.data.thrift.TCMStatus;
+ import org.apache.accumulo.core.data.thrift.TCondition;
+ import org.apache.accumulo.core.iterators.IteratorUtil;
+ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+ import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+ import org.apache.hadoop.io.Text;
+
+ import com.google.common.base.Preconditions;
+
+ public class ConditionCheckerContext {
+ private CompressedIterators compressedIters;
+
+ private List<IterInfo> tableIters;
+ private Map<String,Map<String,String>> tableIterOpts;
+ private TabletIteratorEnvironment tie;
+ private String context;
+ private Map<String,Class<? extends SortedKeyValueIterator<Key,Value>>> classCache;
+
+ private static class MergedIterConfig {
+ List<IterInfo> mergedIters;
+ Map<String,Map<String,String>> mergedItersOpts;
+
+ MergedIterConfig(List<IterInfo> mergedIters, Map<String,Map<String,String>> mergedItersOpts) {
+ this.mergedIters = mergedIters;
+ this.mergedItersOpts = mergedItersOpts;
+ }
+ }
+
- private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<ByteSequence,MergedIterConfig>();
++ private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<>();
+
+ ConditionCheckerContext(CompressedIterators compressedIters, AccumuloConfiguration tableConf) {
+ this.compressedIters = compressedIters;
+
- tableIters = new ArrayList<IterInfo>();
- tableIterOpts = new HashMap<String,Map<String,String>>();
++ tableIters = new ArrayList<>();
++ tableIterOpts = new HashMap<>();
+
+ // parse table iterator config once
+ IteratorUtil.parseIterConf(IteratorScope.scan, tableIters, tableIterOpts, tableConf);
+
+ context = tableConf.get(Property.TABLE_CLASSPATH);
+
- classCache = new HashMap<String,Class<? extends SortedKeyValueIterator<Key,Value>>>();
++ classCache = new HashMap<>();
+
+ tie = new TabletIteratorEnvironment(IteratorScope.scan, tableConf);
+ }
+
+ SortedKeyValueIterator<Key,Value> buildIterator(SortedKeyValueIterator<Key,Value> systemIter, TCondition tc) throws IOException {
+
+ ArrayByteSequence key = new ArrayByteSequence(tc.iterators);
+ MergedIterConfig mic = mergedIterCache.get(key);
+ if (mic == null) {
+ IterConfig ic = compressedIters.decompress(tc.iterators);
+
- List<IterInfo> mergedIters = new ArrayList<IterInfo>(tableIters.size() + ic.ssiList.size());
- Map<String,Map<String,String>> mergedItersOpts = new HashMap<String,Map<String,String>>(tableIterOpts.size() + ic.ssio.size());
++ List<IterInfo> mergedIters = new ArrayList<>(tableIters.size() + ic.ssiList.size());
++ Map<String,Map<String,String>> mergedItersOpts = new HashMap<>(tableIterOpts.size() + ic.ssio.size());
+
+ IteratorUtil.mergeIteratorConfig(mergedIters, mergedItersOpts, tableIters, tableIterOpts, ic.ssiList, ic.ssio);
+
+ mic = new MergedIterConfig(mergedIters, mergedItersOpts);
+
+ mergedIterCache.put(key, mic);
+ }
+
+ return IteratorUtil.loadIterators(systemIter, mic.mergedIters, mic.mergedItersOpts, tie, true, context, classCache);
+ }
+
+ boolean checkConditions(SortedKeyValueIterator<Key,Value> systemIter, ServerConditionalMutation scm) throws IOException {
+ boolean add = true;
+
+ for (TCondition tc : scm.getConditions()) {
+
+ Range range;
+ if (tc.hasTimestamp)
+ range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+ else
+ range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+
+ SortedKeyValueIterator<Key,Value> iter = buildIterator(systemIter, tc);
+
+ ByteSequence cf = new ArrayByteSequence(tc.getCf());
+ iter.seek(range, Collections.singleton(cf), true);
+ Value val = null;
+ if (iter.hasTop()) {
+ val = iter.getTopValue();
+ }
+
+ if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+ add = false;
+ break;
+ }
+ }
+ return add;
+ }
+
+ public class ConditionChecker {
+
+ private List<ServerConditionalMutation> conditionsToCheck;
+ private List<ServerConditionalMutation> okMutations;
+ private List<TCMResult> results;
+ private boolean checked = false;
+
+ public ConditionChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) {
+ this.conditionsToCheck = conditionsToCheck;
+ this.okMutations = okMutations;
+ this.results = results;
+ }
+
+ public void check(SortedKeyValueIterator<Key,Value> systemIter) throws IOException {
+ Preconditions.checkArgument(!checked, "check() method should only be called once");
+ checked = true;
+
+ for (ServerConditionalMutation scm : conditionsToCheck) {
+ if (checkConditions(systemIter, scm)) {
+ okMutations.add(scm);
+ } else {
+ results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+ }
+ }
+ }
+ }
+
+ public ConditionChecker newChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) {
+ return new ConditionChecker(conditionsToCheck, okMutations, results);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1e0d119,6023ae3..038d3e8
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -57,11 -65,8 +57,10 @@@ import java.util.concurrent.locks.Reent
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.CompressedIterators;
- import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+import org.apache.accumulo.core.client.impl.DurabilityImpl;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
@@@ -91,9 -98,9 +90,8 @@@ import org.apache.accumulo.core.data.th
import org.apache.accumulo.core.data.thrift.TCMResult;
import org.apache.accumulo.core.data.thrift.TCMStatus;
import org.apache.accumulo.core.data.thrift.TColumn;
- import org.apache.accumulo.core.data.thrift.TCondition;
import org.apache.accumulo.core.data.thrift.TConditionalMutation;
import org.apache.accumulo.core.data.thrift.TConditionalSession;
-import org.apache.accumulo.core.data.thrift.TKey;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TKeyValue;
import org.apache.accumulo.core.data.thrift.TMutation;
@@@ -197,7 -193,21 +195,8 @@@ import org.apache.accumulo.server.zooke
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.accumulo.tserver.Compactor.CompactionInfo;
+ import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
import org.apache.accumulo.tserver.RowLocks.RowLock;
-import org.apache.accumulo.tserver.Tablet.CommitSession;
-import org.apache.accumulo.tserver.Tablet.KVEntry;
-import org.apache.accumulo.tserver.Tablet.LookupResult;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.tserver.Tablet.ScanBatch;
-import org.apache.accumulo.tserver.Tablet.Scanner;
-import org.apache.accumulo.tserver.Tablet.SplitInfo;
-import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
-import org.apache.accumulo.tserver.Tablet.TabletClosedException;
import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@@ -209,30 -219,10 +208,28 @@@ import org.apache.accumulo.tserver.log.
import org.apache.accumulo.tserver.mastermessage.MasterMessage;
import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
-import org.apache.accumulo.tserver.metrics.TabletServerMBean;
-import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerMetricsFactory;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
+import org.apache.accumulo.tserver.replication.ReplicationWorker;
+import org.apache.accumulo.tserver.scan.LookupTask;
+import org.apache.accumulo.tserver.scan.NextBatchTask;
+import org.apache.accumulo.tserver.scan.ScanRunState;
+import org.apache.accumulo.tserver.session.ConditionalSession;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.UpdateSession;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.CompactionInfo;
+import org.apache.accumulo.tserver.tablet.CompactionWatcher;
+import org.apache.accumulo.tserver.tablet.Compactor;
- import org.apache.accumulo.tserver.tablet.KVEntry;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
- import org.apache.accumulo.tserver.tablet.Scanner;
+import org.apache.accumulo.tserver.tablet.SplitInfo;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletClosedException;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
@@@ -256,151 -256,781 +253,150 @@@ public class TabletServer extends Accum
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
- private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet();
- private TabletServerLogger logger;
-
- protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+ private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+ private final TransactionWatcher watcher = new TransactionWatcher();
+ private final ZooCache masterLockCache = new ZooCache();
- private ServerConfiguration serverConfig;
- private LogSorter logSorter = null;
+ private final TabletServerLogger logger;
- public TabletServer(ServerConfiguration conf, VolumeManager fs) {
- super();
- this.serverConfig = conf;
- this.instance = conf.getInstance();
- this.fs = fs;
+ private final TabletServerMetricsFactory metricsFactory;
+ private final Metrics updateMetrics;
+ private final Metrics scanMetrics;
+ private final Metrics mincMetrics;
- log.info("Version " + Constants.VERSION);
- log.info("Instance " + instance.getInstanceID());
-
- this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
- SimpleTimer.getInstance().schedule(new Runnable() {
- @Override
- public void run() {
- synchronized (onlineTablets) {
- long now = System.currentTimeMillis();
- for (Tablet tablet : onlineTablets.values())
- try {
- tablet.updateRates(now);
- } catch (Exception ex) {
- log.error(ex, ex);
- }
- }
- }
- }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
- SimpleTimer.getInstance().schedule(new Runnable() {
- @Override
- public void run() {
- TabletLocator.clearLocators();
- }
- }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
+ public Metrics getMinCMetrics() {
+ return mincMetrics;
}
- private static long jitter(long ms) {
- Random r = new Random();
- // add a random 10% wait
- return (long) ((1. + (r.nextDouble() / 10)) * ms);
- }
-
- private synchronized static void logGCInfo(AccumuloConfiguration conf) {
- long now = System.currentTimeMillis();
-
- List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
- Runtime rt = Runtime.getRuntime();
-
- StringBuilder sb = new StringBuilder("gc");
-
- boolean sawChange = false;
-
- long maxIncreaseInCollectionTime = 0;
-
- for (GarbageCollectorMXBean gcBean : gcmBeans) {
- Long prevTime = prevGcTime.get(gcBean.getName());
- long pt = 0;
- if (prevTime != null) {
- pt = prevTime;
- }
-
- long time = gcBean.getCollectionTime();
-
- if (time - pt != 0) {
- sawChange = true;
- }
-
- long increaseInCollectionTime = time - pt;
- sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
- maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
- prevGcTime.put(gcBean.getName(), time);
- }
-
- long mem = rt.freeMemory();
- if (maxIncreaseInCollectionTime == 0) {
- gcTimeIncreasedCount = 0;
- } else {
- gcTimeIncreasedCount++;
- if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
- log.warn("Running low on memory");
- gcTimeIncreasedCount = 0;
- }
- }
-
- if (mem > lastMemorySize) {
- sawChange = true;
- }
-
- String sign = "+";
- if (mem - lastMemorySize <= 0) {
- sign = "";
- }
-
- sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
-
- if (sawChange) {
- log.debug(sb.toString());
- }
-
- final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
- if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
- long diff = now - lastMemoryCheckTime;
- if (diff > keepAliveTimeout + 1000) {
- log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
- TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.));
- }
- lastMemoryCheckTime = now;
- return;
- }
-
- if (maxIncreaseInCollectionTime > keepAliveTimeout) {
- Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
- }
-
- lastMemorySize = mem;
- lastMemoryCheckTime = now;
- }
-
- private TabletStatsKeeper statsKeeper;
-
- private static class Session {
- long lastAccessTime;
- long startTime;
- String user;
- String client = TServerUtils.clientAddress.get();
- public boolean reserved;
-
- public boolean cleanup() {
- return true;
- }
- }
-
- private static class SessionManager {
-
- SecureRandom random;
- Map<Long,Session> sessions;
- private long maxIdle;
- private long maxUpdateIdle;
- private List<Session> idleSessions = new ArrayList<Session>();
- private final Long expiredSessionMarker = new Long(-1);
-
- SessionManager(AccumuloConfiguration conf) {
- random = new SecureRandom();
- sessions = new HashMap<Long,Session>();
- maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
- maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- sweep(maxIdle, maxUpdateIdle);
- }
- };
-
- SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
- }
-
- synchronized long createSession(Session session, boolean reserve) {
- long sid = random.nextLong();
-
- while (sessions.containsKey(sid)) {
- sid = random.nextLong();
- }
-
- sessions.put(sid, session);
-
- session.reserved = reserve;
-
- session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
- return sid;
- }
-
- long getMaxIdleTime() {
- return maxIdle;
- }
-
- /**
- * while a session is reserved, it cannot be canceled or removed
- */
- synchronized Session reserveSession(long sessionId) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- if (session.reserved)
- throw new IllegalStateException();
- session.reserved = true;
- }
-
- return session;
-
- }
-
- synchronized Session reserveSession(long sessionId, boolean wait) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- while (wait && session.reserved) {
- try {
- wait(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException();
- }
- }
-
- if (session.reserved)
- throw new IllegalStateException();
- session.reserved = true;
- }
-
- return session;
-
- }
-
- synchronized void unreserveSession(Session session) {
- if (!session.reserved)
- throw new IllegalStateException();
- notifyAll();
- session.reserved = false;
- session.lastAccessTime = System.currentTimeMillis();
- }
-
- synchronized void unreserveSession(long sessionId) {
- Session session = getSession(sessionId);
- if (session != null)
- unreserveSession(session);
- }
-
- synchronized Session getSession(long sessionId) {
- Session session = sessions.get(sessionId);
- if (session != null)
- session.lastAccessTime = System.currentTimeMillis();
- return session;
- }
-
- Session removeSession(long sessionId) {
- return removeSession(sessionId, false);
- }
-
- Session removeSession(long sessionId, boolean unreserve) {
- Session session = null;
- synchronized (this) {
- session = sessions.remove(sessionId);
- if (unreserve && session != null)
- unreserveSession(session);
- }
-
- // do clean up out side of lock..
- if (session != null)
- session.cleanup();
-
- return session;
- }
-
- private void sweep(final long maxIdle, final long maxUpdateIdle) {
- ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
- synchronized (this) {
- Iterator<Session> iter = sessions.values().iterator();
- while (iter.hasNext()) {
- Session session = iter.next();
- long configuredIdle = maxIdle;
- if (session instanceof UpdateSession) {
- configuredIdle = maxUpdateIdle;
- }
- long idleTime = System.currentTimeMillis() - session.lastAccessTime;
- if (idleTime > configuredIdle && !session.reserved) {
- log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms");
- iter.remove();
- sessionsToCleanup.add(session);
- }
- }
- }
-
- // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list
-
- synchronized (idleSessions) {
-
- sessionsToCleanup.addAll(idleSessions);
-
- idleSessions.clear();
-
- // perform cleanup for all of the sessions
- for (Session session : sessionsToCleanup) {
- if (!session.cleanup())
- idleSessions.add(session);
- }
- }
-
- }
-
- synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- final long removeTime = session.lastAccessTime;
- TimerTask r = new TimerTask() {
- @Override
- public void run() {
- Session sessionToCleanup = null;
- synchronized (SessionManager.this) {
- Session session2 = sessions.get(sessionId);
- if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
- log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms");
- sessions.remove(sessionId);
- sessionToCleanup = session2;
- }
- }
-
- // call clean up outside of lock
- if (sessionToCleanup != null)
- sessionToCleanup.cleanup();
- }
- };
-
- SimpleTimer.getInstance().schedule(r, delay);
- }
- }
-
- public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
- Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
- Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
-
- synchronized (idleSessions) {
- /**
- * Add sessions so that get the list returned in the active scans call
- */
- for (Session session : idleSessions) {
- copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
- }
- }
-
- for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
-
- Session session = entry.getValue();
- @SuppressWarnings("rawtypes")
- ScanTask nbt = null;
- String tableID = null;
-
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
- nbt = ss.nextBatchTask;
- tableID = ss.extent.getTableId().toString();
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
- nbt = mss.lookupTask;
- tableID = mss.threadPoolExtent.getTableId().toString();
- }
-
- if (nbt == null)
- continue;
+ private final LogSorter logSorter;
+ private ReplicationWorker replWorker = null;
+ private final TabletStatsKeeper statsKeeper;
+ private final AtomicInteger logIdGenerator = new AtomicInteger();
- ScanRunState srs = nbt.getScanRunState();
+ private final AtomicLong flushCounter = new AtomicLong(0);
+ private final AtomicLong syncCounter = new AtomicLong(0);
- if (srs == ScanRunState.FINISHED)
- continue;
+ private final VolumeManager fs;
- MapCounter<ScanRunState> stateCounts = counts.get(tableID);
- if (stateCounts == null) {
- stateCounts = new MapCounter<ScanRunState>();
- counts.put(tableID, stateCounts);
- }
+ private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
+ private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ @SuppressWarnings("unchecked")
+ private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
- stateCounts.increment(srs, 1);
- }
+ private final TabletServerResourceManager resourceManager;
+ private final SecurityOperation security;
- return counts;
- }
+ private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
- public synchronized List<ActiveScan> getActiveScans() {
+ private Thread majorCompactorThread;
- final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
- final long ct = System.currentTimeMillis();
- final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
+ private HostAndPort replicationAddress;
+ private HostAndPort clientAddress;
- synchronized (idleSessions) {
- /**
- * Add sessions so that get the list returned in the active scans call
- */
- for (Session session : idleSessions) {
- copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
- }
- }
+ private volatile boolean serverStopRequested = false;
+ private volatile boolean majorCompactorDisabled = false;
+ private volatile boolean shutdownComplete = false;
- for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
- Session session = entry.getValue();
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
+ private ZooLock tabletServerLock;
- ScanState state = ScanState.RUNNING;
+ private TServer server;
+ private TServer replServer;
- ScanTask<ScanBatch> nbt = ss.nextBatchTask;
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
+ private DistributedWorkQueue bulkFailedCopyQ;
- ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime,
- ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio,
- ss.auths.getAuthorizationsBB());
+ private String lockID;
- // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor
- activeScan.setScanId(entry.getKey());
- activeScans.add(activeScan);
+ public static final AtomicLong seekCount = new AtomicLong(0);
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
+ private final AtomicLong totalMinorCompactions = new AtomicLong(0);
+ private final ServerConfigurationFactory confFactory;
- ScanState state = ScanState.RUNNING;
+ private final ZooAuthenticationKeyWatcher authKeyWatcher;
- ScanTask<MultiScanResult> nbt = mss.lookupTask;
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
+ public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
+ super(confFactory);
+ this.confFactory = confFactory;
+ this.fs = fs;
+ AccumuloConfiguration aconf = getConfiguration();
+ Instance instance = getInstance();
+ log.info("Version " + Constants.VERSION);
+ log.info("Instance " + instance.getInstanceID());
+ this.sessionManager = new SessionManager(aconf);
+ this.logSorter = new LogSorter(instance, fs, aconf);
+ this.replWorker = new ReplicationWorker(this, fs);
+ this.statsKeeper = new TabletStatsKeeper();
+ SimpleTimer.getInstance(aconf).schedule(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (onlineTablets) {
+ long now = System.currentTimeMillis();
+ for (Tablet tablet : onlineTablets.values())
+ try {
+ tablet.updateRates(now);
+ } catch (Exception ex) {
+ log.error("Error updating rates for {}", tablet.getExtent(), ex);
}
- }
-
- activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
- ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
- .getAuthorizationsBB()));
}
}
+ }, 5000, 5000);
- return activeScans;
- }
- }
-
- static class TservConstraintEnv implements Environment {
-
- private TCredentials credentials;
- private SecurityOperation security;
- private Authorizations auths;
- private KeyExtent ke;
-
- TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
- this.security = secOp;
- this.credentials = credentials;
- }
-
- void setExtent(KeyExtent ke) {
- this.ke = ke;
- }
-
- @Override
- public KeyExtent getExtent() {
- return ke;
- }
-
- @Override
- public String getUser() {
- return credentials.getPrincipal();
- }
-
- @Override
- @Deprecated
- public Authorizations getAuthorizations() {
- if (auths == null)
- try {
- this.auths = security.getUserAuthorizations(credentials);
- } catch (ThriftSecurityException e) {
- throw new RuntimeException(e);
- }
- return auths;
- }
-
- @Override
- public AuthorizationContainer getAuthorizationsContainer() {
- return new AuthorizationContainer() {
- @Override
- public boolean contains(ByteSequence auth) {
- try {
- return security.userHasAuthorizations(credentials,
- Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length())));
- } catch (ThriftSecurityException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
- }
-
- private abstract class ScanTask<T> implements RunnableFuture<T> {
-
- protected AtomicBoolean interruptFlag;
- protected ArrayBlockingQueue<Object> resultQueue;
- protected AtomicInteger state;
- protected AtomicReference<ScanRunState> runState;
-
- private static final int INITIAL = 1;
- private static final int ADDED = 2;
- private static final int CANCELED = 3;
-
- ScanTask() {
- interruptFlag = new AtomicBoolean(false);
- runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
- state = new AtomicInteger(INITIAL);
- resultQueue = new ArrayBlockingQueue<Object>(1);
- }
-
- protected void addResult(Object o) {
- if (state.compareAndSet(INITIAL, ADDED))
- resultQueue.add(o);
- else if (state.get() == ADDED)
- throw new IllegalStateException("Tried to add more than one result");
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (!mayInterruptIfRunning)
- throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-
- if (state.get() == CANCELED)
- return true;
-
- if (state.compareAndSet(INITIAL, CANCELED)) {
- interruptFlag.set(true);
- resultQueue = null;
- return true;
- }
-
- return false;
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- throw new UnsupportedOperationException();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
- ArrayBlockingQueue<Object> localRQ = resultQueue;
-
- if (state.get() == CANCELED)
- throw new CancellationException();
-
- if (localRQ == null) {
- int st = state.get();
- String stateStr;
- switch (st) {
- case ADDED:
- stateStr = "ADDED";
- break;
- case CANCELED:
- stateStr = "CANCELED";
- break;
- case INITIAL:
- stateStr = "INITIAL";
- break;
- default:
- stateStr = "UNKNOWN";
- break;
- }
- throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]");
- }
-
- Object r = localRQ.poll(timeout, unit);
-
- // could have been canceled while waiting
- if (state.get() == CANCELED) {
- if (r != null)
- throw new IllegalStateException("Nothing should have been added when in canceled state");
+ final long walogMaxSize = aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
+ final long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0);
+ if (minBlockSize != 0 && minBlockSize > walogMaxSize)
+ throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
+ + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
- throw new CancellationException();
+ final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
+ final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
+ final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
+ // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure,
+ // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax` retries.
+ final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement,
+ walCreationFailureRetryIncrement, walCreationFailureRetryMax);
+
+ logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory);
+ this.resourceManager = new TabletServerResourceManager(this, fs);
+ this.security = AuditedSecurityOperation.getInstance(this);
+
+ metricsFactory = new TabletServerMetricsFactory(aconf);
+ updateMetrics = metricsFactory.createUpdateMetrics();
+ scanMetrics = metricsFactory.createScanMetrics();
+ mincMetrics = metricsFactory.createMincMetrics();
+ SimpleTimer.getInstance(aconf).schedule(new Runnable() {
+ @Override
+ public void run() {
+ TabletLocator.clearLocators();
}
+ }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
- if (r == null)
- throw new TimeoutException();
-
- // make this method stop working now that something is being
- // returned
- resultQueue = null;
-
- if (r instanceof Throwable)
- throw new ExecutionException((Throwable) r);
-
- return (T) r;
- }
-
- @Override
- public boolean isCancelled() {
- return state.get() == CANCELED;
- }
-
- @Override
- public boolean isDone() {
- return runState.get().equals(ScanRunState.FINISHED);
- }
-
- public ScanRunState getScanRunState() {
- return runState.get();
- }
-
- }
-
- private static class ConditionalSession extends Session {
- public TCredentials credentials;
- public Authorizations auths;
- public String tableId;
- public AtomicBoolean interruptFlag;
-
- @Override
- public boolean cleanup() {
- interruptFlag.set(true);
- return true;
- }
- }
-
- private static class UpdateSession extends Session {
- public Tablet currentTablet;
- public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
- Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
- HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
- public Violations violations;
- public TCredentials credentials;
- public long totalUpdates = 0;
- public long flushTime = 0;
- Stat prepareTimes = new Stat();
- Stat walogTimes = new Stat();
- Stat commitTimes = new Stat();
- Stat authTimes = new Stat();
- public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
- public long queuedMutationSize = 0;
- TservConstraintEnv cenv = null;
- }
-
- private static class ScanSession extends Session {
- public KeyExtent extent;
- public HashSet<Column> columnSet;
- public List<IterInfo> ssiList;
- public Map<String,Map<String,String>> ssio;
- public Authorizations auths;
- public long entriesReturned = 0;
- public Stat nbTimes = new Stat();
- public long batchCount = 0;
- public volatile ScanTask<ScanBatch> nextBatchTask;
- public AtomicBoolean interruptFlag;
- public Scanner scanner;
- public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
-
- @Override
- public boolean cleanup() {
- final boolean ret;
- try {
- if (nextBatchTask != null)
- nextBatchTask.cancel(true);
- } finally {
- if (scanner != null)
- ret = scanner.close();
- else
- ret = true;
- }
- return ret;
+ // Create the secret manager
+ setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
+ if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+ log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys");
+ // Watcher to notice new AuthenticationKeys which enable delegation tokens
+ authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance)
+ + Constants.ZDELEGATION_TOKEN_KEYS);
+ } else {
+ authKeyWatcher = null;
}
-
}
- private static class MultiScanSession extends Session {
- HashSet<Column> columnSet;
- Map<KeyExtent,List<Range>> queries;
- public List<IterInfo> ssiList;
- public Map<String,Map<String,String>> ssio;
- public Authorizations auths;
-
- // stats
- int numRanges;
- int numTablets;
- int numEntries;
- long totalLookupTime;
-
- public volatile ScanTask<MultiScanResult> lookupTask;
- public KeyExtent threadPoolExtent;
-
- @Override
- public boolean cleanup() {
- if (lookupTask != null)
- lookupTask.cancel(true);
- // the cancellation should provide us the safety to return true here
- return true;
- }
+ private static long jitter(long ms) {
+ Random r = new Random();
+ // add a random 10% wait
+ return (long) ((1. + (r.nextDouble() / 10)) * ms);
}
- /**
- * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
- * are monotonically increasing.
- *
- */
- static class WriteTracker {
- private static AtomicLong operationCounter = new AtomicLong(1);
- private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
- WriteTracker() {
- for (TabletType ttype : TabletType.values()) {
- inProgressWrites.put(ttype, new TreeSet<Long>());
- }
- }
-
- synchronized long startWrite(TabletType ttype) {
- long operationId = operationCounter.getAndIncrement();
- inProgressWrites.get(ttype).add(operationId);
- return operationId;
- }
-
- synchronized void finishWrite(long operationId) {
- if (operationId == -1)
- return;
+ private final SessionManager sessionManager;
- boolean removed = false;
+ private final WriteTracker writeTracker = new WriteTracker();
- for (TabletType ttype : TabletType.values()) {
- removed = inProgressWrites.get(ttype).remove(operationId);
- if (removed)
- break;
- }
+ private final RowLocks rowLocks = new RowLocks();
- if (!removed) {
- throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
- }
-
- this.notifyAll();
- }
-
- synchronized void waitForWrites(TabletType ttype) {
- long operationId = operationCounter.getAndIncrement();
- while (inProgressWrites.get(ttype).floor(operationId) != null) {
- try {
- this.wait();
- } catch (InterruptedException e) {
- log.error(e, e);
- }
- }
- }
-
- public long startWrite(Set<Tablet> keySet) {
- if (keySet.size() == 0)
- return -1;
-
- ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
- for (Tablet tablet : keySet)
- extents.add(tablet.getExtent());
-
- return startWrite(TabletType.type(extents));
- }
- }
-
- public AccumuloConfiguration getSystemConfiguration() {
- return serverConfig.getConfiguration();
- }
-
- TransactionWatcher watcher = new TransactionWatcher();
+ private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+ private final ReentrantLock recoveryLock = new ReentrantLock(true);
private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
@@@ -1071,6 -1930,7 +1067,7 @@@
Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
final CompressedIterators compressedIters = new CompressedIterators(symbols);
- ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, ServerConfiguration.getTableConfiguration(instance, cs.tableId));
++ ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, confFactory.getTableConfiguration(cs.tableId));
while (iter.hasNext()) {
final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
@@@ -1082,65 -1942,44 +1079,28 @@@
iter.remove();
} else {
final List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
+ final List<TCMResult> resultsSubList = results.subList(results.size(), results.size());
- for (ServerConditionalMutation scm : entry.getValue()) {
- if (checkCondition(results, cs, compressedIters, tablet, scm))
- okMutations.add(scm);
- }
-
- entry.setValue(okMutations);
- }
-
- }
- }
-
- private boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
- ServerConditionalMutation scm) throws IOException {
- boolean add = true;
-
- for (TCondition tc : scm.getConditions()) {
-
- Range range;
- if (tc.hasTimestamp)
- range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
- else
- range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
-
- IterConfig ic = compressedIters.decompress(tc.iterators);
-
- Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
-
- try {
- ScanBatch batch = scanner.read();
-
- Value val = null;
+ ConditionChecker checker = checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
+ try {
+ tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
- for (KVEntry entry2 : batch.getResults()) {
- val = entry2.getValue();
- break;
- }
+ if (okMutations.size() > 0) {
+ entry.setValue(okMutations);
+ } else {
+ iter.remove();
+ }
- } catch (TabletClosedException e) {
- // clear anything added while checking conditions.
- resultsSubList.clear();
-
- for (ServerConditionalMutation scm : entry.getValue()) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- }
- iter.remove();
- } catch (IterationInterruptedException e) {
- // clear anything added while checking conditions.
- resultsSubList.clear();
-
- for (ServerConditionalMutation scm : entry.getValue()) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- }
- iter.remove();
- } catch (TooManyFilesException e) {
++ } catch (TabletClosedException | IterationInterruptedException | TooManyFilesException e) {
+ // clear anything added while checking conditions.
+ resultsSubList.clear();
- if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
- results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
- add = false;
- break;
+ for (ServerConditionalMutation scm : entry.getValue()) {
+ results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ }
+ iter.remove();
}
-
- } catch (TabletClosedException e) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- add = false;
- break;
- } catch (IterationInterruptedException iie) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- add = false;
- break;
- } catch (TooManyFilesException tmfe) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- add = false;
- break;
}
}
- return add;
}
private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 33277bd,0000000..f586e2e
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@@ -1,228 -1,0 +1,247 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
++import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
++import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.StatsIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.accumulo.tserver.TabletIteratorEnvironment;
+import org.apache.accumulo.tserver.TabletServer;
+
+class ScanDataSource implements DataSource {
+
+ // data source state
+ private final Tablet tablet;
+ private ScanFileManager fileManager;
+ private SortedKeyValueIterator<Key,Value> iter;
+ private long expectedDeletionCount;
+ private List<MemoryIterator> memIters = null;
+ private long fileReservationId;
+ private AtomicBoolean interruptFlag;
+ private StatsIterator statsIterator;
+
+ private final ScanOptions options;
++ private final boolean loadIters;
++
++ private static final Set<Column> EMPTY_COLS = Collections.emptySet();
+
+ ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+ Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) {
+ this.tablet = tablet;
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+ this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+ this.interruptFlag = interruptFlag;
++ this.loadIters = true;
+ }
+
+ ScanDataSource(Tablet tablet, ScanOptions options) {
+ this.tablet = tablet;
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+ this.options = options;
+ this.interruptFlag = options.getInterruptFlag();
++ this.loadIters = true;
++ }
++
++ ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, AtomicBoolean iFlag) {
++ this.tablet = tablet;
++ expectedDeletionCount = tablet.getDataSourceDeletions();
++ this.options = new ScanOptions(-1, authorizations, defaultLabels, EMPTY_COLS, null, null, iFlag, false);
++ this.interruptFlag = iFlag;
++ this.loadIters = false;
+ }
+
+ @Override
+ public DataSource getNewDataSource() {
+ if (!isCurrent()) {
+ // log.debug("Switching data sources during a scan");
+ if (memIters != null) {
+ tablet.getTabletMemory().returnIterators(memIters);
+ memIters = null;
+ tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+ fileReservationId = -1;
+ }
+
+ if (fileManager != null)
+ fileManager.releaseOpenFiles(false);
+
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+ iter = null;
+
+ return this;
+ } else
+ return this;
+ }
+
+ @Override
+ public boolean isCurrent() {
+ return expectedDeletionCount == tablet.getDataSourceDeletions();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+ if (iter == null)
+ iter = createIterator();
+ return iter;
+ }
+
+ private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
+
+ Map<FileRef,DataFileValue> files;
+
+ synchronized (tablet) {
+
+ if (memIters != null)
+ throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
+
+ if (tablet.isClosed())
+ throw new TabletClosedException();
+
+ if (interruptFlag.get())
+ throw new IterationInterruptedException(tablet.getExtent().toString() + " " + interruptFlag.hashCode());
+
+ // only acquire the file manager when we know the tablet is open
+ if (fileManager == null) {
+ fileManager = tablet.getTabletResources().newScanFileManager();
+ tablet.addActiveScans(this);
+ }
+
+ if (fileManager.getNumOpenFiles() != 0)
+ throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
+
+ // set this before trying to get iterators in case
+ // getIterators() throws an exception
+ expectedDeletionCount = tablet.getDataSourceDeletions();
+
+ memIters = tablet.getTabletMemory().getIterators();
+ Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan();
+ fileReservationId = reservation.getFirst();
+ files = reservation.getSecond();
+ }
+
+ Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isIsolated());
+
+ List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
+
+ iters.addAll(mapfiles);
+ iters.addAll(memIters);
+
+ for (SortedKeyValueIterator<Key,Value> skvi : iters)
+ ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
+
+ MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent());
+
+ TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files,
+ options.getAuthorizations());
+
+ statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter());
+
+ DeletingIterator delIter = new DeletingIterator(statsIterator, false);
+
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+ ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.getColumnSet());
+
+ VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.getAuthorizations(), options.getDefaultLabels());
+
- return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(),
- options.getSsiList(), options.getSsio(), iterEnv));
++ if (!loadIters) {
++ return visFilter;
++ } else {
++ return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(),
++ options.getSsiList(), options.getSsio(), iterEnv));
++ }
+ }
+
+ void close(boolean sawErrors) {
+
+ if (memIters != null) {
+ tablet.getTabletMemory().returnIterators(memIters);
+ memIters = null;
+ tablet.getDatafileManager().returnFilesForScan(fileReservationId);
+ fileReservationId = -1;
+ }
+
+ synchronized (tablet) {
+ if (tablet.removeScan(this) == 0)
+ tablet.notifyAll();
+ }
+
+ if (fileManager != null) {
+ fileManager.releaseOpenFiles(sawErrors);
+ fileManager = null;
+ }
+
+ if (statsIterator != null) {
+ statsIterator.report();
+ }
+
+ }
+
+ public void interrupt() {
+ interruptFlag.set(true);
+ }
+
+ @Override
+ public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reattachFileManager() throws IOException {
+ if (fileManager != null)
+ fileManager.reattach();
+ }
+
+ public void detachFileManager() {
+ if (fileManager != null)
+ fileManager.detach();
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ throw new UnsupportedOperationException();
+ }
+
+}