You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/09/08 23:03:41 UTC
git commit: ACCUMULO-1950 use server-wide memory count
Repository: accumulo
Updated Branches:
refs/heads/master 2533b7eff -> ef701cabb
ACCUMULO-1950 use server-wide memory count
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef701cab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef701cab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef701cab
Branch: refs/heads/master
Commit: ef701cabbd5e1fed993bbd13e86c40b7d72040d0
Parents: 2533b7e
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Sep 8 16:38:49 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Sep 8 16:38:49 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 6 +-
.../core/master/thrift/TabletServerStatus.java | 198 ++++++++++++++++++-
core/src/main/thrift/master.thrift | 2 +
.../apache/accumulo/tserver/TabletServer.java | 30 ++-
.../tserver/TabletServerResourceManager.java | 2 +-
.../apache/accumulo/tserver/log/DfsLogger.java | 12 +-
.../tserver/log/TabletServerLogger.java | 15 +-
.../test/randomwalk/concurrent/Config.java | 2 +-
.../org/apache/accumulo/test/TotalQueuedIT.java | 129 ++++++++++++
.../accumulo/test/functional/BloomFilterIT.java | 2 +-
10 files changed, 374 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 5401c7c..35cd0a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -197,11 +197,15 @@ public enum Property {
TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "512M", PropertyType.MEMORY, "Specifies the size of the cache for file indices."),
TSERV_PORTSEARCH("tserver.port.search", "false", PropertyType.BOOLEAN, "if the ports above are in use, search higher ports until one is available"),
TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers"),
+ @Deprecated
TSERV_MUTATION_QUEUE_MAX("tserver.mutation.queue.max", "1M", PropertyType.MEMORY,
- "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the"
+ "This setting is deprecated. See tserver.total.mutation.queue.max. "
+ + "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the"
+ " max number of concurrent writer when configuring. When using Hadoop 2, Accumulo will call hsync() on the WAL . For a small number of "
+ "concurrent writers, increasing this buffer size decreases the frequncy of hsync calls. For a large number of concurrent writers a small buffers "
+ "size is ok because of group commit."),
+ TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY,
+ "The amount of memory used to store write-ahead-log mutations before flushing them."),
TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "30", PropertyType.COUNT,
"To find a tablets split points, all index files are opened. This setting determines how many index "
+ "files can be opened at once. When there are more index files than this setting multiple passes "
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java b/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
index 6348537..e7e6e0e 100644
--- a/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
+++ b/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField DATA_CACHE_HITS_FIELD_DESC = new org.apache.thrift.protocol.TField("dataCacheHits", org.apache.thrift.protocol.TType.I64, (short)12);
private static final org.apache.thrift.protocol.TField DATA_CACHE_REQUEST_FIELD_DESC = new org.apache.thrift.protocol.TField("dataCacheRequest", org.apache.thrift.protocol.TType.I64, (short)13);
private static final org.apache.thrift.protocol.TField LOG_SORTS_FIELD_DESC = new org.apache.thrift.protocol.TField("logSorts", org.apache.thrift.protocol.TType.LIST, (short)14);
+ private static final org.apache.thrift.protocol.TField FLUSHS_FIELD_DESC = new org.apache.thrift.protocol.TField("flushs", org.apache.thrift.protocol.TType.I64, (short)15);
+ private static final org.apache.thrift.protocol.TField SYNCS_FIELD_DESC = new org.apache.thrift.protocol.TField("syncs", org.apache.thrift.protocol.TType.I64, (short)16);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -80,6 +82,8 @@ import org.slf4j.LoggerFactory;
public long dataCacheHits; // required
public long dataCacheRequest; // required
public List<RecoveryStatus> logSorts; // required
+ public long flushs; // required
+ public long syncs; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -93,7 +97,9 @@ import org.slf4j.LoggerFactory;
INDEX_CACHE_REQUEST((short)11, "indexCacheRequest"),
DATA_CACHE_HITS((short)12, "dataCacheHits"),
DATA_CACHE_REQUEST((short)13, "dataCacheRequest"),
- LOG_SORTS((short)14, "logSorts");
+ LOG_SORTS((short)14, "logSorts"),
+ FLUSHS((short)15, "flushs"),
+ SYNCS((short)16, "syncs");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -130,6 +136,10 @@ import org.slf4j.LoggerFactory;
return DATA_CACHE_REQUEST;
case 14: // LOG_SORTS
return LOG_SORTS;
+ case 15: // FLUSHS
+ return FLUSHS;
+ case 16: // SYNCS
+ return SYNCS;
default:
return null;
}
@@ -178,7 +188,9 @@ import org.slf4j.LoggerFactory;
private static final int __INDEXCACHEREQUEST_ISSET_ID = 5;
private static final int __DATACACHEHITS_ISSET_ID = 6;
private static final int __DATACACHEREQUEST_ISSET_ID = 7;
- private byte __isset_bitfield = 0;
+ private static final int __FLUSHS_ISSET_ID = 8;
+ private static final int __SYNCS_ISSET_ID = 9;
+ private short __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -207,6 +219,10 @@ import org.slf4j.LoggerFactory;
tmpMap.put(_Fields.LOG_SORTS, new org.apache.thrift.meta_data.FieldMetaData("logSorts", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, RecoveryStatus.class))));
+ tmpMap.put(_Fields.FLUSHS, new org.apache.thrift.meta_data.FieldMetaData("flushs", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.SYNCS, new org.apache.thrift.meta_data.FieldMetaData("syncs", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TabletServerStatus.class, metaDataMap);
}
@@ -225,7 +241,9 @@ import org.slf4j.LoggerFactory;
long indexCacheRequest,
long dataCacheHits,
long dataCacheRequest,
- List<RecoveryStatus> logSorts)
+ List<RecoveryStatus> logSorts,
+ long flushs,
+ long syncs)
{
this();
this.tableMap = tableMap;
@@ -247,6 +265,10 @@ import org.slf4j.LoggerFactory;
this.dataCacheRequest = dataCacheRequest;
setDataCacheRequestIsSet(true);
this.logSorts = logSorts;
+ this.flushs = flushs;
+ setFlushsIsSet(true);
+ this.syncs = syncs;
+ setSyncsIsSet(true);
}
/**
@@ -287,6 +309,8 @@ import org.slf4j.LoggerFactory;
}
this.logSorts = __this__logSorts;
}
+ this.flushs = other.flushs;
+ this.syncs = other.syncs;
}
public TabletServerStatus deepCopy() {
@@ -314,6 +338,10 @@ import org.slf4j.LoggerFactory;
setDataCacheRequestIsSet(false);
this.dataCacheRequest = 0;
this.logSorts = null;
+ setFlushsIsSet(false);
+ this.flushs = 0;
+ setSyncsIsSet(false);
+ this.syncs = 0;
}
public int getTableMapSize() {
@@ -598,6 +626,52 @@ import org.slf4j.LoggerFactory;
}
}
+ public long getFlushs() {
+ return this.flushs;
+ }
+
+ public TabletServerStatus setFlushs(long flushs) {
+ this.flushs = flushs;
+ setFlushsIsSet(true);
+ return this;
+ }
+
+ public void unsetFlushs() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __FLUSHS_ISSET_ID);
+ }
+
+ /** Returns true if field flushs is set (has been assigned a value) and false otherwise */
+ public boolean isSetFlushs() {
+ return EncodingUtils.testBit(__isset_bitfield, __FLUSHS_ISSET_ID);
+ }
+
+ public void setFlushsIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FLUSHS_ISSET_ID, value);
+ }
+
+ public long getSyncs() {
+ return this.syncs;
+ }
+
+ public TabletServerStatus setSyncs(long syncs) {
+ this.syncs = syncs;
+ setSyncsIsSet(true);
+ return this;
+ }
+
+ public void unsetSyncs() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SYNCS_ISSET_ID);
+ }
+
+ /** Returns true if field syncs is set (has been assigned a value) and false otherwise */
+ public boolean isSetSyncs() {
+ return EncodingUtils.testBit(__isset_bitfield, __SYNCS_ISSET_ID);
+ }
+
+ public void setSyncsIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SYNCS_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case TABLE_MAP:
@@ -688,6 +762,22 @@ import org.slf4j.LoggerFactory;
}
break;
+ case FLUSHS:
+ if (value == null) {
+ unsetFlushs();
+ } else {
+ setFlushs((Long)value);
+ }
+ break;
+
+ case SYNCS:
+ if (value == null) {
+ unsetSyncs();
+ } else {
+ setSyncs((Long)value);
+ }
+ break;
+
}
}
@@ -726,6 +816,12 @@ import org.slf4j.LoggerFactory;
case LOG_SORTS:
return getLogSorts();
+ case FLUSHS:
+ return Long.valueOf(getFlushs());
+
+ case SYNCS:
+ return Long.valueOf(getSyncs());
+
}
throw new IllegalStateException();
}
@@ -759,6 +855,10 @@ import org.slf4j.LoggerFactory;
return isSetDataCacheRequest();
case LOG_SORTS:
return isSetLogSorts();
+ case FLUSHS:
+ return isSetFlushs();
+ case SYNCS:
+ return isSetSyncs();
}
throw new IllegalStateException();
}
@@ -875,6 +975,24 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_flushs = true;
+ boolean that_present_flushs = true;
+ if (this_present_flushs || that_present_flushs) {
+ if (!(this_present_flushs && that_present_flushs))
+ return false;
+ if (this.flushs != that.flushs)
+ return false;
+ }
+
+ boolean this_present_syncs = true;
+ boolean that_present_syncs = true;
+ if (this_present_syncs || that_present_syncs) {
+ if (!(this_present_syncs && that_present_syncs))
+ return false;
+ if (this.syncs != that.syncs)
+ return false;
+ }
+
return true;
}
@@ -1001,6 +1119,26 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetFlushs()).compareTo(other.isSetFlushs());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetFlushs()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.flushs, other.flushs);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetSyncs()).compareTo(other.isSetSyncs());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSyncs()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.syncs, other.syncs);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1076,6 +1214,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.logSorts);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("flushs:");
+ sb.append(this.flushs);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("syncs:");
+ sb.append(this.syncs);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -1233,6 +1379,22 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 15: // FLUSHS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.flushs = iprot.readI64();
+ struct.setFlushsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 16: // SYNCS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.syncs = iprot.readI64();
+ struct.setSyncsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1302,6 +1464,12 @@ import org.slf4j.LoggerFactory;
}
oprot.writeFieldEnd();
}
+ oprot.writeFieldBegin(FLUSHS_FIELD_DESC);
+ oprot.writeI64(struct.flushs);
+ oprot.writeFieldEnd();
+ oprot.writeFieldBegin(SYNCS_FIELD_DESC);
+ oprot.writeI64(struct.syncs);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1353,7 +1521,13 @@ import org.slf4j.LoggerFactory;
if (struct.isSetLogSorts()) {
optionals.set(10);
}
- oprot.writeBitSet(optionals, 11);
+ if (struct.isSetFlushs()) {
+ optionals.set(11);
+ }
+ if (struct.isSetSyncs()) {
+ optionals.set(12);
+ }
+ oprot.writeBitSet(optionals, 13);
if (struct.isSetTableMap()) {
{
oprot.writeI32(struct.tableMap.size());
@@ -1400,12 +1574,18 @@ import org.slf4j.LoggerFactory;
}
}
}
+ if (struct.isSetFlushs()) {
+ oprot.writeI64(struct.flushs);
+ }
+ if (struct.isSetSyncs()) {
+ oprot.writeI64(struct.syncs);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, TabletServerStatus struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(11);
+ BitSet incoming = iprot.readBitSet(13);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TMap _map11 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
@@ -1472,6 +1652,14 @@ import org.slf4j.LoggerFactory;
}
struct.setLogSortsIsSet(true);
}
+ if (incoming.get(11)) {
+ struct.flushs = iprot.readI64();
+ struct.setFlushsIsSet(true);
+ }
+ if (incoming.get(12)) {
+ struct.syncs = iprot.readI64();
+ struct.setSyncsIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/thrift/master.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/master.thrift b/core/src/main/thrift/master.thrift
index 49fa262..72ba3a5 100644
--- a/core/src/main/thrift/master.thrift
+++ b/core/src/main/thrift/master.thrift
@@ -60,6 +60,8 @@ struct TabletServerStatus {
12:i64 dataCacheHits
13:i64 dataCacheRequest
14:list<RecoveryStatus> logSorts
+ 15:i64 flushs
+ 16:i64 syncs
}
enum MasterState {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index dc9f27f..b4fbfed 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -263,6 +263,9 @@ public class TabletServer implements Runnable {
private final TabletStatsKeeper statsKeeper;
private final AtomicInteger logIdGenerator = new AtomicInteger();
+ private final AtomicLong flushCounter = new AtomicLong(0);
+ private final AtomicLong syncCounter = new AtomicLong(0);
+
private final VolumeManager fs;
public Instance getInstance() {
return serverConfig.getInstance();
@@ -333,7 +336,7 @@ public class TabletServer implements Runnable {
if (minBlockSize != 0 && minBlockSize > walogMaxSize)
throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
+ minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
- logger = new TabletServerLogger(this, walogMaxSize);
+ logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter);
this.resourceManager = new TabletServerResourceManager(getInstance(), fs);
}
@@ -351,6 +354,8 @@ public class TabletServer implements Runnable {
private final RowLocks rowLocks = new RowLocks();
+ private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+
private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
ThriftClientHandler() {
@@ -727,13 +732,17 @@ public class TabletServer implements Runnable {
setUpdateTablet(us, keyExtent);
if (us.currentTablet != null) {
+ long additionalMutationSize = 0;
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
Mutation mutation = new ServerMutation(tmutation);
mutations.add(mutation);
- us.queuedMutationSize += mutation.numBytes();
+ additionalMutationSize += mutation.numBytes();
}
- if (us.queuedMutationSize > TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX)) {
+ us.queuedMutationSize += additionalMutationSize;
+ long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize);
+ long total = TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
+ if (totalQueued > total) {
flush(us);
}
}
@@ -777,7 +786,6 @@ public class TabletServer implements Runnable {
}
us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
} else {
- log.debug("Durablity for " + tablet.getExtent() + " durability " + us.durability + " table durability " + tabletDurability + " using " + DurabilityImpl.resolveDurabilty(us.durability, tabletDurability));
sendables.put(commitSession, new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations));
mutationCount += mutations.size();
}
@@ -789,10 +797,8 @@ public class TabletServer implements Runnable {
if (e.getNonViolators().size() > 0) {
// only log and commit mutations if there were some
- // that did not
- // violate constraints... this is what
- // prepareMutationsForCommit()
- // expects
+ // that did not violate constraints... this is what
+ // prepareMutationsForCommit() expects
sendables.put(e.getCommitSession(), new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), e.getNonViolators()));
}
@@ -882,6 +888,7 @@ public class TabletServer implements Runnable {
if (us.currentTablet != null) {
us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
}
+ updateTotalQueuedMutationSize(-us.queuedMutationSize);
us.queuedMutationSize = 0;
}
us.totalUpdates += mutationCount;
@@ -1758,6 +1765,10 @@ public class TabletServer implements Runnable {
return majorCompactorDisabled;
}
+ public long updateTotalQueuedMutationSize(long additionalMutationSize) { // applies a signed byte delta to the server-wide queued-mutation total (callers in this commit add when queuing and subtract after a flush)
+ return totalQueuedMutationSize.addAndGet(additionalMutationSize); // single atomic add on the AtomicLong; the value returned is the total after this delta
+ }
+
public Tablet getOnlineTablet(KeyExtent extent) {
return onlineTablets.get(extent);
}
@@ -2845,6 +2856,8 @@ public class TabletServer implements Runnable {
result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
result.logSorts = logSorter.getLogSorts();
+ result.flushs = flushCounter.get();
+ result.syncs = syncCounter.get();
return result;
}
@@ -2961,5 +2974,4 @@ public class TabletServer implements Runnable {
public double getHoldTimeMillis() {
return resourceManager.holdTime();
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 3d42c7c..25c0ee8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -156,7 +156,7 @@ public class TabletServerResourceManager {
long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
- long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX);
+ long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
_iCache = new LruBlockCache(iCacheSize, blockSize);
_dCache = new LruBlockCache(dCacheSize, blockSize);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 50475c2..6747c14 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -169,6 +170,11 @@ public class DfsLogger {
try {
if (durabilityMethod != null) {
durabilityMethod.invoke(logFile);
+ if (durabilityMethod == sync) {
+ syncCounter.incrementAndGet();
+ } else {
+ flushCounter.incrementAndGet();
+ }
}
} catch (Exception ex) {
log.warn("Exception syncing " + ex);
@@ -248,9 +254,13 @@ public class DfsLogger {
/* Track what's actually in +r/!0 for this logger ref */
private String metaReference;
+ private AtomicLong syncCounter;
+ private AtomicLong flushCounter;
- public DfsLogger(ServerResources conf) throws IOException {
+ public DfsLogger(ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter) throws IOException {
this.conf = conf;
+ this.syncCounter = syncCounter;
+ this.flushCounter = flushCounter;
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 2a540e5..bdd3016 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -84,6 +84,9 @@ public class TabletServerLogger {
private final AtomicInteger seqGen = new AtomicInteger();
+ private final AtomicLong syncCounter;
+ private final AtomicLong flushCounter;
+
static private abstract class TestCallWithWriteLock {
abstract boolean test();
@@ -128,9 +131,11 @@ public class TabletServerLogger {
}
}
- public TabletServerLogger(TabletServer tserver, long maxSize) {
+ public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter) {
this.tserver = tserver;
this.maxSize = maxSize;
+ this.syncCounter = syncCounter;
+ this.flushCounter = flushCounter;
}
private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
@@ -184,7 +189,7 @@ public class TabletServerLogger {
}
try {
- DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+ DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
alog.open(tserver.getClientAddressString());
loggers.add(alog);
logSetId.incrementAndGet();
@@ -381,8 +386,7 @@ public class TabletServerLogger {
final Map<CommitSession,Mutations> loggables = new HashMap<CommitSession,Mutations>(mutations);
for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
- Durability durability = entry.getValue().getDurability();
- if (durability == Durability.NONE) {
+ if (entry.getValue().getDurability() == Durability.NONE) {
loggables.remove(entry.getKey());
}
}
@@ -402,8 +406,9 @@ public class TabletServerLogger {
}
});
for (Mutations entry : loggables.values()) {
- if (entry.getMutations().size() < 1)
+ if (entry.getMutations().size() < 1) {
throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
+ }
for (Mutation m : entry.getMutations()) {
logSizeEstimate.addAndGet(m.numBytes());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
index 4af85a7..8d14574 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
@@ -72,7 +72,7 @@ public class Config extends Test {
s(Property.TSERV_MAXMEM, 1000000, 3 * 1024 * 1024 * 1024L),
s(Property.TSERV_READ_AHEAD_MAXCONCURRENT, 1, 25),
s(Property.TSERV_MIGRATE_MAXCONCURRENT, 1, 10),
- s(Property.TSERV_MUTATION_QUEUE_MAX, 10000, 1024 * 1024),
+ s(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, 10000, 1024 * 1024),
s(Property.TSERV_RECOVERY_MAX_CONCURRENT, 1, 100),
s(Property.TSERV_SCAN_MAX_OPENFILES, 10, 1000),
s(Property.TSERV_THREADCHECK, 100, 10000),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java
new file mode 100644
index 0000000..a794088
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * Checks that write-ahead-log sync frequency follows the server-wide
+ * tserver.total.mutation.queue.max setting (ACCUMULO-1950): the same workload is written with a
+ * small and then a ten times larger total queue, and the larger queue must cause fewer syncs.
+ */
+public class TotalQueuedIT extends ConfigurableMacIT {
+
+  static final int SMALL_QUEUE_SIZE = 100000;
+  static final int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10;
+  static final long N = 1000000;
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE);
+    cfg.useMiniDFS();
+  }
+
+  @Test(timeout = 4 * 60 * 1000)
+  public void test() throws Exception {
+    Random random = new Random();
+    Connector c = getConnector();
+    c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE);
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999");
+    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999");
+    UtilWaitThread.sleep(1000);
+    BatchWriterConfig cfg = new BatchWriterConfig();
+    cfg.setMaxWriteThreads(10);
+    cfg.setMaxLatency(1, TimeUnit.SECONDS);
+    cfg.setMaxMemory(1024 * 1024);
+
+    // get an idea of how fast the syncs occur
+    long syncsAtStart = getSyncs();
+    writeAndReport(c, tableName, cfg, random, SMALL_QUEUE_SIZE);
+    long syncsAfterSmall = getSyncs();
+    long smallQueueSyncs = syncsAfterSmall - syncsAtStart;
+    System.out.println("Syncs " + smallQueueSyncs);
+
+    // Now with a much bigger total queue
+    c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE);
+    c.tableOperations().flush(tableName, null, null, true);
+    UtilWaitThread.sleep(1000);
+    writeAndReport(c, tableName, cfg, random, LARGE_QUEUE_SIZE);
+    long largeQueueSyncs = getSyncs() - syncsAfterSmall;
+    System.out.println("Syncs " + largeQueueSyncs);
+    // compare per-phase counts; the raw counter also includes syncs from before the first phase
+    assertTrue("larger total queue should sync less: " + largeQueueSyncs + " vs " + smallQueueSyncs, largeQueueSyncs < smallQueueSyncs);
+  }
+
+  /** Writes N mutations with random 250-byte rows, then prints throughput and the sync estimate for queueSize. */
+  private static void writeAndReport(Connector c, String tableName, BatchWriterConfig cfg, Random random, int queueSize) throws Exception {
+    byte[] row = new byte[250];
+    BatchWriter bw = c.createBatchWriter(tableName, cfg);
+    long start = System.currentTimeMillis();
+    long bytesSent = 0;
+    for (long i = 0; i < N; i++) {
+      random.nextBytes(row);
+      Mutation m = new Mutation(row);
+      m.put("", "", "");
+      bw.addMutation(m);
+      bytesSent += m.estimatedMemoryUsed();
+    }
+    bw.close();
+    double secs = (System.currentTimeMillis() - start) / 1000.;
+    double syncs = (double) bytesSent / queueSize; // floating-point division: no truncation before the rate
+    String report = String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncs / secs);
+    System.out.println(report);
+  }
+
+  /** Returns the sync counter reported by the first listed tablet server (configure() starts one), or 0. */
+  private long getSyncs() throws Exception {
+    Connector c = getConnector();
+    Credentials credentials = new Credentials("root", new PasswordToken(ROOT_PASSWORD.getBytes()));
+    for (String address : c.instanceOperations().getTabletServers()) {
+      TabletClientService.Client client = ThriftUtil.getTServerClient(address, DefaultConfiguration.getDefaultConfiguration());
+      try {
+        TabletServerStatus status = client.getTabletServerStatus(null, credentials.toThrift(c.getInstance()));
+        return status.syncs;
+      } finally {
+        ThriftUtil.returnClient(client); // hand the client back when done rather than leaving its connection open
+      }
+    }
+    return 0;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
index 8f6b830..6ee671e 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
@@ -57,7 +57,7 @@ public class BloomFilterIT extends ConfigurableMacIT {
siteConfig.put(Property.TABLE_BLOOM_SIZE.getKey(), "2000000");
siteConfig.put(Property.TABLE_BLOOM_ERRORRATE.getKey(), "1%");
siteConfig.put(Property.TABLE_BLOOM_LOAD_THRESHOLD.getKey(), "0");
- siteConfig.put(Property.TSERV_MUTATION_QUEUE_MAX.getKey(), "10M");
+ siteConfig.put(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "10M");
siteConfig.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K");
cfg.setSiteConfig(siteConfig );
}