You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/10 16:15:36 UTC
[1/7] git commit: ACCUMULO-2160 back-port real bugs found by findbugs
Updated Branches:
refs/heads/master dc10e22dc -> ba931ff5d
ACCUMULO-2160 back-port real bugs found by findbugs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5dd6f84b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5dd6f84b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5dd6f84b
Branch: refs/heads/master
Commit: 5dd6f84bc1858d894cfbaedbdecfb9acae15f877
Parents: 9abb725
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 15:41:37 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 15:41:37 2014 -0500
----------------------------------------------------------------------
.../apache/accumulo/core/client/mapreduce/RangeInputSplit.java | 3 ++-
src/server/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
.../org/apache/accumulo/server/tabletserver/TabletServer.java | 2 +-
3 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5dd6f84b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 69f2b38..01921c4 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@ -20,6 +20,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -309,7 +310,7 @@ public class RangeInputSplit extends InputSplit implements Writable {
public String toString() {
StringBuilder sb = new StringBuilder(128);
sb.append("Range: ").append(range);
- sb.append(" Locations: ").append(locations);
+ sb.append(" Locations: ").append(Arrays.asList(locations));
sb.append(" Table: ").append(table);
sb.append(" InstanceName: ").append(instanceName);
sb.append(" zooKeepers: ").append(zooKeepers);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5dd6f84b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index 253962b..4e909a7 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -117,7 +117,7 @@ public class Accumulo {
System.setProperty("org.apache.accumulo.core.host.log", localhost);
// Use a specific log config, if it exists
- String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
+ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
if (!new File(logConfig).exists()) {
// otherwise, use the generic config
logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5dd6f84b/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 33082ea..ff4f1bc 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -2464,7 +2464,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
openingTablets.remove(extentToOpen);
onlineTablets.put(extentToOpen, tablet);
openingTablets.notifyAll();
- recentlyUnloadedCache.remove(tablet);
+ recentlyUnloadedCache.remove(tablet.getExtent());
}
}
tablet = null; // release this reference
[4/7] git commit: ACCUMULO-2160 back-port real bugs found by findbugs
Posted by ec...@apache.org.
ACCUMULO-2160 back-port real bugs found by findbugs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e135e039
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e135e039
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e135e039
Branch: refs/heads/master
Commit: e135e039a33c1e3917bed87c16efcf6535418d15
Parents: 2d51e32 cb50a74
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 15:50:51 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 15:50:51 2014 -0500
----------------------------------------------------------------------
server/base/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e135e039/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index 83f54b0,0000000..c546590
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,264 -1,0 +1,264 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class Accumulo {
+
+ private static final Logger log = Logger.getLogger(Accumulo.class);
+
+ public static synchronized void updateAccumuloVersion(VolumeManager fs) {
+ try {
+ if (getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
+ fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.DATA_VERSION));
+ fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.PREV_DATA_VERSION));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
+ int dataVersion;
+ try {
+ FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation());
+ if (files == null || files.length == 0) {
+ dataVersion = -1; // assume it is 0.5 or earlier
+ } else {
+ dataVersion = Integer.parseInt(files[0].getPath().getName());
+ }
+ return dataVersion;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static void enableTracing(String address, String application) {
+ try {
+ DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
+ } catch (Exception ex) {
+ log.error("creating remote sink for trace spans", ex);
+ }
+ }
+
+ private static class LogMonitor extends FileWatchdog implements Watcher {
+ String path;
+
+ protected LogMonitor(String instance, String filename, int delay) {
+ super(filename);
+ setDelay(delay);
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+ }
+
+ private void setMonitorPort() {
+ try {
+ String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+ System.setProperty("org.apache.accumulo.core.host.log.port", port);
+ log.info("Changing monitor log4j port to "+port);
+ doOnChange();
+ } catch (Exception e) {
+ log.error("Error reading zookeeper data for monitor log4j port", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+ setMonitorPort();
+ log.info("Set watch for monitor log4j port");
+ } catch (Exception e) {
+ log.error("Unable to set watch for monitor log4j port " + path);
+ }
+ super.run();
+ }
+
+ @Override
+ protected void doOnChange() {
+ LogManager.resetConfiguration();
+ new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ setMonitorPort();
+ if (event.getPath() != null) {
+ try {
+ ZooReaderWriter.getInstance().exists(event.getPath(), this);
+ } catch (Exception ex) {
+ log.error("Unable to reset watch for monitor log4j port", ex);
+ }
+ }
+ }
+ }
+
+ public static void init(VolumeManager fs, ServerConfiguration config, String application) throws UnknownHostException {
+
+ System.setProperty("org.apache.accumulo.core.application", application);
+
+ if (System.getenv("ACCUMULO_LOG_DIR") != null)
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
+ else
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
+
+ String localhost = InetAddress.getLocalHost().getHostName();
+ System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
+
+ if (System.getenv("ACCUMULO_LOG_HOST") != null)
+ System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
+ else
+ System.setProperty("org.apache.accumulo.core.host.log", localhost);
+
+ int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+ System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+
+ // Use a specific log config, if it exists
- String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
++ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ if (!new File(logConfig).exists()) {
+ // otherwise, use the generic config
+ logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ }
+ // Turn off messages about not being able to reach the remote logger... we protect against that.
+ LogLog.setQuietMode(true);
+
+ // Configure logging
+ if (logPort==0)
+ new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+ else
+ DOMConfigurator.configureAndWatch(logConfig, 5000);
+
+ // Read the auditing config
+ String auditConfig = String.format("%s/auditLog.xml", System.getenv("ACCUMULO_CONF_DIR"));
+
+ DOMConfigurator.configureAndWatch(auditConfig, 5000);
+
+ log.info(application + " starting");
+ log.info("Instance " + config.getInstance().getInstanceID());
+ int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
+ log.info("Data Version " + dataVersion);
+ Accumulo.waitForZookeeperAndHdfs(fs);
+
+ Version codeVersion = new Version(Constants.VERSION);
+ if (dataVersion != ServerConstants.DATA_VERSION && dataVersion != ServerConstants.PREV_DATA_VERSION) {
+ throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
+ }
+
+ TreeMap<String,String> sortedProps = new TreeMap<String,String>();
+ for (Entry<String,String> entry : config.getConfiguration())
+ sortedProps.put(entry.getKey(), entry.getValue());
+
+ for (Entry<String,String> entry : sortedProps.entrySet()) {
+ String key = entry.getKey();
+ log.info(key + " = " + (Property.isSensitive(key) ? "<hidden>" : entry.getValue()));
+ }
+
+ monitorSwappiness();
+ }
+
+ /**
+ *
+ */
+ public static void monitorSwappiness() {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String procFile = "/proc/sys/vm/swappiness";
+ File swappiness = new File(procFile);
+ if (swappiness.exists() && swappiness.canRead()) {
+ InputStream is = new FileInputStream(procFile);
+ try {
+ byte[] buffer = new byte[10];
+ int bytes = is.read(buffer);
+ String setting = new String(buffer, 0, bytes);
+ setting = setting.trim();
+ if (bytes > 0 && Integer.parseInt(setting) > 10) {
+ log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
+ + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
+ }
+ } finally {
+ is.close();
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ }
+ }, 1000, 10 * 60 * 1000);
+ }
+
+ public static void waitForZookeeperAndHdfs(VolumeManager fs) {
+ log.info("Attempting to talk to zookeeper");
+ while (true) {
+ try {
+ ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
+ break;
+ } catch (InterruptedException e) {
+ // ignored
+ } catch (KeeperException ex) {
+ log.info("Waiting for accumulo to be initialized");
+ UtilWaitThread.sleep(1000);
+ }
+ }
+ log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
+ long sleep = 1000;
+ while (true) {
+ try {
+ if (fs.isReady())
+ break;
+ log.warn("Waiting for the NameNode to leave safemode");
+ } catch (IOException ex) {
+ log.warn("Unable to connect to HDFS");
+ }
+ log.info("Sleeping " + sleep / 1000. + " seconds");
+ UtilWaitThread.sleep(sleep);
+ sleep = Math.min(60 * 1000, sleep * 2);
+ }
+ log.info("Connected to HDFS");
+ }
+
+}
[6/7] git commit: ACCUMULO-2160 back-port real bugs found by findbugs
Posted by ec...@apache.org.
ACCUMULO-2160 back-port real bugs found by findbugs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e4db5923
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e4db5923
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e4db5923
Branch: refs/heads/master
Commit: e4db592367099b53c6880ca2c94a851963b76efc
Parents: 4967454
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 16:18:15 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 16:18:15 2014 -0500
----------------------------------------------------------------------
server/base/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e4db5923/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index c546590..83f54b0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -157,7 +157,7 @@ public class Accumulo {
System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
// Use a specific log config, if it exists
- String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
if (!new File(logConfig).exists()) {
// otherwise, use the generic config
logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
[5/7] git commit: Merge branch '1.6.0-SNAPSHOT'
Posted by ec...@apache.org.
Merge branch '1.6.0-SNAPSHOT'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/49674548
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/49674548
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/49674548
Branch: refs/heads/master
Commit: 49674548bdc2fa96b2622a1c8aa43d28758bc987
Parents: c3d5c1f e135e03
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 16:00:40 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 16:00:40 2014 -0500
----------------------------------------------------------------------
server/base/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[7/7] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/accumulo
Posted by ec...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/accumulo
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ba931ff5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ba931ff5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ba931ff5
Branch: refs/heads/master
Commit: ba931ff5dd0251f9c3a0494ed309864eaea12a4f
Parents: e4db592 dc10e22
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 10 10:15:04 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 10 10:15:04 2014 -0500
----------------------------------------------------------------------
.../minicluster/MiniAccumuloCluster.java | 4 +---
.../minicluster/MiniAccumuloConfig.java | 20 ++--------------
.../minicluster/MiniAccumuloClusterGCTest.java | 25 --------------------
.../minicluster/MiniAccumuloClusterTest.java | 4 ++--
4 files changed, 5 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
[3/7] git commit: ACCUMULO-2160 back-port real bugs found by findbugs
Posted by ec...@apache.org.
ACCUMULO-2160 back-port real bugs found by findbugs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cb50a743
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cb50a743
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cb50a743
Branch: refs/heads/master
Commit: cb50a743dee006bcbe1bf196ff224af5557722f6
Parents: 1413ebc 5dd6f84
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 15:48:23 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 15:48:23 2014 -0500
----------------------------------------------------------------------
.../apache/accumulo/core/client/mapreduce/RangeInputSplit.java | 3 ++-
server/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
.../org/apache/accumulo/server/tabletserver/TabletServer.java | 2 +-
3 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 0718505,0000000..561e7ac
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@@ -1,432 -1,0 +1,433 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
++import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.log4j.Level;
+
+/**
+ * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
+ */
+public class RangeInputSplit extends InputSplit implements Writable {
+ private Range range;
+ private String[] locations;
+ private String table, instanceName, zooKeepers, principal;
+ private AuthenticationToken token;
+ private Boolean offline, mockInstance, isolatedScan, localIterators;
+ private Authorizations auths;
+ private Set<Pair<Text,Text>> fetchedColumns;
+ private List<IteratorSetting> iterators;
+ private Level level;
+
+ public RangeInputSplit() {
+ range = new Range();
+ locations = new String[0];
+ }
+
+ public RangeInputSplit(Range range, String[] locations) {
+ this.range = range;
+ this.locations = locations;
+ }
+
+ public Range getRange() {
+ return range;
+ }
+
+ private static byte[] extractBytes(ByteSequence seq, int numBytes) {
+ byte[] bytes = new byte[numBytes + 1];
+ bytes[0] = 0;
+ for (int i = 0; i < numBytes; i++) {
+ if (i >= seq.length())
+ bytes[i + 1] = 0;
+ else
+ bytes[i + 1] = seq.byteAt(i);
+ }
+ return bytes;
+ }
+
+ public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
+ int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
+ BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
+ BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
+ BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
+ return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
+ }
+
+ public float getProgress(Key currentKey) {
+ if (currentKey == null)
+ return 0f;
+ if (range.getStartKey() != null && range.getEndKey() != null) {
+ if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
+ // just look at the row progress
+ return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
+ } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) {
+ // just look at the column family progress
+ return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
+ } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) {
+ // just look at the column qualifier progress
+ return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
+ }
+ }
+ // if we can't figure it out, then claim no progress
+ return 0f;
+ }
+
+ /**
+ * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value.
+ */
+ @Override
+ public long getLength() throws IOException {
+ Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
+ Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
+ int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
+ long diff = 0;
+
+ byte[] start = startRow.getBytes();
+ byte[] stop = stopRow.getBytes();
+ for (int i = 0; i < maxCommon; ++i) {
+ diff |= 0xff & (start[i] ^ stop[i]);
+ diff <<= Byte.SIZE;
+ }
+
+ if (startRow.getLength() != stopRow.getLength())
+ diff |= 0xff;
+
+ return diff + 1;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return locations;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ range.readFields(in);
+ int numLocs = in.readInt();
+ locations = new String[numLocs];
+ for (int i = 0; i < numLocs; ++i)
+ locations[i] = in.readUTF();
+
+ if (in.readBoolean()) {
+ isolatedScan = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ offline = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ localIterators = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ mockInstance = in.readBoolean();
+ }
+
+ if (in.readBoolean()) {
+ int numColumns = in.readInt();
+ List<String> columns = new ArrayList<String>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ columns.add(in.readUTF());
+ }
+
+ fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
+ }
+
+ if (in.readBoolean()) {
+ String strAuths = in.readUTF();
+ auths = new Authorizations(strAuths.getBytes(Charset.forName("UTF-8")));
+ }
+
+ if (in.readBoolean()) {
+ principal = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ String tokenClass = in.readUTF();
+ byte[] base64TokenBytes = in.readUTF().getBytes(Charset.forName("UTF-8"));
+ byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
+
+ try {
+ token = CredentialHelper.extractToken(tokenClass, tokenBytes);
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (in.readBoolean()) {
+ instanceName = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ zooKeepers = in.readUTF();
+ }
+
+ if (in.readBoolean()) {
+ level = Level.toLevel(in.readInt());
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ range.write(out);
+ out.writeInt(locations.length);
+ for (int i = 0; i < locations.length; ++i)
+ out.writeUTF(locations[i]);
+
+ out.writeBoolean(null != isolatedScan);
+ if (null != isolatedScan) {
+ out.writeBoolean(isolatedScan);
+ }
+
+ out.writeBoolean(null != offline);
+ if (null != offline) {
+ out.writeBoolean(offline);
+ }
+
+ out.writeBoolean(null != localIterators);
+ if (null != localIterators) {
+ out.writeBoolean(localIterators);
+ }
+
+ out.writeBoolean(null != mockInstance);
+ if (null != mockInstance) {
+ out.writeBoolean(mockInstance);
+ }
+
+ out.writeBoolean(null != fetchedColumns);
+ if (null != fetchedColumns) {
+ String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
+ out.writeInt(cols.length);
+ for (String col : cols) {
+ out.writeUTF(col);
+ }
+ }
+
+ out.writeBoolean(null != auths);
+ if (null != auths) {
+ out.writeUTF(auths.serialize());
+ }
+
+ out.writeBoolean(null != principal);
+ if (null != principal) {
+ out.writeUTF(principal);
+ }
+
+ out.writeBoolean(null != token);
+ if (null != token) {
+ out.writeUTF(token.getClass().getCanonicalName());
+ try {
+ out.writeUTF(CredentialHelper.tokenAsBase64(token));
+ } catch (AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+ }
+
+ out.writeBoolean(null != instanceName);
+ if (null != instanceName) {
+ out.writeUTF(instanceName);
+ }
+
+ out.writeBoolean(null != zooKeepers);
+ if (null != zooKeepers) {
+ out.writeUTF(zooKeepers);
+ }
+
+ out.writeBoolean(null != level);
+ if (null != level) {
+ out.writeInt(level.toInt());
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(256);
+ sb.append("Range: ").append(range);
- sb.append(" Locations: ").append(locations);
++ sb.append(" Locations: ").append(Arrays.asList(locations));
+ sb.append(" Table: ").append(table);
+ sb.append(" InstanceName: ").append(instanceName);
+ sb.append(" zooKeepers: ").append(zooKeepers);
+ sb.append(" principal: ").append(principal);
+ sb.append(" authenticationToken: ").append(token);
+ sb.append(" Authorizations: ").append(auths);
+ sb.append(" offlineScan: ").append(offline);
+ sb.append(" mockInstance: ").append(mockInstance);
+ sb.append(" isolatedScan: ").append(isolatedScan);
+ sb.append(" localIterators: ").append(localIterators);
+ sb.append(" fetchColumns: ").append(fetchedColumns);
+ sb.append(" iterators: ").append(iterators);
+ sb.append(" logLevel: ").append(level);
+ return sb.toString();
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public Instance getInstance() {
+ if (null == instanceName) {
+ return null;
+ }
+
+ if (isMockInstance()) {
+ return new MockInstance(getInstanceName());
+ }
+
+ if (null == zooKeepers) {
+ return null;
+ }
+
+ return new ZooKeeperInstance(getInstanceName(), getZooKeepers());
+ }
+
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ public void setInstanceName(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public String getZooKeepers() {
+ return zooKeepers;
+ }
+
+ public void setZooKeepers(String zooKeepers) {
+ this.zooKeepers = zooKeepers;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public void setPrincipal(String principal) {
+ this.principal = principal;
+ }
+
+ public AuthenticationToken getToken() {
+ return token;
+ }
+
+ public void setToken(AuthenticationToken token) {
+ this.token = token;
+ ;
+ }
+
+ public Boolean isOffline() {
+ return offline;
+ }
+
+ public void setOffline(Boolean offline) {
+ this.offline = offline;
+ }
+
+ public void setLocations(String[] locations) {
+ this.locations = locations;
+ }
+
+ public Boolean isMockInstance() {
+ return mockInstance;
+ }
+
+ public void setMockInstance(Boolean mockInstance) {
+ this.mockInstance = mockInstance;
+ }
+
+ public Boolean isIsolatedScan() {
+ return isolatedScan;
+ }
+
+ public void setIsolatedScan(Boolean isolatedScan) {
+ this.isolatedScan = isolatedScan;
+ }
+
+ public Authorizations getAuths() {
+ return auths;
+ }
+
+ public void setAuths(Authorizations auths) {
+ this.auths = auths;
+ }
+
+ public void setRange(Range range) {
+ this.range = range;
+ }
+
+ public Boolean usesLocalIterators() {
+ return localIterators;
+ }
+
+ public void setUsesLocalIterators(Boolean localIterators) {
+ this.localIterators = localIterators;
+ }
+
+ public Set<Pair<Text,Text>> getFetchedColumns() {
+ return fetchedColumns;
+ }
+
+ public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) {
+ this.fetchedColumns = fetchedColumns;
+ }
+
+ public List<IteratorSetting> getIterators() {
+ return iterators;
+ }
+
+ public void setIterators(List<IteratorSetting> iterators) {
+ this.iterators = iterators;
+ }
+
+ public Level getLogLevel() {
+ return level;
+ }
+
+ public void setLogLevel(Level level) {
+ this.level = level;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/Accumulo.java
index 33bb871,0000000..f56dfd8
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,309 -1,0 +1,309 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class Accumulo {
+
+ private static final Logger log = Logger.getLogger(Accumulo.class);
+
+ public static synchronized void updateAccumuloVersion(FileSystem fs) {
+ try {
+ if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
+ fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION));
+ fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
+ int dataVersion;
+ try {
+ FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
+ if (files == null || files.length == 0) {
+ dataVersion = -1; // assume it is 0.5 or earlier
+ } else {
+ dataVersion = Integer.parseInt(files[0].getPath().getName());
+ }
+ return dataVersion;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static void enableTracing(String address, String application) {
+ try {
+ DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
+ } catch (Exception ex) {
+ log.error("creating remote sink for trace spans", ex);
+ }
+ }
+
+ private static class LogMonitor extends FileWatchdog implements Watcher {
+ String path;
+
+ protected LogMonitor(String instance, String filename, int delay) {
+ super(filename);
+ setDelay(delay);
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+ }
+
+ private void setMonitorPort() {
+ try {
+ String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+ System.setProperty("org.apache.accumulo.core.host.log.port", port);
+ log.info("Changing monitor log4j port to "+port);
+ doOnChange();
+ } catch (Exception e) {
+ log.error("Error reading zookeeper data for monitor log4j port", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+ setMonitorPort();
+ log.info("Set watch for monitor log4j port");
+ } catch (Exception e) {
+ log.error("Unable to set watch for monitor log4j port " + path);
+ }
+ super.run();
+ }
+
+ @Override
+ protected void doOnChange() {
+ LogManager.resetConfiguration();
+ new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ setMonitorPort();
+ if (event.getPath() != null) {
+ try {
+ ZooReaderWriter.getInstance().exists(event.getPath(), this);
+ } catch (Exception ex) {
+ log.error("Unable to reset watch for monitor log4j port", ex);
+ }
+ }
+ }
+ }
+
+ public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
+
+ System.setProperty("org.apache.accumulo.core.application", application);
+
+ if (System.getenv("ACCUMULO_LOG_DIR") != null)
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
+ else
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
+
+ String localhost = InetAddress.getLocalHost().getHostName();
+ System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
+
+ if (System.getenv("ACCUMULO_LOG_HOST") != null)
+ System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
+ else
+ System.setProperty("org.apache.accumulo.core.host.log", localhost);
+
+ int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+ System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+
+ // Use a specific log config, if it exists
- String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
++ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ if (!new File(logConfig).exists()) {
+ // otherwise, use the generic config
+ logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ }
+ // Turn off messages about not being able to reach the remote logger... we protect against that.
+ LogLog.setQuietMode(true);
+
+ // Configure logging
+ if (logPort==0)
+ new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+ else
+ DOMConfigurator.configureAndWatch(logConfig, 5000);
+
+ log.info(application + " starting");
+ log.info("Instance " + config.getInstance().getInstanceID());
+ int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
+ log.info("Data Version " + dataVersion);
+ Accumulo.waitForZookeeperAndHdfs(fs);
+
+ Version codeVersion = new Version(Constants.VERSION);
+ if (dataVersion != Constants.DATA_VERSION && dataVersion != Constants.PREV_DATA_VERSION) {
+ throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
+ }
+
+ TreeMap<String,String> sortedProps = new TreeMap<String,String>();
+ for (Entry<String,String> entry : config.getConfiguration())
+ sortedProps.put(entry.getKey(), entry.getValue());
+
+ for (Entry<String,String> entry : sortedProps.entrySet()) {
+ if (entry.getKey().toLowerCase().contains("password") || entry.getKey().toLowerCase().contains("secret")
+ || entry.getKey().startsWith(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey()))
+ log.info(entry.getKey() + " = <hidden>");
+ else
+ log.info(entry.getKey() + " = " + entry.getValue());
+ }
+
+ monitorSwappiness();
+ }
+
+ /**
+ *
+ */
+ public static void monitorSwappiness() {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String procFile = "/proc/sys/vm/swappiness";
+ File swappiness = new File(procFile);
+ if (swappiness.exists() && swappiness.canRead()) {
+ InputStream is = new FileInputStream(procFile);
+ try {
+ byte[] buffer = new byte[10];
+ int bytes = is.read(buffer);
+ String setting = new String(buffer, 0, bytes);
+ setting = setting.trim();
+ if (bytes > 0 && Integer.parseInt(setting) > 10) {
+ log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
+ + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
+ }
+ } finally {
+ is.close();
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ }
+ }, 1000, 10 * 60 * 1000);
+ }
+
+ public static String getLocalAddress(String[] args) throws UnknownHostException {
+ InetAddress result = InetAddress.getLocalHost();
+ for (int i = 0; i < args.length - 1; i++) {
+ if (args[i].equals("-a") || args[i].equals("--address")) {
+ result = InetAddress.getByName(args[i + 1]);
+ log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")");
+ break;
+ }
+ }
+ return result.getHostName();
+ }
+
+ public static void waitForZookeeperAndHdfs(FileSystem fs) {
+ log.info("Attempting to talk to zookeeper");
+ while (true) {
+ try {
+ ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
+ break;
+ } catch (InterruptedException e) {
+ // ignored
+ } catch (KeeperException ex) {
+ log.info("Waiting for accumulo to be initialized");
+ UtilWaitThread.sleep(1000);
+ }
+ }
+ log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
+ long sleep = 1000;
+ while (true) {
+ try {
+ if (!isInSafeMode(fs))
+ break;
+ log.warn("Waiting for the NameNode to leave safemode");
+ } catch (IOException ex) {
+ log.warn("Unable to connect to HDFS");
+ }
+ log.info("Sleeping " + sleep / 1000. + " seconds");
+ UtilWaitThread.sleep(sleep);
+ sleep = Math.min(60 * 1000, sleep * 2);
+ }
+ log.info("Connected to HDFS");
+ }
+
+ private static boolean isInSafeMode(FileSystem fs) throws IOException {
+ if (!(fs instanceof DistributedFileSystem))
+ return false;
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+ // Becomes this:
+ Class<?> safeModeAction;
+ try {
+ // hadoop 2.0
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ try {
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot figure out the right class for Constants");
+ }
+ }
+ Object get = null;
+ for (Object obj : safeModeAction.getEnumConstants()) {
+ if (obj.toString().equals("SAFEMODE_GET"))
+ get = obj;
+ }
+ if (get == null) {
+ throw new RuntimeException("cannot find SAFEMODE_GET");
+ }
+ try {
+ Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+ return (Boolean) setSafeMode.invoke(dfs, get);
+ } catch (Exception ex) {
+ throw new RuntimeException("cannot find method setSafeMode");
+ }
+ }
+}
[2/7] ACCUMULO-2160 back-port real bugs found by findbugs
Posted by ec...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cb50a743/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index aa52834,0000000..c5695bc
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@@ -1,3602 -1,0 +1,3602 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.TabletType;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.Constraint.Environment;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.InitialScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TKey;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.master.thrift.Compacting;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
+import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
+import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
+import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch;
+import org.apache.accumulo.server.tabletserver.Tablet.Scanner;
+import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo;
+import org.apache.accumulo.server.tabletserver.Tablet.TConstraintViolationException;
+import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
+import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.LogSorter;
+import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
+import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
+import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
+import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
+import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.util.FileSystemMonitor;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.MapCounter;
+import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.util.MetadataTable.LogEntry;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
+import org.apache.accumulo.server.util.time.RelativeTime;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.Platform;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.start.classloader.vfs.ContextManager;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.server.TServer;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
/** Execution state of a scheduled scan task on the tablet server. */
enum ScanRunState {
  QUEUED, RUNNING, FINISHED
}
+
+public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
+ private static final Logger log = Logger.getLogger(TabletServer.class);
+
+ private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
+ private static long lastMemorySize = 0;
+ private static long gcTimeIncreasedCount;
+
+ private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
+
+ private TabletServerLogger logger;
+
+ protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+
+ private ServerConfiguration serverConfig;
+ private LogSorter logSorter = null;
+
  /**
   * Creates a tablet server bound to the given configuration and filesystem.
   * Wraps the filesystem for tracing, creates the WAL sorter, and schedules a
   * task that calls updateRates on every online tablet every 5 seconds.
   */
  public TabletServer(ServerConfiguration conf, FileSystem fs) {
    super();
    this.serverConfig = conf;
    this.instance = conf.getInstance();
    this.fs = TraceFileSystem.wrap(fs);
    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
    SimpleTimer.getInstance().schedule(new Runnable() {
      @Override
      public void run() {
        // Lock the map while iterating: other threads load/unload tablets.
        synchronized (onlineTablets) {
          long now = System.currentTimeMillis();
          for (Tablet tablet : onlineTablets.values())
            try {
              tablet.updateRates(now);
            } catch (Exception ex) {
              log.error(ex, ex);
            }
        }
      }
    }, 5000, 5000);
  }
+
  /**
   * Logs a one-line summary of GC activity and free memory, warns when memory
   * runs low during repeated collections, and halts the process when GC time
   * accumulated since the last check exceeds the ZooKeeper session timeout
   * (the server's lock would be lost anyway).
   */
  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
    List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
    Runtime rt = Runtime.getRuntime();

    StringBuilder sb = new StringBuilder("gc");

    boolean sawChange = false;

    long maxIncreaseInCollectionTime = 0;

    for (GarbageCollectorMXBean gcBean : gcmBeans) {
      // Compare cumulative collection time against the value from the last call.
      Long prevTime = prevGcTime.get(gcBean.getName());
      long pt = 0;
      if (prevTime != null) {
        pt = prevTime;
      }

      long time = gcBean.getCollectionTime();

      if (time - pt != 0) {
        sawChange = true;
      }

      long increaseInCollectionTime = time - pt;
      sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
      maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
      prevGcTime.put(gcBean.getName(), time);
    }

    long mem = rt.freeMemory();
    if (maxIncreaseInCollectionTime == 0) {
      gcTimeIncreasedCount = 0;
    } else {
      gcTimeIncreasedCount++;
      // Warn only after several consecutive GC-active checks with <5% of max memory free.
      if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
        log.warn("Running low on memory");
        gcTimeIncreasedCount = 0;
      }
    }

    if (mem > lastMemorySize) {
      sawChange = true;
    }

    String sign = "+";
    if (mem - lastMemorySize <= 0) {
      sign = "";
    }

    sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));

    // Only log when something actually changed, to keep the debug log quiet.
    if (sawChange) {
      log.debug(sb.toString());
    }

    final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
    if (maxIncreaseInCollectionTime > keepAliveTimeout) {
      Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
    }

    lastMemorySize = mem;
  }
+
+ private TabletStatsKeeper statsKeeper;
+
  /**
   * Base server-side state for one client session; subclasses (e.g. scan
   * sessions) add per-operation state and override cleanup() to release
   * resources.
   */
  private static class Session {
    long lastAccessTime; // last touch; the SessionManager sweeper expires idle sessions on this
    long startTime;
    String user;
    String client = TServerUtils.clientAddress.get(); // requesting client's address - presumably thread-local per RPC; confirm
    public boolean reserved; // while true the session may not be swept or removed

    public void cleanup() {}
  }
+
+ private static class SessionManager {
+
+ SecureRandom random;
+ Map<Long,Session> sessions;
+
+ SessionManager(AccumuloConfiguration conf) {
+ random = new SecureRandom();
+ sessions = new HashMap<Long,Session>();
+
+ final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ sweep(maxIdle);
+ }
+ };
+
+ SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
+ }
+
+ synchronized long createSession(Session session, boolean reserve) {
+ long sid = random.nextLong();
+
+ while (sessions.containsKey(sid)) {
+ sid = random.nextLong();
+ }
+
+ sessions.put(sid, session);
+
+ session.reserved = reserve;
+
+ session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+ return sid;
+ }
+
+ /**
+ * while a session is reserved, it cannot be canceled or removed
+ *
+ * @param sessionId
+ */
+
+ synchronized Session reserveSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ if (session.reserved)
+ throw new IllegalStateException();
+ session.reserved = true;
+ }
+
+ return session;
+
+ }
+
+ synchronized void unreserveSession(Session session) {
+ if (!session.reserved)
+ throw new IllegalStateException();
+ session.reserved = false;
+ session.lastAccessTime = System.currentTimeMillis();
+ }
+
+ synchronized void unreserveSession(long sessionId) {
+ Session session = getSession(sessionId);
+ if (session != null)
+ unreserveSession(session);
+ }
+
+ synchronized Session getSession(long sessionId) {
+ Session session = sessions.get(sessionId);
+ if (session != null)
+ session.lastAccessTime = System.currentTimeMillis();
+ return session;
+ }
+
+ Session removeSession(long sessionId) {
+ Session session = null;
+ synchronized (this) {
+ session = sessions.remove(sessionId);
+ }
+
+ // do clean up out side of lock..
+ if (session != null)
+ session.cleanup();
+
+ return session;
+ }
+
+ private void sweep(long maxIdle) {
+ ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
+ synchronized (this) {
+ Iterator<Session> iter = sessions.values().iterator();
+ while (iter.hasNext()) {
+ Session session = iter.next();
+ long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+ if (idleTime > maxIdle && !session.reserved) {
+ iter.remove();
+ sessionsToCleanup.add(session);
+ }
+ }
+ }
+
+ // do clean up outside of lock
+ for (Session session : sessionsToCleanup) {
+ session.cleanup();
+ }
+ }
+
+ synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+ Session session = sessions.get(sessionId);
+ if (session != null) {
+ final long removeTime = session.lastAccessTime;
+ TimerTask r = new TimerTask() {
+ @Override
+ public void run() {
+ Session sessionToCleanup = null;
+ synchronized (SessionManager.this) {
+ Session session2 = sessions.get(sessionId);
+ if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+ sessions.remove(sessionId);
+ sessionToCleanup = session2;
+ }
+ }
+
+ // call clean up outside of lock
+ if (sessionToCleanup != null)
+ sessionToCleanup.cleanup();
+ }
+ };
+
+ SimpleTimer.getInstance().schedule(r, delay);
+ }
+ }
+
+ public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+ Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+ Session session = entry.getValue();
+ @SuppressWarnings("rawtypes")
+ ScanTask nbt = null;
+ String tableID = null;
+
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+ nbt = ss.nextBatchTask;
+ tableID = ss.extent.getTableId().toString();
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+ nbt = mss.lookupTask;
+ tableID = mss.threadPoolExtent.getTableId().toString();
+ }
+
+ if (nbt == null)
+ continue;
+
+ ScanRunState srs = nbt.getScanRunState();
+
+ if (nbt == null || srs == ScanRunState.FINISHED)
+ continue;
+
+ MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+ if (stateCounts == null) {
+ stateCounts = new MapCounter<ScanRunState>();
+ counts.put(tableID, stateCounts);
+ }
+
+ stateCounts.increment(srs, 1);
+ }
+
+ return counts;
+ }
+
+ public synchronized List<ActiveScan> getActiveScans() {
+
+ ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+ long ct = System.currentTimeMillis();
+
+ for (Entry<Long,Session> entry : sessions.entrySet()) {
+ Session session = entry.getValue();
+ if (session instanceof ScanSession) {
+ ScanSession ss = (ScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+ state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+
+ } else if (session instanceof MultiScanSession) {
+ MultiScanSession mss = (MultiScanSession) session;
+
+ ScanState state = ScanState.RUNNING;
+
+ ScanTask<MultiScanResult> nbt = mss.lookupTask;
+ if (nbt == null) {
+ state = ScanState.IDLE;
+ } else {
+ switch (nbt.getScanRunState()) {
+ case QUEUED:
+ state = ScanState.QUEUED;
+ break;
+ case FINISHED:
+ state = ScanState.IDLE;
+ break;
+ case RUNNING:
+ default:
+ /* do nothing */
+ break;
+ }
+ }
+
+ activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+ ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
+ .getAuthorizationsBB()));
+ }
+ }
+
+ return activeScans;
+ }
+ }
+
+ static class TservConstraintEnv implements Environment {
+
+ private TCredentials credentials;
+ private SecurityOperation security;
+ private Authorizations auths;
+ private KeyExtent ke;
+
+ TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
+ this.security = secOp;
+ this.credentials = credentials;
+ }
+
+ void setExtent(KeyExtent ke) {
+ this.ke = ke;
+ }
+
+ @Override
+ public KeyExtent getExtent() {
+ return ke;
+ }
+
+ @Override
+ public String getUser() {
+ return credentials.getPrincipal();
+ }
+
+ @Override
+ public Authorizations getAuthorizations() {
+ if (auths == null)
+ try {
+ this.auths = security.getUserAuthorizations(credentials);
+ } catch (ThriftSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ return auths;
+ }
+
+ }
+
+ private abstract class ScanTask<T> implements RunnableFuture<T> {
+
+ protected AtomicBoolean interruptFlag;
+ protected ArrayBlockingQueue<Object> resultQueue;
+ protected AtomicInteger state;
+ protected AtomicReference<ScanRunState> runState;
+
+ private static final int INITIAL = 1;
+ private static final int ADDED = 2;
+ private static final int CANCELED = 3;
+
+ ScanTask() {
+ interruptFlag = new AtomicBoolean(false);
+ runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
+ state = new AtomicInteger(INITIAL);
+ resultQueue = new ArrayBlockingQueue<Object>(1);
+ }
+
+ protected void addResult(Object o) {
+ if (state.compareAndSet(INITIAL, ADDED))
+ resultQueue.add(o);
+ else if (state.get() == ADDED)
+ throw new IllegalStateException("Tried to add more than one result");
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!mayInterruptIfRunning)
+ throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
+
+ if (state.get() == CANCELED)
+ return true;
+
+ if (state.compareAndSet(INITIAL, CANCELED)) {
+ interruptFlag.set(true);
+ resultQueue = null;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+ ArrayBlockingQueue<Object> localRQ = resultQueue;
+
+ if (state.get() == CANCELED)
+ throw new CancellationException();
+
+ if (localRQ == null && state.get() == ADDED)
+ throw new IllegalStateException("Tried to get result twice");
+
+ Object r = localRQ.poll(timeout, unit);
+
+ // could have been canceled while waiting
+ if (state.get() == CANCELED) {
+ if (r != null)
+ throw new IllegalStateException("Nothing should have been added when in canceled state");
+
+ throw new CancellationException();
+ }
+
+ if (r == null)
+ throw new TimeoutException();
+
+ // make this method stop working now that something is being
+ // returned
+ resultQueue = null;
+
+ if (r instanceof Throwable)
+ throw new ExecutionException((Throwable) r);
+
+ return (T) r;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state.get() == CANCELED;
+ }
+
+ @Override
+ public boolean isDone() {
+ return runState.get().equals(ScanRunState.FINISHED);
+ }
+
+ public ScanRunState getScanRunState() {
+ return runState.get();
+ }
+
+ }
+
// Per-client state for a batch-update session created by startUpdate() and fed by applyUpdates().
private static class UpdateSession extends Session {
  public Tablet currentTablet; // tablet mutations are currently queued for; null after a failure on it
  public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
  // extent -> count of mutations successfully committed before the tablet failed
  Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
  // extent -> security error recorded when a permission check failed
  HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
  public Violations violations; // accumulated constraint violations across the session
  public TCredentials credentials;
  public long totalUpdates = 0;
  public long flushTime = 0;
  // timing stats for the phases of flush(): prepare, write-ahead log, commit, and auth checks
  Stat prepareTimes = new Stat();
  Stat walogTimes = new Stat();
  Stat commitTimes = new Stat();
  Stat authTimes = new Stat();
  public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
  public long queuedMutationSize = 0; // bytes queued; flush() is triggered past TSERV_MUTATION_QUEUE_MAX
  TservConstraintEnv cenv = null;
}
+
// State for a single-tablet scan, created by startScan() and advanced by continueScan().
private static class ScanSession extends Session {
  public KeyExtent extent;
  public HashSet<Column> columnSet;
  public List<IterInfo> ssiList; // server-side iterator list
  public Map<String,Map<String,String>> ssio; // server-side iterator options
  public Authorizations auths;
  public long entriesReturned = 0;
  public Stat nbTimes = new Stat(); // per-batch read times, reported when the scan closes
  public long batchCount = 0;
  // volatile: set/cleared by the handler thread and read concurrently (e.g. by getActiveScans)
  public volatile ScanTask<ScanBatch> nextBatchTask;
  public AtomicBoolean interruptFlag;
  public Scanner scanner;

  @Override
  public void cleanup() {
    // cancel any in-flight read-ahead, then always close the scanner even if cancel() throws
    try {
      if (nextBatchTask != null)
        nextBatchTask.cancel(true);
    } finally {
      if (scanner != null)
        scanner.close();
    }
  }

}
+
// State for a batch (multi-range, multi-tablet) scan, created by startMultiScan().
private static class MultiScanSession extends Session {
  HashSet<Column> columnSet;
  Map<KeyExtent,List<Range>> queries; // remaining work; drained by LookupTask as ranges complete
  public List<IterInfo> ssiList; // server-side iterator list
  public Map<String,Map<String,String>> ssio; // server-side iterator options
  public Authorizations auths;

  // stats
  int numRanges;
  int numTablets;
  int numEntries;
  long totalLookupTime;

  // volatile: set/cleared by the handler thread and read concurrently (e.g. by getActiveScans)
  public volatile ScanTask<MultiScanResult> lookupTask;
  public KeyExtent threadPoolExtent; // extent used to pick the read-ahead thread pool

  @Override
  public void cleanup() {
    if (lookupTask != null)
      lookupTask.cancel(true);
  }
}
+
+ /**
+ * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
+ * are monotonically increasing.
+ *
+ */
+ static class WriteTracker {
+ private static AtomicLong operationCounter = new AtomicLong(1);
+ private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
+
+ WriteTracker() {
+ for (TabletType ttype : TabletType.values()) {
+ inProgressWrites.put(ttype, new TreeSet<Long>());
+ }
+ }
+
+ synchronized long startWrite(TabletType ttype) {
+ long operationId = operationCounter.getAndIncrement();
+ inProgressWrites.get(ttype).add(operationId);
+ return operationId;
+ }
+
+ synchronized void finishWrite(long operationId) {
+ if (operationId == -1)
+ return;
+
+ boolean removed = false;
+
+ for (TabletType ttype : TabletType.values()) {
+ removed = inProgressWrites.get(ttype).remove(operationId);
+ if (removed)
+ break;
+ }
+
+ if (!removed) {
+ throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId);
+ }
+
+ this.notifyAll();
+ }
+
+ synchronized void waitForWrites(TabletType ttype) {
+ long operationId = operationCounter.getAndIncrement();
+ while (inProgressWrites.get(ttype).floor(operationId) != null) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ log.error(e, e);
+ }
+ }
+ }
+
+ public long startWrite(Set<Tablet> keySet) {
+ if (keySet.size() == 0)
+ return -1;
+
+ ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
+
+ for (Tablet tablet : keySet)
+ extents.add(tablet.getExtent());
+
+ return startWrite(TabletType.type(extents));
+ }
+ }
+
/** Returns this tablet server's system configuration, backed by serverConfig. */
public AccumuloConfiguration getSystemConfiguration() {
  return serverConfig.getConfiguration();
}
+
+ TransactionWatcher watcher = new TransactionWatcher();
+
// Thrift RPC handler implementing the tablet client service (scans, updates, bulk import, ...).
private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {

  SessionManager sessionManager; // tracks scan/update sessions by id

  AccumuloConfiguration acuConf = getSystemConfiguration();

  TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();

  TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();

  WriteTracker writeTracker = new WriteTracker(); // lets readers wait for in-flight writes

  ThriftClientHandler() {
    super(instance, watcher);
    log.debug(ThriftClientHandler.class.getName() + " created");
    sessionManager = new SessionManager(getSystemConfiguration());
    // Register the metrics MBean
    try {
      updateMetrics.register();
      scanMetrics.register();
    } catch (Exception e) {
      // metrics are best-effort; keep serving even if MBean registration fails
      log.error("Exception registering MBean with MBean Server", e);
    }
  }
+
+ @Override
+ public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+ throws ThriftSecurityException {
+
+ if (!security.canPerformSystemActions(credentials))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+ List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+
+ for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+ TKeyExtent tke = entry.getKey();
+ Map<String,MapFileInfo> fileMap = entry.getValue();
+
+ Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
+
+ if (importTablet == null) {
+ failures.add(tke);
+ } else {
+ try {
+ importTablet.importMapFiles(tid, fileMap, setTime);
+ } catch (IOException ioe) {
+ log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
+ failures.add(tke);
+ }
+ }
+ }
+ return failures;
+ }
+
/**
 * Read-ahead task for a single-tablet scan: reads the next ScanBatch from the session's scanner
 * and delivers it (or the failure) through ScanTask.addResult().
 */
private class NextBatchTask extends ScanTask<ScanBatch> {

  private long scanID;

  NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
    this.scanID = scanID;
    this.interruptFlag = interruptFlag;

    // the session may have been interrupted before this task was even constructed
    if (interruptFlag.get())
      cancel(true);
  }

  @Override
  public void run() {

    final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
    String oldThreadName = Thread.currentThread().getName();

    try {
      // session may have been removed (e.g. closed or timed out) before we got scheduled
      if (isCancelled() || scanSession == null)
        return;

      runState.set(ScanRunState.RUNNING);

      // thread name carries scan context for debugging/thread dumps; restored in finally
      Thread.currentThread().setName(
          "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);

      Tablet tablet = onlineTablets.get(scanSession.extent);

      if (tablet == null) {
        addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
        return;
      }

      long t1 = System.currentTimeMillis();
      ScanBatch batch = scanSession.scanner.read();
      long t2 = System.currentTimeMillis();
      scanSession.nbTimes.addStat(t2 - t1);

      // there should only be one thing on the queue at a time, so
      // it should be ok to call add()
      // instead of put()... if add() fails because queue is at
      // capacity it means there is code
      // problem somewhere
      addResult(batch);
    } catch (TabletClosedException e) {
      // tablet was closed mid-read; report as not-serving so the client can retry elsewhere
      addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
    } catch (IterationInterruptedException iie) {
      // expected when the scan was cancelled; only surface it when it was not
      if (!isCancelled()) {
        log.warn("Iteration interrupted, when scan not cancelled", iie);
        addResult(iie);
      }
    } catch (TooManyFilesException tmfe) {
      addResult(tmfe);
    } catch (Throwable e) {
      log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
      addResult(e);
    } finally {
      runState.set(ScanRunState.FINISHED);
      Thread.currentThread().setName(oldThreadName);
    }

  }
}
+
/**
 * Read-ahead task for a multi-scan: drains range lists from the session's query map, looking
 * them up tablet by tablet until the result buffer fills or a time slice expires, then delivers
 * a MultiScanResult (results, failures, fully/partially scanned extents) via addResult().
 */
private class LookupTask extends ScanTask<MultiScanResult> {

  private long scanID;

  LookupTask(long scanID) {
    this.scanID = scanID;
  }

  @Override
  public void run() {
    MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
    String oldThreadName = Thread.currentThread().getName();

    try {
      // session may have been removed (closed or timed out) before we got scheduled
      if (isCancelled() || session == null)
        return;

      TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
      long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);

      runState.set(ScanRunState.RUNNING);
      Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");

      long bytesAdded = 0;
      long maxScanTime = 4000; // ms; bounds one pass so this thread is not monopolized

      long startTime = System.currentTimeMillis();

      ArrayList<KVEntry> results = new ArrayList<KVEntry>();
      Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
      ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
      // at most one extent can be left partially scanned per pass; track where to resume
      KeyExtent partScan = null;
      Key partNextKey = null;
      boolean partNextKeyInclusive = false;

      Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();

      // check the time so that the read ahead thread is not monopolized
      while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
        Entry<KeyExtent,List<Range>> entry = iter.next();

        // removed from the pending work; unfinished ranges are re-added below
        iter.remove();

        // check that tablet server is serving requested tablet
        Tablet tablet = onlineTablets.get(entry.getKey());
        if (tablet == null) {
          failures.put(entry.getKey(), entry.getValue());
          continue;
        }
        Thread.currentThread().setName(
            "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());

        LookupResult lookupResult;
        try {

          // do the following check to avoid a race condition
          // between setting false below and the task being
          // canceled
          if (isCancelled())
            interruptFlag.set(true);

          lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
              session.ssio, interruptFlag);

          // if the tablet was closed it it possible that the
          // interrupt flag was set.... do not want it set for
          // the next
          // lookup
          interruptFlag.set(false);

        } catch (IOException e) {
          log.warn("lookup failed for tablet " + entry.getKey(), e);
          throw new RuntimeException(e);
        }

        bytesAdded += lookupResult.bytesAdded;

        if (lookupResult.unfinishedRanges.size() > 0) {
          if (lookupResult.closed) {
            // tablet closed mid-lookup; hand the leftover ranges back as failures
            failures.put(entry.getKey(), lookupResult.unfinishedRanges);
          } else {
            // buffer filled; requeue the remainder and record the resume position
            session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
            partScan = entry.getKey();
            partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
            partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
          }
        } else {
          fullScans.add(entry.getKey());
        }
      }

      long finishTime = System.currentTimeMillis();
      session.totalLookupTime += (finishTime - startTime);
      session.numEntries += results.size();

      // convert everything to thrift before adding result
      List<TKeyValue> retResults = new ArrayList<TKeyValue>();
      for (KVEntry entry : results)
        retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
      Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
      List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
      TKeyExtent retPartScan = null;
      TKey retPartNextKey = null;
      if (partScan != null) {
        retPartScan = partScan.toThrift();
        retPartNextKey = partNextKey.toThrift();
      }
      // add results to queue
      addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
    } catch (IterationInterruptedException iie) {
      // expected when the scan was cancelled; only surface it when it was not
      if (!isCancelled()) {
        log.warn("Iteration interrupted, when scan not cancelled", iie);
        addResult(iie);
      }
    } catch (Throwable e) {
      log.warn("exception while doing multi-scan ", e);
      addResult(e);
    } finally {
      Thread.currentThread().setName(oldThreadName);
      runState.set(ScanRunState.FINISHED);
    }
  }
}
+
/**
 * Thrift entry point starting a single-tablet scan: checks table permission and that the
 * requested authorizations are a subset of the user's, optionally waits for in-flight writes,
 * creates a ScanSession with a tablet scanner, and returns the session id plus the first batch.
 */
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
    List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
    throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {

  Authorizations userauths = null;
  if (!security.canScan(credentials, new String(textent.getTable())))
    throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);

  // a scan may not request authorizations the user does not hold
  userauths = security.getUserAuthorizations(credentials);
  for (ByteBuffer auth : authorizations)
    if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
      throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);

  KeyExtent extent = new KeyExtent(textent);

  // wait for any writes that are in flight.. this done to ensure
  // consistency across client restarts... assume a client writes
  // to accumulo and dies while waiting for a confirmation from
  // accumulo... the client process restarts and tries to read
  // data from accumulo making the assumption that it will get
  // any writes previously made... however if the server side thread
  // processing the write from the dead client is still in progress,
  // the restarted client may not see the write unless we wait here.
  // this behavior is very important when the client is reading the
  // !METADATA table
  if (waitForWrites)
    writeTracker.waitForWrites(TabletType.type(extent));

  Tablet tablet = onlineTablets.get(extent);
  if (tablet == null)
    throw new NotServingTabletException(textent);

  ScanSession scanSession = new ScanSession();
  scanSession.user = credentials.getPrincipal();
  scanSession.extent = new KeyExtent(extent);
  scanSession.columnSet = new HashSet<Column>();
  scanSession.ssiList = ssiList;
  scanSession.ssio = ssio;
  scanSession.auths = new Authorizations(authorizations);
  scanSession.interruptFlag = new AtomicBoolean();

  for (TColumn tcolumn : columns) {
    scanSession.columnSet.add(new Column(tcolumn));
  }

  scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
      scanSession.interruptFlag);

  // session is created reserved; fetch the first batch before releasing it
  long sid = sessionManager.createSession(scanSession, true);

  ScanResult scanResult;
  try {
    scanResult = continueScan(tinfo, sid, scanSession);
  } catch (NoSuchScanIDException e) {
    // cannot happen: the session was just created and is still reserved
    log.error("The impossible happened", e);
    throw new RuntimeException();
  } finally {
    sessionManager.unreserveSession(sid);
  }

  return new InitialScan(sid, scanResult);
}
+
+ @Override
+ public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
+ org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+ ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
+ if (scanSession == null) {
+ throw new NoSuchScanIDException();
+ }
+
+ try {
+ return continueScan(tinfo, scanID, scanSession);
+ } finally {
+ sessionManager.unreserveSession(scanSession);
+ }
+ }
+
/**
 * Advances a reserved scan session: starts a read-ahead NextBatchTask if none is pending, waits
 * a bounded time for its batch, and translates task failures into the appropriate thrift
 * exceptions. A timeout is not fatal: an empty "more" result is returned so the client retries.
 */
private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
    org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {

  if (scanSession.nextBatchTask == null) {
    scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
    resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
  }

  ScanBatch bresult;
  try {
    bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
    scanSession.nextBatchTask = null;
  } catch (ExecutionException e) {
    // the task failed; unwrap the causes callers know how to handle
    sessionManager.removeSession(scanID);
    if (e.getCause() instanceof NotServingTabletException)
      throw (NotServingTabletException) e.getCause();
    else if (e.getCause() instanceof TooManyFilesException)
      throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
    else
      throw new RuntimeException(e);
  } catch (CancellationException ce) {
    // task was cancelled; report not-serving if the tablet went away, otherwise a stale scan id
    sessionManager.removeSession(scanID);
    Tablet tablet = onlineTablets.get(scanSession.extent);
    if (tablet == null || tablet.isClosed())
      throw new NotServingTabletException(scanSession.extent.toThrift());
    else
      throw new NoSuchScanIDException();
  } catch (TimeoutException e) {
    // batch not ready yet; keep the session alive (unless idle too long) and tell the
    // client to come back — the read-ahead task keeps running
    List<TKeyValue> param = Collections.emptyList();
    long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
    sessionManager.removeIfNotAccessed(scanID, timeout);
    return new ScanResult(param, true);
  } catch (Throwable t) {
    sessionManager.removeSession(scanID);
    log.warn("Failed to get next batch", t);
    throw new RuntimeException(t);
  }

  ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);

  scanSession.entriesReturned += scanResult.results.size();

  scanSession.batchCount++;

  if (scanResult.more && scanSession.batchCount > 3) {
    // start reading next batch while current batch is transmitted
    // to client
    scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
    resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
  }

  if (!scanResult.more)
    closeScan(tinfo, scanID);

  return scanResult;
}
+
+ @Override
+ public void closeScan(TInfo tinfo, long scanID) {
+ ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
+ if (ss != null) {
+ long t2 = System.currentTimeMillis();
+
+ log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId()
+ .toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
+ if (scanMetrics.isEnabled()) {
+ scanMetrics.add(TabletServerScanMetrics.scan, t2 - ss.startTime);
+ scanMetrics.add(TabletServerScanMetrics.resultSize, ss.entriesReturned);
+ }
+ }
+ }
+
/**
 * Thrift entry point starting a batch scan over many ranges/tablets: checks scan permission on
 * every involved table and that the requested authorizations are a subset of the user's,
 * validates that root/!METADATA tablets are not mixed with others, optionally waits for
 * in-flight writes, then creates a MultiScanSession and returns the session id plus the first
 * result batch.
 */
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
    List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
  // find all of the tables that need to be scanned
  HashSet<String> tables = new HashSet<String>();
  for (TKeyExtent keyExtent : tbatch.keySet()) {
    tables.add(new String(keyExtent.getTable()));
  }

  // check if user has permission to the tables
  Authorizations userauths = null;
  for (String table : tables)
    if (!security.canScan(credentials, table))
      throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);

  userauths = security.getUserAuthorizations(credentials);
  for (ByteBuffer auth : authorizations)
    if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
      throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);

  // the first extent determines which read-ahead thread pool serves this session
  KeyExtent threadPoolExtent = null;

  Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, Translator.TKET, new Translator.ListTranslator<TRange,Range>(Translator.TRT));

  for (KeyExtent keyExtent : batch.keySet()) {
    if (threadPoolExtent == null) {
      threadPoolExtent = keyExtent;
    } else if (keyExtent.isRootTablet()) {
      throw new IllegalArgumentException("Cannot batch query root tablet with other tablets " + threadPoolExtent + " " + keyExtent);
    } else if (keyExtent.isMeta() && !threadPoolExtent.isMeta()) {
      throw new IllegalArgumentException("Cannot batch query !METADATA and non !METADATA tablets " + threadPoolExtent + " " + keyExtent);
    }

  }

  if (waitForWrites)
    writeTracker.waitForWrites(TabletType.type(batch.keySet()));

  MultiScanSession mss = new MultiScanSession();
  mss.user = credentials.getPrincipal();
  mss.queries = batch;
  mss.columnSet = new HashSet<Column>(tcolumns.size());
  mss.ssiList = ssiList;
  mss.ssio = ssio;
  mss.auths = new Authorizations(authorizations);

  mss.numTablets = batch.size();
  for (List<Range> ranges : batch.values()) {
    mss.numRanges += ranges.size();
  }

  for (TColumn tcolumn : tcolumns)
    mss.columnSet.add(new Column(tcolumn));

  mss.threadPoolExtent = threadPoolExtent;

  // session is created reserved; fetch the first result before releasing it
  long sid = sessionManager.createSession(mss, true);

  MultiScanResult result;
  try {
    result = continueMultiScan(tinfo, sid, mss);
  } catch (NoSuchScanIDException e) {
    // cannot happen: the session was just created and is still reserved
    log.error("the impossible happened", e);
    throw new RuntimeException("the impossible happened", e);
  } finally {
    sessionManager.unreserveSession(sid);
  }

  return new InitialMultiScan(sid, result);
}
+
+ @Override
+ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+
+ MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
+
+ if (session == null) {
+ throw new NoSuchScanIDException();
+ }
+
+ try {
+ return continueMultiScan(tinfo, scanID, session);
+ } finally {
+ sessionManager.unreserveSession(session);
+ }
+ }
+
/**
 * Advances a reserved multi-scan session: starts a LookupTask if none is pending and waits a
 * bounded time for its result. A timeout is not fatal — an empty "more" result is returned so
 * the client retries while the lookup keeps running.
 */
private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {

  if (session.lookupTask == null) {
    session.lookupTask = new LookupTask(scanID);
    resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
  }

  try {
    MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
    session.lookupTask = null;
    return scanResult;
  } catch (TimeoutException e1) {
    // result not ready yet; keep the session alive (unless idle too long) and return an
    // empty batch flagged "more" so the client polls again
    long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
    sessionManager.removeIfNotAccessed(scanID, timeout);
    List<TKeyValue> results = Collections.emptyList();
    Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
    List<TKeyExtent> fullScans = Collections.emptyList();
    return new MultiScanResult(results, failures, fullScans, null, null, false, true);
  } catch (Throwable t) {
    sessionManager.removeSession(scanID);
    log.warn("Failed to get multiscan result", t);
    throw new RuntimeException(t);
  }
}
+
+ @Override
+ public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+ MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
+ if (session == null) {
+ throw new NoSuchScanIDException();
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
+ session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
+ }
+
+ @Override
+ public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+ // Make sure user is real
+
+ security.authenticateUser(credentials, credentials);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+
+ UpdateSession us = new UpdateSession();
+ us.violations = new Violations();
+ us.credentials = credentials;
+ us.cenv = new TservConstraintEnv(security, us.credentials);
+
+ long sid = sessionManager.createSession(us, false);
+
+ return sid;
+ }
+
/**
 * Points the update session at the tablet for keyExtent, performing the table write-permission
 * check (skipped when the previous tablet was in the same table). On permission failure or an
 * unhosted tablet, currentTablet is left null and the failure recorded so the mutations for
 * this extent are reported back to the client as failed.
 */
private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
  long t1 = System.currentTimeMillis();
  // already pointed at this tablet; nothing to do
  if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
    return;
  if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
    // if there were previous failures, then do not accept additional writes
    return;
  }

  try {
    // if user has no permission to write to this table, add it to
    // the failures list
    boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
    if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) {
      long t2 = System.currentTimeMillis();
      us.authTimes.addStat(t2 - t1);
      us.currentTablet = onlineTablets.get(keyExtent);
      if (us.currentTablet != null) {
        us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
      } else {
        // not serving tablet, so report all mutations as
        // failures
        us.failures.put(keyExtent, 0l);
        if (updateMetrics.isEnabled())
          updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
      }
    } else {
      log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
      long t2 = System.currentTimeMillis();
      us.authTimes.addStat(t2 - t1);
      us.currentTablet = null;
      us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
      if (updateMetrics.isEnabled())
        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
      return;
    }
  } catch (ThriftSecurityException e) {
    // could not even check permission; record the security error for this extent
    log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
    long t2 = System.currentTimeMillis();
    us.authTimes.addStat(t2 - t1);
    us.currentTablet = null;
    us.authFailures.put(keyExtent, e.getCode());
    if (updateMetrics.isEnabled())
      updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
    return;
  }
}
+
+ @Override
+ public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
+ UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
+ if (us == null) {
+ throw new RuntimeException("No Such SessionID");
+ }
+
+ try {
+ KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+ setUpdateTablet(us, keyExtent);
+
+ if (us.currentTablet != null) {
+ List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
+ for (TMutation tmutation : tmutations) {
+ Mutation mutation = new ServerMutation(tmutation);
+ mutations.add(mutation);
+ us.queuedMutationSize += mutation.numBytes();
+ }
+ if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
+ flush(us);
+ }
+ } finally {
+ sessionManager.unreserveSession(us);
+ }
+ }
+
+ private void flush(UpdateSession us) {
+
+ int mutationCount = 0;
+ Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+ Throwable error = null;
+
+ long pt1 = System.currentTimeMillis();
+
+ boolean containsMetadataTablet = false;
+ for (Tablet tablet : us.queuedMutations.keySet())
+ if (tablet.getExtent().isMeta())
+ containsMetadataTablet = true;
+
+ if (!containsMetadataTablet && us.queuedMutations.size() > 0)
+ TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+ Span prep = Trace.start("prep");
+ for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
+
+ Tablet tablet = entry.getKey();
+ List<Mutation> mutations = entry.getValue();
+ if (mutations.size() > 0) {
+ try {
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
+
+ CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
+ if (commitSession == null) {
+ if (us.currentTablet == tablet) {
+ us.currentTablet = null;
+ }
+ us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+ } else {
+ sendables.put(commitSession, mutations);
+ mutationCount += mutations.size();
+ }
+
+ } catch (TConstraintViolationException e) {
+ us.violations.add(e.getViolations());
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
+
+ if (e.getNonViolators().size() > 0) {
+ // only log and commit mutations if there were some
+ // that did not
+ // violate constraints... this is what
+ // prepareMutationsForCommit()
+ // expects
+ sendables.put(e.getCommitSession(), e.getNonViolators());
+ }
+
+ mutationCount += mutations.size();
+
+ } catch (HoldTimeoutException t) {
+ error = t;
+ log.debug("Giving up on mutations due to a long memory hold time");
+ break;
+ } catch (Throwable t) {
+ error = t;
+ log.error("Unexpected error preparing for commit", error);
+ break;
+ }
+ }
+ }
+ prep.stop();
+
+ Span wal = Trace.start("wal");
+ long pt2 = System.currentTimeMillis();
+ long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size());
+ us.prepareTimes.addStat(pt2 - pt1);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime));
+
+ if (error != null) {
+ for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
+ e.getKey().abortCommit(e.getValue());
+ }
+ throw new RuntimeException(error);
+ }
+ try {
+ while (true) {
+ try {
+ long t1 = System.currentTimeMillis();
+
+ logger.logManyTablets(sendables);
+
+ long t2 = System.currentTimeMillis();
+ us.walogTimes.addStat(t2 - t1);
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
+
+ break;
+ } catch (IOException ex) {
+ log.warn("logging mutations failed, retrying");
+ } catch (Throwable t) {
+ log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ wal.stop();
+
+ Span commit = Trace.start("commit");
+ long t1 = System.currentTimeMillis();
+ for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+ CommitSession commitSession = entry.getKey();
+ List<Mutation> mutations = entry.getValue();
+
+ commitSession.commit(mutations);
+
+ Tablet tablet = commitSession.getTablet();
+
+ if (tablet == us.currentTablet) {
+ // because constraint violations may filter out some
+ // mutations, for proper
+ // accounting with the client code, need to increment
+ // the count based
+ // on the original number of mutations from the client
+ // NOT the filtered number
+ us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+ }
+ }
+ long t2 = System.currentTimeMillis();
+
+ long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
+
+ us.flushTime += (t2 - pt1);
+ us.commitTimes.addStat(t2 - t1);
+
+ if (updateMetrics.isEnabled())
+ updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
+ commit.stop();
+ } finally {
+ us.queuedMutations.clear();
+ if (us.currentTablet != null) {
+ us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+ }
+ us.queuedMutationSize = 0;
+ }
+ us.totalUpdates += mutationCount;
+ }
+
/**
 * Ends an update session: removes it from the session manager, flushes any
 * still-queued mutations, and returns the accumulated per-extent failures,
 * constraint violations, and authorization failures to the client.
 *
 * @throws NoSuchScanIDException if no update session exists for updateID
 */
@Override
public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
  UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
  if (us == null) {
    throw new NoSuchScanIDException();
  }

  // clients may or may not see data from an update session while
  // it is in progress, however when the update session is closed
  // want to ensure that reads wait for the write to finish
  long opid = writeTracker.startWrite(us.queuedMutations.keySet());

  try {
    flush(us);
  } finally {
    writeTracker.finishWrite(opid);
  }

  // summarize session timing for debugging
  log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
      (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
      us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
  if (us.failures.size() > 0) {
    Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
    log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), first.getKey().toString(), first.getValue()));
  }
  List<ConstraintViolationSummary> violations = us.violations.asList();
  if (violations.size() > 0) {
    ConstraintViolationSummary first = us.violations.asList().iterator().next();
    log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), first.violationDescription, first.numberOfViolatingMutations));
  }
  if (us.authFailures.size() > 0) {
    KeyExtent first = us.authFailures.keySet().iterator().next();
    log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
  }

  // translate internal structures into the Thrift response
  return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
      us.authFailures, Translator.KET));
}
+
+ @Override
+ public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
+ ConstraintViolationException, ThriftSecurityException {
+
+ if (!security.canWrite(credentials, new String(tkeyExtent.getTable())))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+ KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+ Tablet tablet = onlineTablets.get(new KeyExtent(keyExtent));
+ if (tablet == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+
+ if (!keyExtent.isMeta())
+ TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+
+ long opid = writeTracker.startWrite(TabletType.type(keyExtent));
+
+ try {
+ Mutation mutation = new ServerMutation(tmutation);
+ List<Mutation> mutations = Collections.singletonList(mutation);
+
+ Span prep = Trace.start("prep");
+ CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+ prep.stop();
+ if (cs == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+
+ while (true) {
+ try {
+ Span wal = Trace.start("wal");
+ logger.log(cs, cs.getWALogSeq(), mutation);
+ wal.stop();
+ break;
+ } catch (IOException ex) {
+ log.warn(ex, ex);
+ }
+ }
+
+ Span commit = Trace.start("commit");
+ cs.commit(mutations);
+ commit.stop();
+ } catch (TConstraintViolationException e) {
+ throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
+ } finally {
+ writeTracker.finishWrite(opid);
+ }
+ }
+
+ @Override
+ public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint)
+ throws NotServingTabletException, ThriftSecurityException {
+
+ String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
+ if (!security.canSplitTablet(credentials, tableId))
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+
+ KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+
+ Tablet tablet = onlineTablets.get(keyExtent);
+ if (tablet == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+
+ if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
+ try {
+ if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
+ throw new NotServingTabletException(tkeyExtent);
+ }
+ } catch (IOException e) {
+ log.warn("Failed to split " + keyExtent, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
 * Returns this tablet server's current status, built from per-table stats
 * plus the session manager's active-scan counts.
 */
@Override
public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
  return getStats(sessionManager.getActiveScansPerTable());
}
+
+ @Override
+ public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
+ TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
+ synchronized (onlineTablets) {
+ onlineTabletsCopy = new TreeMap<KeyExtent,Tablet>(onlineTablets);
+ }
+ List<TabletStats> result = new ArrayList<TabletStats>();
+ Text text = new Text(tableId);
+ KeyExtent start = new KeyExtent(text, new Text(), null);
+ for (Entry<KeyExtent,Tablet> entry : onlineTabletsCopy.tailMap(start).entrySet()) {
+ KeyExtent ke = entry.getKey();
+ if (ke.getTableId().compareTo(text) == 0) {
+ Tablet tablet = entry.getValue();
+ TabletStats stats = tablet.timer.getTabletStats();
+ stats.extent = ke.toThrift();
+ stats.ingestRate = tablet.ingestRate();
+ stats.queryRate = tablet.queryRate();
+ stats.splitCreationTime = tablet.getSplitCreationTime();
+ stats.numEntries = tablet.getNumEntries();
+ result.add(stats);
+ }
+ }
+ return result;
+ }
+
+ private ZooCache masterLockCache = new ZooCache();
+
+ private void checkPermission(TCredentials credentials, String lock, boolean requiresSystemPermission, final String request)
+ throws ThriftSecurityException {
+ if (requiresSystemPermission) {
+ boolean fatal = false;
+ try {
+ log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
+ if (!security.canPerformSystemActions(credentials)) {
+ log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
+ throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+ }
+ } catch (ThriftSecurityException e) {
+ log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
+ if (e.getUser().equals(SecurityConstants.SYSTEM_PRINCIPAL)) {
+ log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
+ fatal = true;
+ }
+ throw e;
+ } finally {
+ if (fatal) {
+ Halt.halt(1, new Runnable() {
+ @Override
+ public void run() {
+ logGCInfo(getSystemConfiguration());
+ }
+ });
+ }
+ }
+ }
+
+ if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
+ log.warn("Got " + request + " message from master before lock acquired, ignoring...");
+ throw new RuntimeException("Lock not acquired");
+ }
+
+ if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
+ Halt.halt(1, new Runnable() {
+ @Override
+ public void run() {
+ log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
+ logGCInfo(getSystemConfiguration());
+ }
+ });
+ }
+
+ if (lock != null) {
+ ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
+
+ try {
+ if (!ZooLock.isLockHeld(masterLockCache, lid)) {
+ // maybe the cache is out of date and a new master holds the
+ // lock?
+ masterLockCache.clear();
+ if (!ZooLock.isLockHeld(masterLockCache, lid)) {
+ log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
+ throw new RuntimeException("bad master lock");
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("bad master lock", e);
+ }
+ }
+ }
+
/**
 * Master request to host a tablet. Registers the extent as unopened (unless
 * it overlaps a tablet this server already knows about) and queues an
 * assignment task; the root tablet is assigned immediately on a dedicated
 * thread.
 */
@Override
public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {

  try {
    checkPermission(credentials, lock, true, "loadTablet");
  } catch (ThriftSecurityException e) {
    log.error(e, e);
    throw new RuntimeException(e);
  }

  final KeyExtent extent = new KeyExtent(textent);

  // NOTE(review): nested lock order unopened -> opening -> online must be
  // consistent with other call sites to avoid deadlock — verify
  synchronized (unopenedTablets) {
    synchronized (openingTablets) {
      synchronized (onlineTablets) {

        // checking if this exact tablet is in any of the sets
        // below is not a strong enough check
        // when splits and fix splits occurring

        Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
        Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
        Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
        Set<KeyExtent> all = new HashSet<KeyExtent>();
        all.addAll(unopenedOverlapping);
        all.addAll(openingOverlapping);
        all.addAll(onlineOverlapping);

        if (!all.isEmpty()) {
          // something overlapping is already known; only an exact duplicate
          // assignment of this extent is silently ignored without logging
          if (all.size() != 1 || !all.contains(extent)) {
            log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
          }
          return;
        }

        unopenedTablets.add(extent);
      }
    }
  }

  // add the assignment job to the appropriate queue
  log.info("Loading tablet " + extent);

  final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
  // Root tablet assignment must take place immediately
  if (extent.isRootTablet()) {
    new Daemon("Root Tablet Assignment") {
      @Override
      public void run() {
        ah.run();
        if (onlineTablets.containsKey(extent)) {
          log.info("Root tablet loaded: " + extent);
        } else {
          log.info("Root tablet failed to load");
        }

      }
    }.start();
  } else {
    if (extent.isMeta()) {
      resourceManager.addMetaDataAssignment(ah);
    } else {
      resourceManager.addAssignment(ah);
    }
  }
}
+
+ @Override
+ public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) {
+ try {
+ checkPermission(credentials, lock, true, "unloadTablet");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent extent = new KeyExtent(textent);
+
+ resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save)));
+ }
+
+ @Override
+ public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) {
+ try {
+ checkPermission(credentials, lock, true, "flush");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ ArrayList<Tablet> tabletsToFlush = new ArrayList<Tablet>();
+
+ KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
+
+ synchronized (onlineTablets) {
+ for (Tablet tablet : onlineTablets.values())
+ if (ke.overlaps(tablet.getExtent()))
+ tabletsToFlush.add(tablet);
+ }
+
+ Long flushID = null;
+
+ for (Tablet tablet : tabletsToFlush) {
+ if (flushID == null) {
+ // read the flush id once from zookeeper instead of reading
+ // it for each tablet
+ try {
+ flushID = tablet.getFlushID();
+ } catch (NoNodeException e) {
+ // table was probably deleted
+ log.info("Asked to flush table that has no flush id " + ke + " " + e.getMessage());
+ return;
+ }
+ }
+ tablet.flush(flushID);
+ }
+ }
+
+ @Override
+ public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
+ try {
+ checkPermission(credentials, lock, true, "flushTablet");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ Tablet tablet = onlineTablets.get(new KeyExtent(textent));
+ if (tablet != null) {
+ log.info("Flushing " + tablet.getExtent());
+ try {
+ tablet.flush(tablet.getFlushID());
+ } catch (NoNodeException nne) {
+ log.info("Asked to flush tablet that has no flush id " + new KeyExtent(textent) + " " + nne.getMessage());
+ }
+ }
+ }
+
/**
 * Master request to halt this tablet server: logs GC info, releases the
 * zookeeper lock, and exits with status 0.
 *
 * @throws ThriftSecurityException if the caller lacks system permission
 */
@Override
public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException {

  checkPermission(credentials, lock, true, "halt");

  Halt.halt(0, new Runnable() {
    @Override
    public void run() {
      log.info("Master requested tablet server halt");
      logGCInfo(getSystemConfiguration());
      serverStopRequested = true;
      // release the lock so the master sees this server as gone immediately
      try {
        tabletServerLock.unlock();
      } catch (Exception e) {
        log.error(e, e);
      }
    }
  });
}
+
+ @Override
+ public void fastHalt(TInfo info, TCredentials credentials, String lock) {
+ try {
+ halt(info, credentials, lock);
+ } catch (Exception e) {
+ log.warn("Error halting", e);
+ }
+ }
+
/**
 * Returns the aggregate historical tablet statistics kept by the stats
 * keeper for this server.
 */
@Override
public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
  return statsKeeper.getTabletStats();
}
+
+ @Override
+ public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ try {
+ checkPermission(credentials, null, true, "getScans");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ return sessionManager.getActiveScans();
+ }
+
+ @Override
+ public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException {
+ try {
+ checkPermission(credentials, lock, true, "chop");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent ke = new KeyExtent(textent);
+
+ Tablet tablet = onlineTablets.get(ke);
+ if (tablet != null) {
+ tablet.chopFiles();
+ }
+ }
+
+ @Override
+ public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow)
+ throws TException {
+ try {
+ checkPermission(credentials, lock, true, "compact");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
+
+ ArrayList<Tablet> tabletsToCompact = new ArrayList<Tablet>();
+ synchronized (onlineTablets) {
+ for (Tablet tablet : onlineTablets.values())
+ if (ke.overlaps(tablet.getExtent()))
+ tabletsToCompact.add(tablet);
+ }
+
+ Long compactionId = null;
+
+ for (Tablet tablet : tabletsToCompact) {
+ // all for the same table id, so only need to read
+ // compaction id once
+ if (compactionId == null)
+ try {
+ compactionId = tablet.getCompactionID().getFirst();
+ } catch (NoNodeException e) {
+ log.info("Asked to compact table with no compaction id " + ke + " " + e.getMessage());
+ return;
+ }
+ tablet.compactAll(compactionId);
+ }
+
+ }
+
/**
 * Master request to remove write-ahead logs that are no longer needed. A log
 * is skipped if it is still registered with the logger or still referenced
 * by any online tablet; otherwise it is archived or deleted (to trash when
 * enabled), along with any recovery copy. Failures are logged and the next
 * file is attempted.
 */
@Override
public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
  String myname = getClientAddressString();
  // the walog directory name uses '+' in place of ':' from the address
  myname = myname.replace(':', '+');
  Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
  Set<String> loggers = new HashSet<String>();
  logger.getLoggers(loggers);
  nextFile: for (String filename : filenames) {
    // skip logs the logger itself still holds open
    for (String logger : loggers) {
      if (logger.contains(filename))
        continue nextFile;
    }
    List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
    synchronized (onlineTablets) {
      onlineTabletsCopy.addAll(onlineTablets.values());
    }
    // skip logs still referenced by an online tablet
    for (Tablet tablet : onlineTabletsCopy) {
      for (String current : tablet.getCurrentLogs()) {
        if (current.contains(filename)) {
          log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
          continue nextFile;
        }
      }
    }
    try {
      String source = logDir + "/" + filename;
      if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
        // archive mode: move the log aside instead of deleting it
        String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
        fs.mkdirs(new Path(walogArchive));
        String dest = walogArchive + "/" + filename;
        log.info("Archiving walog " + source + " to " + dest);
        if (!fs.rename(new Path(source), new Path(dest)))
          log.error("rename is unsuccessful");
      } else {
        log.info("Deleting walog " + filename);
        Trash trash = new Trash(fs, fs.getConf());
        Path sourcePath = new Path(source);
        // move to trash unless GC_TRASH_IGNORE is set; fall back to a
        // permanent delete if the trash move did not happen
        if (!(!acuConf.getBoolean(Property.GC_TRASH_IGNORE) && trash.moveToTrash(sourcePath)) && !fs.delete(sourcePath, true))
          log.warn("Failed to delete walog " + source);
        // also remove any copy of this log left in the recovery directory
        Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
        try {
          if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true))
            log.info("Deleted any recovery log " + filename);
        } catch (FileNotFoundException ex) {
          // ignore
        }

      }
    } catch (IOException e) {
      // best effort: log and continue with the next file
      log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
    }
  }
}
+
+ @Override
+ public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+ try {
+ checkPermission(credentials, null, true, "getActiveCompactions");
+ } catch (ThriftSecurityException e) {
+ log.error(e, e);
+ throw new RuntimeException(e);
+ }
+
+ List<CompactionInfo> compactions = Compactor.getRunningCompactions();
+ List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size());
+
+ for (CompactionInfo compactionInfo : compactions) {
+ ret.add(compactionInfo.toThrift());
+ }
+
+ return ret;
+ }
+ }
+
+ private class SplitRunner implements Runnable {
+ private Tablet tablet;
+
+ public SplitRunner(Tablet tablet) {
+ this.tablet = tablet;
+ }
+
+ @Override
+ public void run() {
+ if (majorCompactorDisabled) {
+ // this will make split task that were queued when shutdown was
+ // initiated exit
+ return;
+ }
+
+ splitTablet(tablet);
+ }
+ }
+
// True once major compactions have been disabled (during shutdown); queued
// background tasks check this to exit early.
boolean isMajorCompactionDisabled() {
  return majorCompactorDisabled;
}
+
// Queues an asynchronous split of the given tablet on the resource manager.
void executeSplit(Tablet tablet) {
  resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet)));
}
+
+ private class MajorCompactor implements Runnable {
+
+ @Override
+ public void run() {
+ while (!majorCompactorDisabled) {
+ try {
+ UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY));
+
+ TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<KeyExtent,Tablet>();
+
+ synchronized (onlineTablets) {
+ copyOnlineTablets.putAll(onlineTablets); // avoid
+ // concurrent
+ // modification
+ }
+
+ int numMajorCompactionsInProgress = 0;
+
+ Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
+
<TRUNCATED>