You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/02/02 23:53:19 UTC
[1/3] accumulo git commit: ACCUMULO-3549 periodically clear the locator cache
Repository: accumulo
Updated Branches:
refs/heads/master c3493c19b -> a6a2be8aa
ACCUMULO-3549 periodically clear the locator cache
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ed12e1f3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ed12e1f3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ed12e1f3
Branch: refs/heads/master
Commit: ed12e1f380736f3fcb897b95561ef94eba075aee
Parents: a88299f
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Jan 30 18:01:05 2015 -0500
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Jan 30 19:31:49 2015 -0500
----------------------------------------------------------------------
.../apache/accumulo/tserver/TabletServer.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ed12e1f3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index dfec999..948d0f0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -64,11 +65,13 @@ import javax.management.StandardMBean;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.CompressedIterators;
import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.impl.TabletType;
import org.apache.accumulo.core.client.impl.Translator;
import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
@@ -252,6 +255,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
+ private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
private TabletServerLogger logger;
@@ -280,6 +284,27 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
}
}, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ SystemCredentials creds = SystemCredentials.get();
+ Connector connector = instance.getConnector(creds.getPrincipal(), creds.getToken());
+ for (String id : connector.tableOperations().tableIdMap().values()) {
+ TabletLocator locator = TabletLocator.getLocator(instance, new Text(id));
+ locator.invalidateCache();
+ }
+ } catch (Exception ex) {
+ log.error("Error clearing locator cache, ignoring", ex);
+ }
+ }
+ }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
+ }
+
+ private static long jitter(long ms) {
+ Random r = new Random();
+ // add a random 10% wait
+ return (long)(1. + (r.nextDouble() / 10) * ms);
}
private synchronized static void logGCInfo(AccumuloConfiguration conf) {
[2/3] accumulo git commit: ACCUMULO-3549 updated with [~kturner]'s comments
Posted by ec...@apache.org.
ACCUMULO-3549 updated with [~kturner]'s comments
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c5d2bd54
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c5d2bd54
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c5d2bd54
Branch: refs/heads/master
Commit: c5d2bd54a961ce59946304c6d440b42caa9bdbbb
Parents: ed12e1f
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Feb 2 17:37:17 2015 -0500
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Feb 2 17:37:17 2015 -0500
----------------------------------------------------------------------
.../accumulo/core/client/impl/TabletLocator.java | 4 ++++
.../org/apache/accumulo/tserver/TabletServer.java | 14 ++------------
2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5d2bd54/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
index 1d35af0..5f30ddc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
@@ -92,6 +92,10 @@ public abstract class TabletLocator {
private static HashMap<LocatorKey,TabletLocator> locators = new HashMap<LocatorKey,TabletLocator>();
+ public static synchronized void clearLocators() {
+ locators.clear();
+ }
+
public static synchronized TabletLocator getLocator(Instance instance, Text tableId) {
LocatorKey key = new LocatorKey(instance.getInstanceID(), tableId);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5d2bd54/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 948d0f0..dbececd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -65,7 +65,6 @@ import javax.management.StandardMBean;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.CompressedIterators;
import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
@@ -287,16 +286,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
SimpleTimer.getInstance().schedule(new Runnable() {
@Override
public void run() {
- try {
- SystemCredentials creds = SystemCredentials.get();
- Connector connector = instance.getConnector(creds.getPrincipal(), creds.getToken());
- for (String id : connector.tableOperations().tableIdMap().values()) {
- TabletLocator locator = TabletLocator.getLocator(instance, new Text(id));
- locator.invalidateCache();
- }
- } catch (Exception ex) {
- log.error("Error clearing locator cache, ignoring", ex);
- }
+ TabletLocator.clearLocators();
}
}, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
}
@@ -304,7 +294,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
private static long jitter(long ms) {
Random r = new Random();
// add a random 10% wait
- return (long)(1. + (r.nextDouble() / 10) * ms);
+ return (long)((1. + (r.nextDouble() / 10)) * ms);
}
private synchronized static void logGCInfo(AccumuloConfiguration conf) {
[3/3] accumulo git commit: ACCUMULO-3549 merge to master
Posted by ec...@apache.org.
ACCUMULO-3549 merge to master
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a6a2be8a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a6a2be8a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a6a2be8a
Branch: refs/heads/master
Commit: a6a2be8aa9c607d5ebe7f902448e6245a130b716
Parents: c3493c1 c5d2bd5
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Feb 2 17:52:50 2015 -0500
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Feb 2 17:52:50 2015 -0500
----------------------------------------------------------------------
.../accumulo/core/client/impl/TabletLocator.java | 4 ++++
.../org/apache/accumulo/tserver/TabletServer.java | 15 +++++++++++++++
2 files changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a6a2be8a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
index 782a599,5f30ddc..2391fe6
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
@@@ -92,8 -92,12 +92,12 @@@ public abstract class TabletLocator
private static HashMap<LocatorKey,TabletLocator> locators = new HashMap<LocatorKey,TabletLocator>();
+ public static synchronized void clearLocators() {
+ locators.clear();
+ }
+
- public static synchronized TabletLocator getLocator(Instance instance, Text tableId) {
-
+ public static synchronized TabletLocator getLocator(ClientContext context, Text tableId) {
+ Instance instance = context.getInstance();
LocatorKey key = new LocatorKey(instance.getInstanceID(), tableId);
TabletLocator tl = locators.get(key);
if (tl == null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a6a2be8a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7d49e65,dbececd..a5675dc
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -59,9 -68,9 +60,10 @@@ import org.apache.accumulo.core.client.
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.CompressedIterators;
import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+import org.apache.accumulo.core.client.impl.DurabilityImpl;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
+ import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.impl.TabletType;
import org.apache.accumulo.core.client.impl.Translator;
import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
@@@ -248,157 -254,966 +250,170 @@@ public class TabletServer extends Accum
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
+ private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
+ private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet();
- private TabletServerLogger logger;
-
- protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
-
- private ServerConfiguration serverConfig;
- private LogSorter logSorter = null;
-
- public TabletServer(ServerConfiguration conf, VolumeManager fs) {
- super();
- this.serverConfig = conf;
- this.instance = conf.getInstance();
- this.fs = fs;
- this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
- SimpleTimer.getInstance().schedule(new Runnable() {
- @Override
- public void run() {
- synchronized (onlineTablets) {
- long now = System.currentTimeMillis();
- for (Tablet tablet : onlineTablets.values())
- try {
- tablet.updateRates(now);
- } catch (Exception ex) {
- log.error(ex, ex);
- }
- }
- }
- }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
- SimpleTimer.getInstance().schedule(new Runnable() {
- @Override
- public void run() {
- TabletLocator.clearLocators();
- }
- }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
- }
-
- private static long jitter(long ms) {
- Random r = new Random();
- // add a random 10% wait
- return (long)((1. + (r.nextDouble() / 10)) * ms);
- }
-
- private synchronized static void logGCInfo(AccumuloConfiguration conf) {
- long now = System.currentTimeMillis();
-
- List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
- Runtime rt = Runtime.getRuntime();
-
- StringBuilder sb = new StringBuilder("gc");
-
- boolean sawChange = false;
-
- long maxIncreaseInCollectionTime = 0;
-
- for (GarbageCollectorMXBean gcBean : gcmBeans) {
- Long prevTime = prevGcTime.get(gcBean.getName());
- long pt = 0;
- if (prevTime != null) {
- pt = prevTime;
- }
-
- long time = gcBean.getCollectionTime();
-
- if (time - pt != 0) {
- sawChange = true;
- }
-
- long increaseInCollectionTime = time - pt;
- sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
- maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
- prevGcTime.put(gcBean.getName(), time);
- }
-
- long mem = rt.freeMemory();
- if (maxIncreaseInCollectionTime == 0) {
- gcTimeIncreasedCount = 0;
- } else {
- gcTimeIncreasedCount++;
- if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
- log.warn("Running low on memory");
- gcTimeIncreasedCount = 0;
- }
- }
-
- if (mem > lastMemorySize) {
- sawChange = true;
- }
-
- String sign = "+";
- if (mem - lastMemorySize <= 0) {
- sign = "";
- }
-
- sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
-
- if (sawChange) {
- log.debug(sb.toString());
- }
-
- final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
- if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
- long diff = now - lastMemoryCheckTime;
- if (diff > keepAliveTimeout) {
- log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
- TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.));
- }
- lastMemoryCheckTime = now;
- return;
- }
-
- if (maxIncreaseInCollectionTime > keepAliveTimeout) {
- Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
- }
-
- lastMemorySize = mem;
- lastMemoryCheckTime = now;
- }
-
- private TabletStatsKeeper statsKeeper;
-
- private static class Session {
- long lastAccessTime;
- long startTime;
- String user;
- String client = TServerUtils.clientAddress.get();
- public boolean reserved;
-
- public void cleanup() {}
- }
-
- private static class SessionManager {
-
- SecureRandom random;
- Map<Long,Session> sessions;
- long maxIdle;
-
- SessionManager(AccumuloConfiguration conf) {
- random = new SecureRandom();
- sessions = new HashMap<Long,Session>();
-
- maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
- Runnable r = new Runnable() {
- @Override
- public void run() {
- sweep(maxIdle);
- }
- };
-
- SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
- }
-
- synchronized long createSession(Session session, boolean reserve) {
- long sid = random.nextLong();
-
- while (sessions.containsKey(sid)) {
- sid = random.nextLong();
- }
-
- sessions.put(sid, session);
-
- session.reserved = reserve;
-
- session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
- return sid;
- }
-
- long getMaxIdleTime() {
- return maxIdle;
- }
-
- /**
- * while a session is reserved, it cannot be canceled or removed
- */
- synchronized Session reserveSession(long sessionId) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- if (session.reserved)
- throw new IllegalStateException();
- session.reserved = true;
- }
-
- return session;
-
- }
-
- synchronized Session reserveSession(long sessionId, boolean wait) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- while (wait && session.reserved) {
- try {
- wait(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException();
- }
- }
-
- if (session.reserved)
- throw new IllegalStateException();
- session.reserved = true;
- }
-
- return session;
-
- }
-
- synchronized void unreserveSession(Session session) {
- if (!session.reserved)
- throw new IllegalStateException();
- notifyAll();
- session.reserved = false;
- session.lastAccessTime = System.currentTimeMillis();
- }
-
- synchronized void unreserveSession(long sessionId) {
- Session session = getSession(sessionId);
- if (session != null)
- unreserveSession(session);
- }
-
- synchronized Session getSession(long sessionId) {
- Session session = sessions.get(sessionId);
- if (session != null)
- session.lastAccessTime = System.currentTimeMillis();
- return session;
- }
-
- Session removeSession(long sessionId) {
- return removeSession(sessionId, false);
- }
-
- Session removeSession(long sessionId, boolean unreserve) {
- Session session = null;
- synchronized (this) {
- session = sessions.remove(sessionId);
- if (unreserve && session != null)
- unreserveSession(session);
- }
-
- // do clean up out side of lock..
- if (session != null)
- session.cleanup();
-
- return session;
- }
-
- private void sweep(long maxIdle) {
- ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
- synchronized (this) {
- Iterator<Session> iter = sessions.values().iterator();
- while (iter.hasNext()) {
- Session session = iter.next();
- long idleTime = System.currentTimeMillis() - session.lastAccessTime;
- if (idleTime > maxIdle && !session.reserved) {
- log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms");
- iter.remove();
- sessionsToCleanup.add(session);
- }
- }
- }
-
- // do clean up outside of lock
- for (Session session : sessionsToCleanup) {
- session.cleanup();
- }
- }
-
- synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
- Session session = sessions.get(sessionId);
- if (session != null) {
- final long removeTime = session.lastAccessTime;
- TimerTask r = new TimerTask() {
- @Override
- public void run() {
- Session sessionToCleanup = null;
- synchronized (SessionManager.this) {
- Session session2 = sessions.get(sessionId);
- if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
- log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms");
- sessions.remove(sessionId);
- sessionToCleanup = session2;
- }
- }
-
- // call clean up outside of lock
- if (sessionToCleanup != null)
- sessionToCleanup.cleanup();
- }
- };
-
- SimpleTimer.getInstance().schedule(r, delay);
- }
- }
-
- public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
- Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
- for (Entry<Long,Session> entry : sessions.entrySet()) {
-
- Session session = entry.getValue();
- @SuppressWarnings("rawtypes")
- ScanTask nbt = null;
- String tableID = null;
-
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
- nbt = ss.nextBatchTask;
- tableID = ss.extent.getTableId().toString();
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
- nbt = mss.lookupTask;
- tableID = mss.threadPoolExtent.getTableId().toString();
- }
-
- if (nbt == null)
- continue;
-
- ScanRunState srs = nbt.getScanRunState();
-
- if (srs == ScanRunState.FINISHED)
- continue;
-
- MapCounter<ScanRunState> stateCounts = counts.get(tableID);
- if (stateCounts == null) {
- stateCounts = new MapCounter<ScanRunState>();
- counts.put(tableID, stateCounts);
- }
-
- stateCounts.increment(srs, 1);
- }
-
- return counts;
- }
-
- public synchronized List<ActiveScan> getActiveScans() {
-
- ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
- long ct = System.currentTimeMillis();
-
- for (Entry<Long,Session> entry : sessions.entrySet()) {
- Session session = entry.getValue();
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
-
- ScanState state = ScanState.RUNNING;
-
- ScanTask<ScanBatch> nbt = ss.nextBatchTask;
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
-
- ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime,
- ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio,
- ss.auths.getAuthorizationsBB());
-
- // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor
- activeScan.setScanId(entry.getKey());
- activeScans.add(activeScan);
-
- } else if (session instanceof MultiScanSession) {
- MultiScanSession mss = (MultiScanSession) session;
-
- ScanState state = ScanState.RUNNING;
-
- ScanTask<MultiScanResult> nbt = mss.lookupTask;
- if (nbt == null) {
- state = ScanState.IDLE;
- } else {
- switch (nbt.getScanRunState()) {
- case QUEUED:
- state = ScanState.QUEUED;
- break;
- case FINISHED:
- state = ScanState.IDLE;
- break;
- case RUNNING:
- default:
- /* do nothing */
- break;
- }
- }
-
- activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
- ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
- .getAuthorizationsBB()));
- }
- }
-
- return activeScans;
- }
- }
-
- static class TservConstraintEnv implements Environment {
-
- private TCredentials credentials;
- private SecurityOperation security;
- private Authorizations auths;
- private KeyExtent ke;
-
- TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
- this.security = secOp;
- this.credentials = credentials;
- }
-
- void setExtent(KeyExtent ke) {
- this.ke = ke;
- }
-
- @Override
- public KeyExtent getExtent() {
- return ke;
- }
-
- @Override
- public String getUser() {
- return credentials.getPrincipal();
- }
-
- @Override
- @Deprecated
- public Authorizations getAuthorizations() {
- if (auths == null)
- try {
- this.auths = security.getUserAuthorizations(credentials);
- } catch (ThriftSecurityException e) {
- throw new RuntimeException(e);
- }
- return auths;
- }
-
- @Override
- public AuthorizationContainer getAuthorizationsContainer() {
- return new AuthorizationContainer() {
- @Override
- public boolean contains(ByteSequence auth) {
- try {
- return security.userHasAuthorizations(credentials,
- Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length())));
- } catch (ThriftSecurityException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
- }
-
- private abstract class ScanTask<T> implements RunnableFuture<T> {
-
- protected AtomicBoolean interruptFlag;
- protected ArrayBlockingQueue<Object> resultQueue;
- protected AtomicInteger state;
- protected AtomicReference<ScanRunState> runState;
-
- private static final int INITIAL = 1;
- private static final int ADDED = 2;
- private static final int CANCELED = 3;
+ private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+ private final TransactionWatcher watcher = new TransactionWatcher();
+ private final ZooCache masterLockCache = new ZooCache();
- ScanTask() {
- interruptFlag = new AtomicBoolean(false);
- runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
- state = new AtomicInteger(INITIAL);
- resultQueue = new ArrayBlockingQueue<Object>(1);
- }
-
- protected void addResult(Object o) {
- if (state.compareAndSet(INITIAL, ADDED))
- resultQueue.add(o);
- else if (state.get() == ADDED)
- throw new IllegalStateException("Tried to add more than one result");
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (!mayInterruptIfRunning)
- throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-
- if (state.get() == CANCELED)
- return true;
-
- if (state.compareAndSet(INITIAL, CANCELED)) {
- interruptFlag.set(true);
- resultQueue = null;
- return true;
- }
-
- return false;
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- throw new UnsupportedOperationException();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
- ArrayBlockingQueue<Object> localRQ = resultQueue;
-
- if (state.get() == CANCELED)
- throw new CancellationException();
-
- if (localRQ == null && state.get() == ADDED)
- throw new IllegalStateException("Tried to get result twice");
-
- Object r = localRQ.poll(timeout, unit);
-
- // could have been canceled while waiting
- if (state.get() == CANCELED) {
- if (r != null)
- throw new IllegalStateException("Nothing should have been added when in canceled state");
-
- throw new CancellationException();
- }
-
- if (r == null)
- throw new TimeoutException();
-
- // make this method stop working now that something is being
- // returned
- resultQueue = null;
-
- if (r instanceof Throwable)
- throw new ExecutionException((Throwable) r);
-
- return (T) r;
- }
-
- @Override
- public boolean isCancelled() {
- return state.get() == CANCELED;
- }
-
- @Override
- public boolean isDone() {
- return runState.get().equals(ScanRunState.FINISHED);
- }
-
- public ScanRunState getScanRunState() {
- return runState.get();
- }
-
- }
-
- private static class ConditionalSession extends Session {
- public TCredentials credentials;
- public Authorizations auths;
- public String tableId;
- public AtomicBoolean interruptFlag;
-
- @Override
- public void cleanup() {
- interruptFlag.set(true);
- }
- }
-
- private static class UpdateSession extends Session {
- public Tablet currentTablet;
- public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
- Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
- HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
- public Violations violations;
- public TCredentials credentials;
- public long totalUpdates = 0;
- public long flushTime = 0;
- Stat prepareTimes = new Stat();
- Stat walogTimes = new Stat();
- Stat commitTimes = new Stat();
- Stat authTimes = new Stat();
- public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
- public long queuedMutationSize = 0;
- TservConstraintEnv cenv = null;
- }
-
- private static class ScanSession extends Session {
- public KeyExtent extent;
- public HashSet<Column> columnSet;
- public List<IterInfo> ssiList;
- public Map<String,Map<String,String>> ssio;
- public Authorizations auths;
- public long entriesReturned = 0;
- public Stat nbTimes = new Stat();
- public long batchCount = 0;
- public volatile ScanTask<ScanBatch> nextBatchTask;
- public AtomicBoolean interruptFlag;
- public Scanner scanner;
- public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
-
- @Override
- public void cleanup() {
- try {
- if (nextBatchTask != null)
- nextBatchTask.cancel(true);
- } finally {
- if (scanner != null)
- scanner.close();
- }
- }
-
- }
-
- private static class MultiScanSession extends Session {
- HashSet<Column> columnSet;
- Map<KeyExtent,List<Range>> queries;
- public List<IterInfo> ssiList;
- public Map<String,Map<String,String>> ssio;
- public Authorizations auths;
-
- // stats
- int numRanges;
- int numTablets;
- int numEntries;
- long totalLookupTime;
-
- public volatile ScanTask<MultiScanResult> lookupTask;
- public KeyExtent threadPoolExtent;
-
- @Override
- public void cleanup() {
- if (lookupTask != null)
- lookupTask.cancel(true);
- }
- }
-
- /**
- * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
- * are monotonically increasing.
- *
- */
- static class WriteTracker {
- private static AtomicLong operationCounter = new AtomicLong(1);
- private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-
- WriteTracker() {
- for (TabletType ttype : TabletType.values()) {
- inProgressWrites.put(ttype, new TreeSet<Long>());
- }
- }
-
- synchronized long startWrite(TabletType ttype) {
- long operationId = operationCounter.getAndIncrement();
- inProgressWrites.get(ttype).add(operationId);
- return operationId;
- }
-
- synchronized void finishWrite(long operationId) {
- if (operationId == -1)
- return;
-
- boolean removed = false;
-
- for (TabletType ttype : TabletType.values()) {
- removed = inProgressWrites.get(ttype).remove(operationId);
- if (removed)
- break;
- }
-
- if (!removed) {
- throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
- }
-
- this.notifyAll();
- }
-
- synchronized void waitForWrites(TabletType ttype) {
- long operationId = operationCounter.getAndIncrement();
- while (inProgressWrites.get(ttype).floor(operationId) != null) {
- try {
- this.wait();
- } catch (InterruptedException e) {
- log.error(e, e);
- }
- }
- }
+ private final TabletServerLogger logger;
- public long startWrite(Set<Tablet> keySet) {
- if (keySet.size() == 0)
- return -1;
+ private final TabletServerMetricsFactory metricsFactory;
+ private final Metrics updateMetrics;
+ private final Metrics scanMetrics;
+ private final Metrics mincMetrics;
- ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-
- for (Tablet tablet : keySet)
- extents.add(tablet.getExtent());
-
- return startWrite(TabletType.type(extents));
- }
+ public Metrics getMinCMetrics() {
+ return mincMetrics;
}
- public AccumuloConfiguration getSystemConfiguration() {
- return serverConfig.getConfiguration();
- }
-
- TransactionWatcher watcher = new TransactionWatcher();
-
- private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-
- SessionManager sessionManager;
+ private final LogSorter logSorter;
+ private ReplicationWorker replWorker = null;
+ private final TabletStatsKeeper statsKeeper;
+ private final AtomicInteger logIdGenerator = new AtomicInteger();
- AccumuloConfiguration acuConf = getSystemConfiguration();
+ private final AtomicLong flushCounter = new AtomicLong(0);
+ private final AtomicLong syncCounter = new AtomicLong(0);
- TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
+ private final VolumeManager fs;
- TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
+ private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
+ private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
+ @SuppressWarnings("unchecked")
+ private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000));
- WriteTracker writeTracker = new WriteTracker();
+ private final TabletServerResourceManager resourceManager;
+ private final SecurityOperation security;
- private RowLocks rowLocks = new RowLocks();
+ private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
- ThriftClientHandler() {
- super(instance, watcher, fs);
- log.debug(ThriftClientHandler.class.getName() + " created");
- sessionManager = new SessionManager(getSystemConfiguration());
- // Register the metrics MBean
- try {
- updateMetrics.register();
- scanMetrics.register();
- } catch (Exception e) {
- log.error("Exception registering MBean with MBean Server", e);
- }
- }
-
- @Override
- public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
- throws ThriftSecurityException {
-
- if (!security.canPerformSystemActions(credentials))
- throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+ private Thread majorCompactorThread;
- List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+ private HostAndPort replicationAddress;
+ private HostAndPort clientAddress;
- for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
- TKeyExtent tke = entry.getKey();
- Map<String,MapFileInfo> fileMap = entry.getValue();
- Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(mapping.getKey());
- FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
- path = ns.makeQualified(path);
- fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
- }
+ private volatile boolean serverStopRequested = false;
+ private volatile boolean majorCompactorDisabled = false;
+ private volatile boolean shutdownComplete = false;
- Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
+ private ZooLock tabletServerLock;
- if (importTablet == null) {
- failures.add(tke);
- } else {
- try {
- importTablet.importMapFiles(tid, fileRefMap, setTime);
- } catch (IOException ioe) {
- log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
- failures.add(tke);
- }
- }
- }
- return failures;
- }
+ private TServer server;
+ private TServer replServer;
- private class NextBatchTask extends ScanTask<ScanBatch> {
+ private DistributedWorkQueue bulkFailedCopyQ;
- private long scanID;
+ private String lockID;
- NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
- this.scanID = scanID;
- this.interruptFlag = interruptFlag;
+ public static final AtomicLong seekCount = new AtomicLong(0);
- if (interruptFlag.get())
- cancel(true);
- }
+ private final AtomicLong totalMinorCompactions = new AtomicLong(0);
+ private final ServerConfigurationFactory confFactory;
+ public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
+ super(confFactory);
+ this.confFactory = confFactory;
+ this.fs = fs;
+ AccumuloConfiguration aconf = getConfiguration();
+ Instance instance = getInstance();
+ this.sessionManager = new SessionManager(aconf);
+ this.logSorter = new LogSorter(instance, fs, aconf);
+ this.replWorker = new ReplicationWorker(this, fs);
+ this.statsKeeper = new TabletStatsKeeper();
+ SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
public void run() {
-
- final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
- String oldThreadName = Thread.currentThread().getName();
-
- try {
- if (isCancelled() || scanSession == null)
- return;
-
- runState.set(ScanRunState.RUNNING);
-
- Thread.currentThread().setName(
- "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
-
- Tablet tablet = onlineTablets.get(scanSession.extent);
-
- if (tablet == null) {
- addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
- return;
- }
-
- long t1 = System.currentTimeMillis();
- ScanBatch batch = scanSession.scanner.read();
- long t2 = System.currentTimeMillis();
- scanSession.nbTimes.addStat(t2 - t1);
-
- // there should only be one thing on the queue at a time, so
- // it should be ok to call add()
- // instead of put()... if add() fails because queue is at
- // capacity it means there is code
- // problem somewhere
- addResult(batch);
- } catch (TabletClosedException e) {
- addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
- } catch (IterationInterruptedException iie) {
- if (!isCancelled()) {
- log.warn("Iteration interrupted, when scan not cancelled", iie);
- addResult(iie);
- }
- } catch (TooManyFilesException tmfe) {
- addResult(tmfe);
- } catch (Throwable e) {
- log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
- addResult(e);
- } finally {
- runState.set(ScanRunState.FINISHED);
- Thread.currentThread().setName(oldThreadName);
+ synchronized (onlineTablets) {
+ long now = System.currentTimeMillis();
+ for (Tablet tablet : onlineTablets.values())
+ try {
+ tablet.updateRates(now);
+ } catch (Exception ex) {
+ log.error("Error updating rates for {}", tablet.getExtent(), ex);
+ }
}
-
- }
- }
-
- private class LookupTask extends ScanTask<MultiScanResult> {
-
- private long scanID;
-
- LookupTask(long scanID) {
- this.scanID = scanID;
}
+ }, 5000, 5000);
+ long walogMaxSize = getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
+ long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0);
+ if (minBlockSize != 0 && minBlockSize > walogMaxSize)
+ throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
+ + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
+ logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter);
+ this.resourceManager = new TabletServerResourceManager(this, fs);
+ this.security = AuditedSecurityOperation.getInstance(this);
+
+ metricsFactory = new TabletServerMetricsFactory(aconf);
+ updateMetrics = metricsFactory.createUpdateMetrics();
+ scanMetrics = metricsFactory.createScanMetrics();
+ mincMetrics = metricsFactory.createMincMetrics();
++ SimpleTimer.getInstance(aconf).schedule(new Runnable() {
+ @Override
+ public void run() {
- MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
- String oldThreadName = Thread.currentThread().getName();
-
- try {
- if (isCancelled() || session == null)
- return;
-
- TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
- long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-
- runState.set(ScanRunState.RUNNING);
- Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
-
- long bytesAdded = 0;
- long maxScanTime = 4000;
-
- long startTime = System.currentTimeMillis();
++ TabletLocator.clearLocators();
++ }
++ }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
++ }
+
- ArrayList<KVEntry> results = new ArrayList<KVEntry>();
- Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
- ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
- KeyExtent partScan = null;
- Key partNextKey = null;
- boolean partNextKeyInclusive = false;
++ private static long jitter(long ms) {
++ Random r = new Random();
++ // add a random 10% wait
++ return (long)((1. + (r.nextDouble() / 10)) * ms);
+ }
- Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
+ private final SessionManager sessionManager;
- // check the time so that the read ahead thread is not monopolized
- while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
- Entry<KeyExtent,List<Range>> entry = iter.next();
+ private final WriteTracker writeTracker = new WriteTracker();
- iter.remove();
+ private final RowLocks rowLocks = new RowLocks();
- // check that tablet server is serving requested tablet
- Tablet tablet = onlineTablets.get(entry.getKey());
- if (tablet == null) {
- failures.put(entry.getKey(), entry.getValue());
- continue;
- }
- Thread.currentThread().setName(
- "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
+ private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+ private final ReentrantLock recoveryLock = new ReentrantLock(true);
- LookupResult lookupResult;
- try {
+ private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
- // do the following check to avoid a race condition
- // between setting false below and the task being
- // canceled
- if (isCancelled())
- interruptFlag.set(true);
+ ThriftClientHandler() {
+ super(TabletServer.this, watcher, fs);
+ log.debug(ThriftClientHandler.class.getName() + " created");
+ }
- lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
- session.ssio, interruptFlag);
+ @Override
+ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+ throws ThriftSecurityException {
- // if the tablet was closed it it possible that the
- // interrupt flag was set.... do not want it set for
- // the next
- // lookup
- interruptFlag.set(false);
+ if (!security.canPerformSystemActions(credentials))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
- } catch (IOException e) {
- log.warn("lookup failed for tablet " + entry.getKey(), e);
- throw new RuntimeException(e);
- }
+ List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
- bytesAdded += lookupResult.bytesAdded;
+ for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+ TKeyExtent tke = entry.getKey();
+ Map<String,MapFileInfo> fileMap = entry.getValue();
+ Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
+ for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
+ Path path = new Path(mapping.getKey());
+ FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
+ path = ns.makeQualified(path);
+ fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
+ }
- if (lookupResult.unfinishedRanges.size() > 0) {
- if (lookupResult.closed) {
- failures.put(entry.getKey(), lookupResult.unfinishedRanges);
- } else {
- session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
- partScan = entry.getKey();
- partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
- partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
- }
- } else {
- fullScans.add(entry.getKey());
- }
- }
+ Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
- long finishTime = System.currentTimeMillis();
- session.totalLookupTime += (finishTime - startTime);
- session.numEntries += results.size();
-
- // convert everything to thrift before adding result
- List<TKeyValue> retResults = new ArrayList<TKeyValue>();
- for (KVEntry entry : results)
- retResults.add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
- Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translators.KET,
- new Translator.ListTranslator<Range,TRange>(Translators.RT));
- List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translators.KET);
- TKeyExtent retPartScan = null;
- TKey retPartNextKey = null;
- if (partScan != null) {
- retPartScan = partScan.toThrift();
- retPartNextKey = partNextKey.toThrift();
- }
- // add results to queue
- addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
- } catch (IterationInterruptedException iie) {
- if (!isCancelled()) {
- log.warn("Iteration interrupted, when scan not cancelled", iie);
- addResult(iie);
+ if (importTablet == null) {
+ failures.add(tke);
+ } else {
+ try {
+ importTablet.importMapFiles(tid, fileRefMap, setTime);
+ } catch (IOException ioe) {
+ log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
+ failures.add(tke);
}
- } catch (Throwable e) {
- log.warn("exception while doing multi-scan ", e);
- addResult(e);
- } finally {
- Thread.currentThread().setName(oldThreadName);
- runState.set(ScanRunState.FINISHED);
}
}
+ return failures;
}
@Override