You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/06/09 22:01:03 UTC
[1/2] git commit: ACCUMULO-1919 add missing import
Repository: accumulo
Updated Branches:
refs/heads/master 3f6c9e533 -> 8070e1ca0
ACCUMULO-1919 add missing import
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bbc3b5ab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bbc3b5ab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bbc3b5ab
Branch: refs/heads/master
Commit: bbc3b5ab60f60a8793f6608a9e844a400b4516d0
Parents: 3f6c9e5
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Jun 9 15:20:02 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Jun 9 15:20:02 2014 -0400
----------------------------------------------------------------------
.../accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bbc3b5ab/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
index bd7c6bf..2d9fc04 100644
--- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Instance;
[2/2] git commit: ACCUMUMLO-2255 more little cleanups and class
extractions
Posted by ec...@apache.org.
ACCUMUMLO-2255 more little cleanups and class extractions
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8070e1ca
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8070e1ca
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8070e1ca
Branch: refs/heads/master
Commit: 8070e1ca0eb97aa700cb910b05854ce58653a1a3
Parents: bbc3b5a
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Jun 9 16:01:07 2014 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Jun 9 16:01:07 2014 -0400
----------------------------------------------------------------------
.../apache/accumulo/tserver/SessionManager.java | 298 ++++++++++++++++
.../apache/accumulo/tserver/TabletServer.java | 344 +------------------
.../apache/accumulo/tserver/WriteTracker.java | 80 +++++
.../apache/accumulo/tserver/tablet/Tablet.java | 38 +-
4 files changed, 393 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
new file mode 100644
index 0000000..e8fc010
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
@@ -0,0 +1,298 @@
+package org.apache.accumulo.tserver;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.TabletServer.ScanRunState;
+import org.apache.accumulo.tserver.TabletServer.ScanTask;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+
+public class SessionManager {
+
+ private final SecureRandom random = new SecureRandom();
+ private final Map<Long,Session> sessions = new HashMap<Long,Session>();
+ private final long maxIdle;
+ private final AccumuloConfiguration aconf;
+
+ SessionManager(AccumuloConfiguration conf) {
+ aconf = conf;
+ maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ sweep(maxIdle);
+ }
+ };
+
+ SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+ }
+
+ synchronized long createSession(Session session, boolean reserve) {
+ long sid = random.nextLong();
+
+ while (sessions.containsKey(sid)) {
+ sid = random.nextLong();
+ }
+
+ sessions.put(sid, session);
+
+ session.reserved = reserve;
+
+ session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+ return sid;
+ }
+
+ long getMaxIdleTime() {
+ return maxIdle;
+ }
+
+ /**
+ * while a session is reserved, it cannot be canceled or removed
+ *
+ * @param sessionId
+ */
+
+ synchronized Session reserveSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ synchronized Session reserveSession(long sessionId, boolean wait) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ while (wait && session.reserved) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ }
+
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ synchronized void unreserveSession(Session session) {
+ if (!session.reserved)
+ throw new IllegalStateException();
+ notifyAll();
+ session.reserved = false;
+ session.lastAccessTime = System.currentTimeMillis();
+ }
+
+ synchronized void unreserveSession(long sessionId) {
+ Session session = getSession(sessionId);
+ if (session != null)
+ unreserveSession(session);
+ }
+
+ synchronized Session getSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null)
+ session.lastAccessTime = System.currentTimeMillis();
+ return session;
+ }
+
+ Session removeSession(long sessionId) {
+ return removeSession(sessionId, false);
+ }
+
+ Session removeSession(long sessionId, boolean unreserve) {
+ Session session = null;
+ synchronized (this) {
+ session = sessions.remove(sessionId);
+ if (unreserve && session != null)
+ unreserveSession(session);
+ }
+
+ // do clean up out side of lock..
+ if (session != null)
+ session.cleanup();
+
+ return session;
+ }
+
+ private void sweep(long maxIdle) {
+ List<Session> sessionsToCleanup = new ArrayList<Session>();
+ synchronized (this) {
+ Iterator<Session> iter = sessions.values().iterator();
+ while (iter.hasNext()) {
+ Session session = iter.next();
+ long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+ if (idleTime > maxIdle && !session.reserved) {
+ iter.remove();
+ sessionsToCleanup.add(session);
+ }
+ }
+ }
+
+ // do clean up outside of lock
+ for (Session session : sessionsToCleanup) {
+ session.cleanup();
+ }
+ }
+
+ synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ final long removeTime = session.lastAccessTime;
+ TimerTask r = new TimerTask() {
+ @Override
+ public void run() {
+ Session sessionToCleanup = null;
+ synchronized (SessionManager.this) {
+ Session session2 = sessions.get(sessionId);
+ if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+ sessions.remove(sessionId);
+ sessionToCleanup = session2;
+ }
+ }
+
+ // call clean up outside of lock
+ if (sessionToCleanup != null)
+ sessionToCleanup.cleanup();
+ }
+ };
+
+ SimpleTimer.getInstance(aconf).schedule(r, delay);
+ }
+ }
+
+ public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+ Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+ Session session = entry.getValue();
+ @SuppressWarnings("rawtypes")
+ ScanTask nbt = null;
+ String tableID = null;
+
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+ nbt = ss.nextBatchTask;
+ tableID = ss.extent.getTableId().toString();
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+ nbt = mss.lookupTask;
+ tableID = mss.threadPoolExtent.getTableId().toString();
+ }
+
+ if (nbt == null)
+ continue;
+
+ ScanRunState srs = nbt.getScanRunState();
+
+ if (srs == ScanRunState.FINISHED)
+ continue;
+
+ MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+ if (stateCounts == null) {
+ stateCounts = new MapCounter<ScanRunState>();
+ counts.put(tableID, stateCounts);
+ }
+
+ stateCounts.increment(srs, 1);
+ }
+
+ return counts;
+ }
+
+ public synchronized List<ActiveScan> getActiveScans() {
+
+ List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+ long ct = System.currentTimeMillis();
+
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+ Session session = entry.getValue();
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+ state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<MultiScanResult> nbt = mss.lookupTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+ ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
+ .getAuthorizationsBB()));
+ }
+ }
+
+ return activeScans;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ce84bd3..17ee446 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -24,12 +24,10 @@ import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -119,8 +117,6 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.ScanState;
-import org.apache.accumulo.core.tabletserver.thrift.ScanType;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -208,7 +204,6 @@ import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
import org.apache.accumulo.tserver.session.ConditionalSession;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.session.ScanSession;
-import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.accumulo.tserver.tablet.CompactionInfo;
@@ -334,278 +329,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
this.resourceManager = new TabletServerResourceManager(getInstance(), fs);
}
- public static class SessionManager {
-
- private final SecureRandom random = new SecureRandom();
- private final Map<Long,Session> sessions = new HashMap<Long,Session>();
- private final long maxIdle;
- private final AccumuloConfiguration aconf;
-
- SessionManager(AccumuloConfiguration conf) {
- aconf = conf;
- maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- sweep(maxIdle);
- }
- };
-
- SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
- }
-
- synchronized long createSession(Session session, boolean reserve) {
- long sid = random.nextLong();
-
- while (sessions.containsKey(sid)) {
- sid = random.nextLong();
- }
-
- sessions.put(sid, session);
-
- session.reserved = reserve;
-
- session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
- return sid;
- }
-
- long getMaxIdleTime() {
- return maxIdle;
- }
-
- /**
- * while a session is reserved, it cannot be canceled or removed
- *
- * @param sessionId
- */
-
- synchronized Session reserveSession(long sessionId) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- if (session.reserved)
- throw new IllegalStateException();
- session.reserved = true;
- }
-
- return session;
-
- }
-
- synchronized Session reserveSession(long sessionId, boolean wait) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- while (wait && session.reserved) {
- try {
- wait(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException();
- }
- }
-
- if (session.reserved)
- throw new IllegalStateException();
- session.reserved = true;
- }
-
- return session;
-
- }
-
- synchronized void unreserveSession(Session session) {
- if (!session.reserved)
- throw new IllegalStateException();
- notifyAll();
- session.reserved = false;
- session.lastAccessTime = System.currentTimeMillis();
- }
-
- synchronized void unreserveSession(long sessionId) {
- Session session = getSession(sessionId);
- if (session != null)
- unreserveSession(session);
- }
-
- synchronized Session getSession(long sessionId) {
- Session session = sessions.get(sessionId);
- if (session != null)
- session.lastAccessTime = System.currentTimeMillis();
- return session;
- }
-
- Session removeSession(long sessionId) {
- return removeSession(sessionId, false);
- }
-
- Session removeSession(long sessionId, boolean unreserve) {
- Session session = null;
- synchronized (this) {
- session = sessions.remove(sessionId);
- if (unreserve && session != null)
- unreserveSession(session);
- }
-
- // do clean up out side of lock..
- if (session != null)
- session.cleanup();
-
- return session;
- }
-
- private void sweep(long maxIdle) {
- List<Session> sessionsToCleanup = new ArrayList<Session>();
- synchronized (this) {
- Iterator<Session> iter = sessions.values().iterator();
- while (iter.hasNext()) {
- Session session = iter.next();
- long idleTime = System.currentTimeMillis() - session.lastAccessTime;
- if (idleTime > maxIdle && !session.reserved) {
- iter.remove();
- sessionsToCleanup.add(session);
- }
- }
- }
-
- // do clean up outside of lock
- for (Session session : sessionsToCleanup) {
- session.cleanup();
- }
- }
-
- synchronized void removeIfNotAccessed(final long sessionId, long delay) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- final long removeTime = session.lastAccessTime;
- TimerTask r = new TimerTask() {
- @Override
- public void run() {
- Session sessionToCleanup = null;
- synchronized (SessionManager.this) {
- Session session2 = sessions.get(sessionId);
- if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
- sessions.remove(sessionId);
- sessionToCleanup = session2;
- }
- }
-
- // call clean up outside of lock
- if (sessionToCleanup != null)
- sessionToCleanup.cleanup();
- }
- };
-
- SimpleTimer.getInstance(aconf).schedule(r, delay);
- }
- }
-
- public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
- Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
- for (Entry<Long,Session> entry : sessions.entrySet()) {
-
- Session session = entry.getValue();
- @SuppressWarnings("rawtypes")
- ScanTask nbt = null;
- String tableID = null;
-
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
- nbt = ss.nextBatchTask;
- tableID = ss.extent.getTableId().toString();
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
- nbt = mss.lookupTask;
- tableID = mss.threadPoolExtent.getTableId().toString();
- }
-
- if (nbt == null)
- continue;
-
- ScanRunState srs = nbt.getScanRunState();
-
- if (srs == ScanRunState.FINISHED)
- continue;
-
- MapCounter<ScanRunState> stateCounts = counts.get(tableID);
- if (stateCounts == null) {
- stateCounts = new MapCounter<ScanRunState>();
- counts.put(tableID, stateCounts);
- }
-
- stateCounts.increment(srs, 1);
- }
-
- return counts;
- }
-
- public synchronized List<ActiveScan> getActiveScans() {
-
- List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
- long ct = System.currentTimeMillis();
-
- for (Entry<Long,Session> entry : sessions.entrySet()) {
- Session session = entry.getValue();
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
-
- ScanState state = ScanState.RUNNING;
-
- ScanTask<ScanBatch> nbt = ss.nextBatchTask;
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
-
- activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
- state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
-
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
-
- ScanState state = ScanState.RUNNING;
-
- ScanTask<MultiScanResult> nbt = mss.lookupTask;
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
-
- activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
- ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
- .getAuthorizationsBB()));
- }
- }
-
- return activeScans;
- }
- }
-
- public abstract class ScanTask<T> implements RunnableFuture<T> {
+ public static abstract class ScanTask<T> implements RunnableFuture<T> {
protected AtomicBoolean interruptFlag;
protected ArrayBlockingQueue<Object> resultQueue;
@@ -703,70 +427,6 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
- /**
- * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
- * are monotonically increasing.
- *
- */
- static class WriteTracker {
- private static final AtomicLong operationCounter = new AtomicLong(1);
- private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
- WriteTracker() {
- for (TabletType ttype : TabletType.values()) {
- inProgressWrites.put(ttype, new TreeSet<Long>());
- }
- }
-
- synchronized long startWrite(TabletType ttype) {
- long operationId = operationCounter.getAndIncrement();
- inProgressWrites.get(ttype).add(operationId);
- return operationId;
- }
-
- synchronized void finishWrite(long operationId) {
- if (operationId == -1)
- return;
-
- boolean removed = false;
-
- for (TabletType ttype : TabletType.values()) {
- removed = inProgressWrites.get(ttype).remove(operationId);
- if (removed)
- break;
- }
-
- if (!removed) {
- throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
- }
-
- this.notifyAll();
- }
-
- synchronized void waitForWrites(TabletType ttype) {
- long operationId = operationCounter.getAndIncrement();
- while (inProgressWrites.get(ttype).floor(operationId) != null) {
- try {
- this.wait();
- } catch (InterruptedException e) {
- log.error(e, e);
- }
- }
- }
-
- public long startWrite(Set<Tablet> keySet) {
- if (keySet.size() == 0)
- return -1;
-
- List<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
- for (Tablet tablet : keySet)
- extents.add(tablet.getExtent());
-
- return startWrite(TabletType.type(extents));
- }
- }
-
public AccumuloConfiguration getSystemConfiguration() {
return serverConfig.getConfiguration();
}
@@ -3496,7 +3156,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
if (this.isEnabled()) {
int result = 0;
for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
- if (tablet.majorCompactionRunning())
+ if (tablet.isMajorCompactionRunning())
result++;
}
return result;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
new file mode 100644
index 0000000..2eed484
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java
@@ -0,0 +1,80 @@
+package org.apache.accumulo.tserver;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.impl.TabletType;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.log4j.Logger;
+
+/**
+ * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
+ * are monotonically increasing.
+ *
+ */
+class WriteTracker {
+ private static Logger log = Logger.getLogger(WriteTracker.class);
+
+ private static final AtomicLong operationCounter = new AtomicLong(1);
+ private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
+
+ WriteTracker() {
+ for (TabletType ttype : TabletType.values()) {
+ inProgressWrites.put(ttype, new TreeSet<Long>());
+ }
+ }
+
+ synchronized long startWrite(TabletType ttype) {
+ long operationId = operationCounter.getAndIncrement();
+ inProgressWrites.get(ttype).add(operationId);
+ return operationId;
+ }
+
+ synchronized void finishWrite(long operationId) {
+ if (operationId == -1)
+ return;
+
+ boolean removed = false;
+
+ for (TabletType ttype : TabletType.values()) {
+ removed = inProgressWrites.get(ttype).remove(operationId);
+ if (removed)
+ break;
+ }
+
+ if (!removed) {
+ throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
+ }
+
+ this.notifyAll();
+ }
+
+ synchronized void waitForWrites(TabletType ttype) {
+ long operationId = operationCounter.getAndIncrement();
+ while (inProgressWrites.get(ttype).floor(operationId) != null) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ log.error(e, e);
+ }
+ }
+ }
+
+ public long startWrite(Set<Tablet> keySet) {
+ if (keySet.size() == 0)
+ return -1;
+
+ List<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
+
+ for (Tablet tablet : keySet)
+ extents.add(tablet.getExtent());
+
+ return startWrite(TabletType.type(extents));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8070e1ca/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 8d800d8..b615925 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -261,7 +261,7 @@ public class Tablet implements TabletCommitter {
}
/**
- * Only visibile for testing
+ * Only visible for testing
*/
@VisibleForTesting
protected Tablet(TabletTime tabletTime, String tabletDirectory, int logId, Path location, DatafileManager datafileManager, TabletServer tabletServer,
@@ -346,7 +346,6 @@ public class Tablet implements TabletCommitter {
mdScanner.setRange(new Range(rowName));
for (Entry<Key,Value> entry : mdScanner) {
-
if (entry.getKey().compareRow(rowName) != 0) {
break;
}
@@ -1324,7 +1323,7 @@ public class Tablet implements TabletCommitter {
// wait for major compactions to finish, setting closing to
// true should cause any running major compactions to abort
- while (majorCompactionRunning()) {
+ while (isMajorCompactionRunning()) {
try {
this.wait(50);
} catch (InterruptedException e) {
@@ -1510,7 +1509,7 @@ public class Tablet implements TabletCommitter {
public synchronized boolean initiateMajorCompaction(MajorCompactionReason reason) {
- if (isClosing() || isClosed() || !needsMajorCompaction(reason) || majorCompactionRunning() || majorCompactionQueued.contains(reason)) {
+ if (isClosing() || isClosed() || !needsMajorCompaction(reason) || isMajorCompactionRunning() || majorCompactionQueued.contains(reason)) {
return false;
}
@@ -1526,7 +1525,7 @@ public class Tablet implements TabletCommitter {
*
*/
public boolean needsMajorCompaction(MajorCompactionReason reason) {
- if (majorCompactionRunning())
+ if (isMajorCompactionRunning())
return false;
if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER)
return true;
@@ -1534,7 +1533,7 @@ public class Tablet implements TabletCommitter {
}
/**
- * Returns an int representing the total block size of the f served by this tablet.
+ * Returns an int representing the total block size of the files served by this tablet.
*
* @return size
*/
@@ -1695,14 +1694,9 @@ public class Tablet implements TabletCommitter {
*
*/
public synchronized boolean needsSplit() {
- boolean ret;
-
if (isClosing() || isClosed())
- ret = false;
- else
- ret = findSplitRow(getDatafileManager().getFiles()) != null;
-
- return ret;
+ return false;
+ return findSplitRow(getDatafileManager().getFiles()) != null;
}
// BEGIN PRIVATE METHODS RELATED TO MAJOR COMPACTION
@@ -1972,7 +1966,7 @@ public class Tablet implements TabletCommitter {
// check that compaction is still needed - defer to splitting
majorCompactionQueued.remove(reason);
- if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || majorCompactionRunning() || needsSplit()) {
+ if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || isMajorCompactionRunning() || needsSplit()) {
return null;
}
@@ -2061,7 +2055,7 @@ public class Tablet implements TabletCommitter {
return closeState == CloseState.COMPLETE;
}
- public boolean majorCompactionRunning() {
+ public boolean isMajorCompactionRunning() {
return majorCompactionState == CompactionState.IN_PROGRESS;
}
@@ -2268,12 +2262,10 @@ public class Tablet implements TabletCommitter {
private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
- public Set<String> getCurrentLogFiles() {
+ public synchronized Set<String> getCurrentLogFiles() {
Set<String> result = new HashSet<String>();
- synchronized (currentLogs) {
- for (DfsLogger log : currentLogs) {
- result.add(log.getFileName());
- }
+ for (DfsLogger log : currentLogs) {
+ result.add(log.getFileName());
}
return result;
}
@@ -2425,7 +2417,7 @@ public class Tablet implements TabletCommitter {
if (lastCompactID >= compactionId)
return;
- if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionRunning())
+ if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || isMajorCompactionRunning())
return;
if (getDatafileManager().getDatafileSizes().size() == 0) {
@@ -2562,10 +2554,6 @@ public class Tablet implements TabletCommitter {
minorCompactionState = null;
}
- public boolean isMajorCompactionRunning() {
- return majorCompactionState == CompactionState.IN_PROGRESS;
- }
-
public TabletStats getTabletStats() {
return timer.getTabletStats();
}