You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:54 UTC
[23/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
----------------------------------------------------------------------
diff --cc minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
index a366c16,0000000..e2b2f83
mode 100644,000000..100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
@@@ -1,392 -1,0 +1,382 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.minicluster;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.gc.SimpleGarbageCollector;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.tabletserver.TabletServer;
+import org.apache.accumulo.server.util.Initialize;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.start.Main;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+
+/**
+ * A utility class that will create Zookeeper and Accumulo processes that write all of their data to a single local directory. This class makes it easy to test
+ * code against a real Accumulo instance. It's much more accurate for testing than MockAccumulo, but much slower than MockAccumulo.
+ *
+ * @since 1.5.0
+ */
+public class MiniAccumuloCluster {
+
+ private static final String INSTANCE_SECRET = "DONTTELL";
+ private static final String INSTANCE_NAME = "miniInstance";
+
+ private static class LogWriter extends Thread {
+ private BufferedReader in;
+ private BufferedWriter out;
+
- /**
- * @throws IOException
- */
+ public LogWriter(InputStream stream, File logFile) throws IOException {
+ this.setDaemon(true);
+ this.in = new BufferedReader(new InputStreamReader(stream, Constants.UTF8));
+ out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(logFile), Constants.UTF8));
+
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1000, 1000);
+ }
+
+ public synchronized void flush() throws IOException {
+ if (out != null)
+ out.flush();
+ }
+
+ @Override
+ public void run() {
+ String line;
+
+ try {
+ while ((line = in.readLine()) != null) {
+ out.append(line);
+ out.append("\n");
+ }
+
+ synchronized (this) {
+ out.close();
+ out = null;
+ in.close();
+ }
+
+ } catch (IOException e) {}
+ }
+ }
+
+ private File libDir;
+ private File libExtDir;
+ private File confDir;
+ private File zooKeeperDir;
+ private File accumuloDir;
+ private File zooCfgFile;
+ private File logDir;
+ private File walogDir;
+
+ private Process zooKeeperProcess;
+ private Process masterProcess;
+ private Process gcProcess;
+
+ private int zooKeeperPort;
+
+ private List<LogWriter> logWriters = new ArrayList<MiniAccumuloCluster.LogWriter>();
+
+ private MiniAccumuloConfig config;
+ private Process[] tabletServerProcesses;
+
+ private Process exec(Class<? extends Object> clazz, String... args) throws IOException {
+ String javaHome = System.getProperty("java.home");
+ String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
+ String classpath = System.getProperty("java.class.path");
+
+ classpath = confDir.getAbsolutePath() + File.pathSeparator + classpath;
+
+ String className = clazz.getCanonicalName();
+
+ ArrayList<String> argList = new ArrayList<String>();
+
+ argList.addAll(Arrays.asList(javaBin, "-cp", classpath, "-Xmx128m", "-XX:+UseConcMarkSweepGC", "-XX:CMSInitiatingOccupancyFraction=75",
+ "-Dapple.awt.UIElement=true", Main.class.getName(), className));
+
+ argList.addAll(Arrays.asList(args));
+
+ ProcessBuilder builder = new ProcessBuilder(argList);
+
+ builder.environment().put("ACCUMULO_HOME", config.getDir().getAbsolutePath());
+ builder.environment().put("ACCUMULO_LOG_DIR", logDir.getAbsolutePath());
+
+ // if we're running under accumulo.start, we forward these env vars
+ String env = System.getenv("HADOOP_PREFIX");
+ if (env != null)
+ builder.environment().put("HADOOP_PREFIX", env);
+ env = System.getenv("ZOOKEEPER_HOME");
+ if (env != null)
+ builder.environment().put("ZOOKEEPER_HOME", env);
+
+ Process process = builder.start();
+
+ LogWriter lw;
+ lw = new LogWriter(process.getErrorStream(), new File(logDir, clazz.getSimpleName() + "_" + process.hashCode() + ".err"));
+ logWriters.add(lw);
+ lw.start();
+ lw = new LogWriter(process.getInputStream(), new File(logDir, clazz.getSimpleName() + "_" + process.hashCode() + ".out"));
+ logWriters.add(lw);
+ lw.start();
+
+ return process;
+ }
+
+ private void appendProp(Writer fileWriter, Property key, String value, Map<String,String> siteConfig) throws IOException {
+ appendProp(fileWriter, key.getKey(), value, siteConfig);
+ }
+
+ private void appendProp(Writer fileWriter, String key, String value, Map<String,String> siteConfig) throws IOException {
+ if (!siteConfig.containsKey(key))
+ fileWriter.append("<property><name>" + key + "</name><value>" + value + "</value></property>\n");
+ }
+
+ /**
+ * Sets a given key with a random port for the value on the site config if it doesn't already exist.
+ */
+ private void mergePropWithRandomPort(Map<String,String> siteConfig, String key) {
+ if (!siteConfig.containsKey(key)) {
+ siteConfig.put(key, "0");
+ }
+ }
+
+ /**
+ *
+ * @param dir
+ * An empty or nonexistent temp directory that Accumulo and Zookeeper can store data in. Creating the directory is left to the user. Java 7, Guava,
+ * and Junit provide methods for creating temporary directories.
+ * @param rootPassword
+ * Initial root password for instance.
- * @throws IOException
+ */
+ public MiniAccumuloCluster(File dir, String rootPassword) throws IOException {
+ this(new MiniAccumuloConfig(dir, rootPassword));
+ }
+
+ /**
+ * @param config
+ * initial configuration
- * @throws IOException
+ */
+
+ public MiniAccumuloCluster(MiniAccumuloConfig config) throws IOException {
+
+ if (config.getDir().exists() && !config.getDir().isDirectory())
+ throw new IllegalArgumentException("Must pass in directory, " + config.getDir() + " is a file");
+
+ if (config.getDir().exists() && config.getDir().list().length != 0)
+ throw new IllegalArgumentException("Directory " + config.getDir() + " is not empty");
+
+ this.config = config;
+
+ libDir = new File(config.getDir(), "lib");
+ libExtDir = new File(libDir, "ext");
+ confDir = new File(config.getDir(), "conf");
+ accumuloDir = new File(config.getDir(), "accumulo");
+ zooKeeperDir = new File(config.getDir(), "zookeeper");
+ logDir = new File(config.getDir(), "logs");
+ walogDir = new File(config.getDir(), "walogs");
+
+ confDir.mkdirs();
+ accumuloDir.mkdirs();
+ zooKeeperDir.mkdirs();
+ logDir.mkdirs();
+ walogDir.mkdirs();
+ libDir.mkdirs();
+
+ // Avoid the classloader yelling that the general.dynamic.classpaths value is invalid because
+ // $ACCUMULO_HOME/lib/ext isn't defined.
+ libExtDir.mkdirs();
+
+ zooKeeperPort = PortUtils.getRandomFreePort();
+
+ File siteFile = new File(confDir, "accumulo-site.xml");
+
+ OutputStreamWriter fileWriter = new OutputStreamWriter(new FileOutputStream(siteFile), Constants.UTF8);
+ fileWriter.append("<configuration>\n");
+
+ HashMap<String,String> siteConfig = new HashMap<String,String>(config.getSiteConfig());
+
+ appendProp(fileWriter, Property.INSTANCE_DFS_URI, "file:///", siteConfig);
+ appendProp(fileWriter, Property.INSTANCE_DFS_DIR, accumuloDir.getAbsolutePath(), siteConfig);
+ appendProp(fileWriter, Property.INSTANCE_ZK_HOST, "localhost:" + zooKeeperPort, siteConfig);
+ appendProp(fileWriter, Property.INSTANCE_SECRET, INSTANCE_SECRET, siteConfig);
+ appendProp(fileWriter, Property.TSERV_PORTSEARCH, "true", siteConfig);
+ appendProp(fileWriter, Property.LOGGER_DIR, walogDir.getAbsolutePath(), siteConfig);
+ appendProp(fileWriter, Property.TSERV_DATACACHE_SIZE, "10M", siteConfig);
+ appendProp(fileWriter, Property.TSERV_INDEXCACHE_SIZE, "10M", siteConfig);
+ appendProp(fileWriter, Property.TSERV_MAXMEM, "50M", siteConfig);
+ appendProp(fileWriter, Property.TSERV_WALOG_MAX_SIZE, "100M", siteConfig);
+ appendProp(fileWriter, Property.TSERV_NATIVEMAP_ENABLED, "false", siteConfig);
+ appendProp(fileWriter, Property.TRACE_TOKEN_PROPERTY_PREFIX + ".password", config.getRootPassword(), siteConfig);
+ appendProp(fileWriter, Property.GC_CYCLE_DELAY, "4s", siteConfig);
+ appendProp(fileWriter, Property.GC_CYCLE_START, "0s", siteConfig);
+ mergePropWithRandomPort(siteConfig, Property.MASTER_CLIENTPORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.TRACE_PORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.TSERV_CLIENTPORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.MONITOR_PORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.GC_PORT.getKey());
+
+ // since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5
+ appendProp(fileWriter, Property.TSERV_MAJC_DELAY, "3", siteConfig);
+
+ // ACCUMULO-1472 -- Use the classpath, not what might be installed on the system.
+ // We have to set *something* here, otherwise the AccumuloClassLoader will default to pulling from
+ // environment variables (e.g. ACCUMULO_HOME, HADOOP_HOME/PREFIX) which will result in multiple copies
+ // of artifacts on the classpath as they'll be provided by the invoking application
+ appendProp(fileWriter, Property.GENERAL_CLASSPATHS, libDir.getAbsolutePath() + "/[^.].*.jar", siteConfig);
+ appendProp(fileWriter, Property.GENERAL_DYNAMIC_CLASSPATHS, libExtDir.getAbsolutePath() + "/[^.].*.jar", siteConfig);
+
+ for (Entry<String,String> entry : siteConfig.entrySet())
+ fileWriter.append("<property><name>" + entry.getKey() + "</name><value>" + entry.getValue() + "</value></property>\n");
+ fileWriter.append("</configuration>\n");
+ fileWriter.close();
+
+ zooCfgFile = new File(confDir, "zoo.cfg");
+ fileWriter = new OutputStreamWriter(new FileOutputStream(zooCfgFile), Constants.UTF8);
+
+ // zookeeper uses Properties to read its config, so use that to write in order to properly escape things like Windows paths
+ Properties zooCfg = new Properties();
+ zooCfg.setProperty("tickTime", "1000");
+ zooCfg.setProperty("initLimit", "10");
+ zooCfg.setProperty("syncLimit", "5");
+ zooCfg.setProperty("clientPort", zooKeeperPort + "");
+ zooCfg.setProperty("maxClientCnxns", "100");
+ zooCfg.setProperty("dataDir", zooKeeperDir.getAbsolutePath());
+ zooCfg.store(fileWriter, null);
+
+ fileWriter.close();
+
+ }
+
+ /**
+ * Starts Accumulo and Zookeeper processes. Can only be called once.
+ *
- * @throws IOException
- * @throws InterruptedException
+ * @throws IllegalStateException
+ * if already started
+ */
+
+ public void start() throws IOException, InterruptedException {
+ if (zooKeeperProcess != null)
+ throw new IllegalStateException("Already started");
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ MiniAccumuloCluster.this.stop();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ zooKeeperProcess = exec(Main.class, ZooKeeperServerMain.class.getName(), zooCfgFile.getAbsolutePath());
+
+ // sleep a little bit to let zookeeper come up before calling init, seems to work better
+ UtilWaitThread.sleep(250);
+
+ Process initProcess = exec(Initialize.class, "--instance-name", INSTANCE_NAME, "--password", config.getRootPassword());
+ int ret = initProcess.waitFor();
+ if (ret != 0) {
+ throw new RuntimeException("Initialize process returned " + ret + ". Check the logs in " + logDir + " for errors.");
+ }
+
+ tabletServerProcesses = new Process[config.getNumTservers()];
+ for (int i = 0; i < config.getNumTservers(); i++) {
+ tabletServerProcesses[i] = exec(TabletServer.class);
+ }
+
+ masterProcess = exec(Master.class);
+
+ gcProcess = exec(SimpleGarbageCollector.class);
+ }
+
+ /**
+ * @return Accumulo instance name
+ */
+
+ public String getInstanceName() {
+ return INSTANCE_NAME;
+ }
+
+ /**
+ * @return zookeeper connection string
+ */
+
+ public String getZooKeepers() {
+ return "localhost:" + zooKeeperPort;
+ }
+
+ /**
+ * Stops Accumulo and Zookeeper processes. If stop is not called, there is a shutdown hook that is set up to kill the processes. However, it's probably best to
+ * call stop in a finally block as soon as possible.
- *
- * @throws IOException
- * @throws InterruptedException
+ */
+
+ public void stop() throws IOException, InterruptedException {
+ if (zooKeeperProcess != null) {
+ zooKeeperProcess.destroy();
+ zooKeeperProcess.waitFor();
+ }
+ if (masterProcess != null) {
+ masterProcess.destroy();
+ masterProcess.waitFor();
+ }
+ if (tabletServerProcesses != null) {
+ for (Process tserver : tabletServerProcesses) {
+ tserver.destroy();
+ tserver.waitFor();
+ }
+ }
+
+ for (LogWriter lw : logWriters)
+ lw.flush();
+
+ if (gcProcess != null) {
+ gcProcess.destroy();
+ gcProcess.waitFor();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
----------------------------------------------------------------------
diff --cc proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
index 99a3218,0000000..c0049a0
mode 100644,000000..100644
--- a/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
+++ b/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyReadWrite.java
@@@ -1,488 -1,0 +1,478 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.proxy;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.proxy.thrift.BatchScanOptions;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.IteratorSetting;
+import org.apache.accumulo.proxy.thrift.Key;
+import org.apache.accumulo.proxy.thrift.KeyValue;
+import org.apache.accumulo.proxy.thrift.Range;
+import org.apache.accumulo.proxy.thrift.ScanColumn;
+import org.apache.accumulo.proxy.thrift.ScanOptions;
+import org.apache.accumulo.proxy.thrift.ScanResult;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestProxyReadWrite {
+ protected static TServer proxy;
+ protected static Thread thread;
+ protected static TestProxyClient tpc;
+ protected static ByteBuffer userpass;
+ protected static final int port = 10194;
+ protected static final String testtable = "testtable";
+
+ @SuppressWarnings("serial")
+ @BeforeClass
+ public static void setup() throws Exception {
+ Properties prop = new Properties();
+ prop.setProperty("useMockInstance", "true");
+ prop.put("tokenClass", PasswordToken.class.getName());
+
+ proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
+ port, TCompactProtocol.Factory.class, prop);
+ thread = new Thread() {
+ @Override
+ public void run() {
+ proxy.serve();
+ }
+ };
+ thread.start();
+ tpc = new TestProxyClient("localhost", port);
+ userpass = tpc.proxy().login("root", new TreeMap<String, String>() {{put("password",""); }});
+ }
+
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
+ proxy.stop();
+ thread.join();
+ }
+
+ @Before
+ public void makeTestTable() throws Exception {
+ tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
+ }
+
+ @After
+ public void deleteTestTable() throws Exception {
+ tpc.proxy().deleteTable(userpass, testtable);
+ }
+
+ private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
+ ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+ update.setValue(value.getBytes());
+ mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+ }
+
+ private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String vis, String value) {
+ ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
+ update.setValue(value.getBytes());
+ update.setColVisibility(vis.getBytes());
+ mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a range so only the entries between -Inf...5 come back (there should be
+ * 50,000)
- *
- * @throws Exception
+ */
+ @Test
+ public void readWriteBatchOneShotWithRange() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ Key stop = new Key();
+ stop.setRow("5".getBytes());
+ BatchScanOptions options = new BatchScanOptions();
+ options.ranges = Collections.singletonList(new Range(null, false, stop, false));
+ String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily so only the entries with specified column family come back (there should be
+ * 50,000)
- *
- * @throws Exception
+ */
+ @Test
+ public void readWriteBatchOneShotWithColumnFamilyOnly() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+
+ addMutation(mutations, String.format(format, i), "cf" + (i % 2) , "cq" + (i % 2), Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ BatchScanOptions options = new BatchScanOptions();
+
+ ScanColumn sc = new ScanColumn();
+ sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
+
+ options.columns = Collections.singletonList(sc);
+ String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+
+ /**
+ * Insert 100000 cells which have as the row [0..99999] (padded with zeros). Set a columnFamily + columnQualifier so only the entries with specified column
+ * come back (there should be 50,000)
- *
- * @throws Exception
+ */
+ @Test
+ public void readWriteBatchOneShotWithFullColumn() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+
+ addMutation(mutations, String.format(format, i), "cf" + (i % 2) , "cq" + (i % 2), Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ BatchScanOptions options = new BatchScanOptions();
+
+ ScanColumn sc = new ScanColumn();
+ sc.colFamily = ByteBuffer.wrap("cf0".getBytes());
+ sc.colQualifier = ByteBuffer.wrap("cq0".getBytes());
+
+ options.columns = Collections.singletonList(sc);
+ String cookie = tpc.proxy().createBatchScanner(userpass, testtable, options);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+
+ /**
+ * Insert 10000 cells which have as the row [0..9999] (padded with zeros). Filter the results so only the even numbers come back.
- *
- * @throws Exception
+ */
+ @Test
+ public void readWriteBatchOneShotWithFilterIterator() throws Exception {
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+
+ }
+
+ String regex = ".*[02468]";
+
+ org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+ RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+ IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+ ScanOptions opts = new ScanOptions();
+ opts.iterators = Collections.singletonList(pis);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+
+ i += 2;
+ }
+ hasNext = kvList.isMore();
+ }
+ }
+
+ @Test
+ public void readWriteOneShotWithRange() throws Exception {
+ int maxInserts = 100000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+ }
+ }
+
+ Key stop = new Key();
+ stop.setRow("5".getBytes());
+ ScanOptions opts = new ScanOptions();
+ opts.range = new Range(null, false, stop, false);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ i += kvList.getResultsSize();
+ hasNext = kvList.isMore();
+ }
+ assertEquals(i, 50000);
+ }
+
+ /**
+ * Insert 10000 cells which have as the row [0..9999] (padded with zeros). Filter the results so only the even numbers come back.
- *
- * @throws Exception
+ */
+ @Test
+ public void readWriteOneShotWithFilterIterator() throws Exception {
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+
+ tpc.proxy().updateAndFlush(userpass, testtable, mutations);
+ mutations.clear();
+
+ }
+
+ }
+
+ String regex = ".*[02468]";
+
+ org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+ RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+ IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+ ScanOptions opts = new ScanOptions();
+ opts.iterators = Collections.singletonList(pis);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+
+ i += 2;
+ }
+ hasNext = kvList.isMore();
+ }
+ }
+
+ // @Test
+ // This test takes kind of a long time. Enable it if you think you may have memory issues.
+ public void manyWritesAndReads() throws Exception {
+ int maxInserts = 1000000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$06d";
+ String writer = tpc.proxy().createWriter(userpass, testtable, null);
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+
+ tpc.proxy().update(writer, mutations);
+ mutations.clear();
+
+ }
+
+ }
+
+ tpc.proxy().flush(writer);
+ tpc.proxy().closeWriter(writer);
+
+ String cookie = tpc.proxy().createScanner(userpass, testtable, null);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+ i++;
+ }
+ hasNext = kvList.isMore();
+ if (hasNext)
+ assertEquals(k, kvList.getResults().size());
+ }
+ assertEquals(maxInserts, i);
+ }
+
+ @Test
+ public void asynchReadWrite() throws Exception {
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ String writer = tpc.proxy().createWriter(userpass, testtable, null);
+ for (int i = 0; i < maxInserts; i++) {
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().update(writer, mutations);
+ mutations.clear();
+ }
+ }
+
+ tpc.proxy().flush(writer);
+ tpc.proxy().closeWriter(writer);
+
+ String regex = ".*[02468]";
+
+ org.apache.accumulo.core.client.IteratorSetting is = new org.apache.accumulo.core.client.IteratorSetting(50, regex, RegExFilter.class);
+ RegExFilter.setRegexs(is, regex, null, null, null, false);
+
+ IteratorSetting pis = Util.iteratorSetting2ProxyIteratorSetting(is);
+ ScanOptions opts = new ScanOptions();
+ opts.iterators = Collections.singletonList(pis);
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ int numRead = 0;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(i, Integer.parseInt(new String(kv.getKey().getRow())));
+ numRead++;
+ i += 2;
+ }
+ hasNext = kvList.isMore();
+ }
+ assertEquals(maxInserts / 2, numRead);
+ }
+
+ @Test
+ public void testVisibility() throws Exception {
+
+ Set<ByteBuffer> auths = new HashSet<ByteBuffer>();
+ auths.add(ByteBuffer.wrap("even".getBytes()));
+ tpc.proxy().changeUserAuthorizations(userpass, "root", auths);
+
+ int maxInserts = 10000;
+ Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+ String format = "%1$05d";
+ String writer = tpc.proxy().createWriter(userpass, testtable, null);
+ for (int i = 0; i < maxInserts; i++) {
+ if (i % 2 == 0)
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "even", Util.randString(10));
+ else
+ addMutation(mutations, String.format(format, i), "cf" + i, "cq" + i, "odd", Util.randString(10));
+
+ if (i % 1000 == 0 || i == maxInserts - 1) {
+ tpc.proxy().update(writer, mutations);
+ mutations.clear();
+ }
+ }
+
+ tpc.proxy().flush(writer);
+ tpc.proxy().closeWriter(writer);
+ ScanOptions opts = new ScanOptions();
+ opts.authorizations = auths;
+ String cookie = tpc.proxy().createScanner(userpass, testtable, opts);
+
+ int i = 0;
+ boolean hasNext = true;
+
+ int k = 1000;
+ int numRead = 0;
+ while (hasNext) {
+ ScanResult kvList = tpc.proxy().nextK(cookie, k);
+ for (KeyValue kv : kvList.getResults()) {
+ assertEquals(Integer.parseInt(new String(kv.getKey().getRow())), i);
+ i += 2;
+ numRead++;
+ }
+ hasNext = kvList.isMore();
+
+ }
+ assertEquals(maxInserts / 2, numRead);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
index 442294f,0000000..05806ca
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/ConfigSanityCheck.java
@@@ -1,30 -1,0 +1,27 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import org.apache.accumulo.server.client.HdfsZooInstance;
+
+/**
+ * Command-line entry point that builds a {@link ServerConfiguration} for the {@link HdfsZooInstance} singleton and asks it for its configuration. The
+ * returned configuration object is discarded.
+ * <p>
+ * NOTE(review): presumably loading the configuration is what performs the sanity check (failing with an exception on a bad setup) -- confirm against
+ * ServerConfiguration.
+ */
+public class ConfigSanityCheck {
+
- /**
- * @param args
- */
+ // args is not used
+ public static void main(String[] args) {
+ new ServerConfiguration(HdfsZooInstance.getInstance()).getConfiguration();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
index 4f9d33a,0000000..01626ad
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
+++ b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
@@@ -1,191 -1,0 +1,190 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.logger;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.tabletserver.log.DfsLogger;
+import org.apache.accumulo.server.tabletserver.log.MultiReader;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+public class LogReader {
+
+ /**
+ * Command-line options for {@link LogReader}. The row, extent and regex options act as filters in {@link LogReader#printLogEvent}.
+ */
+ static class Opts extends Help {
+ @Parameter(names = "-r", description = "print only mutations associated with the given row")
+ String row;
+ @Parameter(names = "-m", description = "limit the number of mutations printed per row")
+ int maxMutations = 5;
+ // main() splits this value on ';' and passes the first three fields to the KeyExtent constructor
+ @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
+ String extent;
+ // the regex has to match the whole row (Matcher.matches() is used), not just a part of it
+ @Parameter(names = "-p", description = "search for a row that matches the given regex")
+ String regexp;
+ // unnamed arguments: the log files to dump; usage is printed when this list is empty
+ @Parameter(description = "<logfile> { <logfile> ... }")
+ List<String> files = new ArrayList<String>();
+ }
+
+ /**
+ * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
+ * <p>
+ * Each file named on the command line is tried first as a plain file on the configured file system, then as a plain file on the local file system; anything
+ * else is handed to {@link MultiReader} as a sorted map file. Every entry read is passed to {@link #printLogEvent}, which applies the row, regex and extent
+ * filters.
+ *
+ * @param args
+ * - options accepted by {@link Opts}, followed by one or more log files to print
- * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+   Opts opts = new Opts();
+   opts.parseArgs(LogReader.class.getName(), args);
+   Configuration conf = CachedConfiguration.getInstance();
+   FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
+   FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+
+   Matcher rowMatcher = null;
+   KeyExtent ke = null;
+   Text row = null;
+   if (opts.files.isEmpty()) {
+     new JCommander(opts).usage();
+     return;
+   }
+   if (opts.row != null)
+     row = new Text(opts.row);
+   if (opts.extent != null) {
+     // the extent is given as three ';'-separated fields
+     String sa[] = opts.extent.split(";");
+     ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
+   }
+   if (opts.regexp != null) {
+     // compile once; the matcher is reset for every row in printLogEvent
+     Pattern pattern = Pattern.compile(opts.regexp);
+     rowMatcher = pattern.matcher("");
+   }
+
+   // ids of the tablets that matched the requested extent; shared across all files
+   Set<Integer> tabletIds = new HashSet<Integer>();
+
+   for (String file : opts.files) {
+
+     Map<String, String> meta = new HashMap<String, String>();
+     Path path = new Path(file);
+     LogFileKey key = new LogFileKey();
+     LogFileValue value = new LogFileValue();
+
+     if (fs.isFile(path)) {
+       // read log entries from a simple hdfs file
+       FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
+       try {
+         while (true) {
+           try {
+             key.readFields(f);
+             value.readFields(f);
+           } catch (EOFException ex) {
+             // end of file is the normal way out of this loop
+             break;
+           }
+           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+         }
+       } finally {
+         f.close();
+       }
+     } else if (local.isFile(path)) {
+       // read log entries from a simple file on the local file system; it must be opened through
+       // 'local', the file system on which it was found, not through the HDFS handle
+       FSDataInputStream f = DfsLogger.readHeader(local, path, meta);
+       try {
+         while (true) {
+           try {
+             key.readFields(f);
+             value.readFields(f);
+           } catch (EOFException ex) {
+             // end of file is the normal way out of this loop
+             break;
+           }
+           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+         }
+       } finally {
+         f.close();
+       }
+     } else {
+       // read the log entries sorted in a map file
+       MultiReader input = new MultiReader(fs, conf, file);
+       while (input.next(key, value)) {
+         printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+       }
+     }
+   }
+ }
+
+ /**
+ * Prints one log entry to stdout unless it is rejected by the extent filter or by the row filters.
+ *
+ * @param key
+ * the log entry key; its event, tablet and tid fields drive the filtering
+ * @param value
+ * the log entry value; its mutations are inspected by the row filters
+ * @param row
+ * if not null, only entries containing a mutation for exactly this row are printed
+ * @param rowMatcher
+ * if not null, only entries containing a mutation whose entire row matches the pattern are printed
+ * @param ke
+ * if not null, only entries belonging to tablets defined with this extent are printed
+ * @param tabletIds
+ * ids of tablets already seen with the requested extent; updated by this method
+ * @param maxMutations
+ * passed to LogFileValue.format when printing the value
+ */
+ public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {
+
+ if (ke != null) {
+ if (key.event == LogEvents.DEFINE_TABLET) {
+ // remember the id of a tablet defined with the requested extent; drop definitions of other tablets
+ if (key.tablet.equals(ke)) {
+ tabletIds.add(key.tid);
+ } else {
+ return;
+ }
+ } else if (!tabletIds.contains(key.tid)) {
+ // any other event is kept only if it refers to a tablet id remembered above
+ return;
+ }
+ }
+
+ if (row != null || rowMatcher != null) {
+ // with a row filter active, only mutation events can pass, and only if at least one mutation matches
+ if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
+ boolean found = false;
+ for (Mutation m : value.mutations) {
+ if (row != null && new Text(m.getRow()).equals(row)) {
+ found = true;
+ break;
+ }
+
+ if (rowMatcher != null) {
+ rowMatcher.reset(new String(m.getRow(), Constants.UTF8));
+ if (rowMatcher.matches()) {
+ found = true;
+ break;
+ }
+ }
+ }
+
+ if (!found)
+ return;
+ } else {
+ return;
+ }
+
+ }
+
+ System.out.println(key);
+ System.out.println(LogFileValue.format(value, maxMutations));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
index 4930bc2,0000000..e14008a
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
@@@ -1,152 -1,0 +1,144 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.thrift.TException;
+
+/**
+ * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them from resting in a single location for very long. This is not
+ * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
+ */
+public class ChaoticLoadBalancer extends TabletBalancer {
+ Random r = new Random();
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.master.balancer.TabletBalancer#getAssignments(java.util.SortedMap, java.util.Map, java.util.Map)
- */
+ /**
+ * Assigns every unassigned tablet to a randomly chosen tablet server.
+ * <p>
+ * Servers hosting fewer tablets than the average (computed from the number of tablets being assigned) are preferred, and each receives at most the number of
+ * tablets that brings it up to that average. When no such server is left, tablets are spread randomly over all current servers so that assignment always
+ * makes progress. Nothing is assigned when there are no tablet servers.
+ *
+ * @param current
+ * status of the online tablet servers; a null status is treated as "hosts no tablets"
+ * @param unassigned
+ * the tablets to assign
+ * @param assignments
+ * receives the chosen server for each unassigned tablet
+ */
+ @Override
+ public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+     Map<KeyExtent,TServerInstance> assignments) {
+   if (current.isEmpty()) {
+     // no server to pick from; Random.nextInt(0) below would throw
+     return;
+   }
+   long total = assignments.size() + unassigned.size();
+   long avg = (long) Math.ceil(((double) total) / current.size());
+   // remaining quota for each server that is still below the average
+   Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
+   List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
+   for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+     long numTablets = 0;
+     TabletServerStatus status = e.getValue();
+     // the status may be null when the server has not yet answered a status request
+     if (status != null && status.getTableMap() != null) {
+       for (TableInfo ti : status.getTableMap().values()) {
+         numTablets += ti.tablets;
+       }
+     }
+     if (numTablets < avg) {
+       tServerArray.add(e.getKey());
+       toAssign.put(e.getKey(), avg - numTablets);
+     }
+   }
+
+   for (KeyExtent ke : unassigned.keySet()) {
+     if (tServerArray.isEmpty()) {
+       // every server is at or above its quota (or the quotas have been used up): fall back to
+       // choosing among all servers; no quota is tracked for them (toAssign has no entry)
+       tServerArray.addAll(current.keySet());
+     }
+     int index = r.nextInt(tServerArray.size());
+     TServerInstance dest = tServerArray.get(index);
+     assignments.put(ke, dest);
+     Long quota = toAssign.get(dest);
+     if (quota == null)
+       continue;
+     long remaining = quota.longValue() - 1;
+     if (remaining <= 0) {
+       // quota exhausted: stop offering this server
+       tServerArray.remove(index);
+       toAssign.remove(dest);
+     } else {
+       toAssign.put(dest, remaining);
+     }
+   }
+ }
+
+ /**
+ * Will balance randomly, maintaining distribution.
+ * <p>
+ * Does nothing while migrations are still outstanding. Otherwise every online tablet of every server is considered for a move to a randomly chosen server
+ * that is not over capacity; picking the tablet's current server leaves it in place. Tablets of the "!METADATA" table are only moved on roughly one call in
+ * four.
+ *
+ * @param current
+ * status of the online tablet servers
+ * @param migrations
+ * migrations already in progress
+ * @param migrationsOut
+ * receives the migrations requested by this call
+ * @return always 100, the delay to wait before balancing again
+ */
+ @Override
+ public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
+   Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
+   List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
+
+   if (!migrations.isEmpty())
+     return 100;
+
+   boolean moveMetadata = r.nextInt(4) == 0;
+   long totalTablets = 0;
+   for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+     long tabletCount = 0;
+     for (TableInfo ti : e.getValue().getTableMap().values()) {
+       tabletCount += ti.tablets;
+     }
+     numTablets.put(e.getKey(), tabletCount);
+     underCapacityTServer.add(e.getKey());
+     totalTablets += tabletCount;
+   }
+   // totalTablets is fuzzy due to asynchronicity of the stats
+   // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios
+   long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
+
+   for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+     for (String table : e.getValue().getTableMap().keySet()) {
+       if (!moveMetadata && "!METADATA".equals(table))
+         continue;
+       try {
+         List<TabletStats> onlineTablets = getOnlineTabletsForTable(e.getKey(), table);
+         if (onlineTablets == null) {
+           // the tablet server could not be reached; nothing to shuffle for this table
+           continue;
+         }
+         for (TabletStats ts : onlineTablets) {
+           KeyExtent ke = new KeyExtent(ts.extent);
+           int index = r.nextInt(underCapacityTServer.size());
+           TServerInstance dest = underCapacityTServer.get(index);
+           if (dest.equals(e.getKey()))
+             continue;
+           migrationsOut.add(new TabletMigration(ke, e.getKey(), dest));
+           // Map.put returns the previous count, so these compare the counts from before the move
+           if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
+             underCapacityTServer.remove(index);
+           if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg && !underCapacityTServer.contains(e.getKey()))
+             underCapacityTServer.add(e.getKey());
+
+           // We can get some craziness with only 1 tserver, so lets make sure there's always an option!
+           if (underCapacityTServer.isEmpty())
+             underCapacityTServer.addAll(numTablets.keySet());
+         }
+       } catch (ThriftSecurityException e1) {
+         // Shouldn't happen, but carry on if it does
+         e1.printStackTrace();
+       } catch (TException e1) {
+         // Shouldn't happen, but carry on if it does
+         e1.printStackTrace();
+       }
+     }
+   }
+
+   return 100;
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.server.master.balancer.TabletBalancer#init(org.apache.accumulo.server.conf.ServerConfiguration)
- */
+ /**
+ * Stores the configuration in the base class. balance() relies on the inherited getOnlineTabletsForTable(), which reads that configuration to open a
+ * connection to the tablet server, so it must not be left unset.
+ */
+ @Override
+ public void init(ServerConfiguration conf) {
+   super.init(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index d6dce2f,0000000..69387d3
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@@ -1,150 -1,0 +1,148 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+public abstract class TabletBalancer {
+
+ private static final Logger log = Logger.getLogger(TabletBalancer.class);
+
+ protected ServerConfiguration configuration;
+
+ /**
+ * Initialize the TabletBalancer. This gives the balancer the opportunity to read the configuration.
+ * <p>
+ * This implementation stores the configuration in {@link #configuration}, which {@link #getOnlineTabletsForTable} dereferences. A subclass that overrides
+ * this method therefore has to call {@code super.init(conf)} (or set the field itself) if it uses that helper.
+ *
+ * @param conf
+ * the server configuration to remember
+ */
+ public void init(ServerConfiguration conf) {
+ configuration = conf;
+ }
+
+ /**
+ * Assign tablets to tablet servers. This method is called whenever the master finds tablets that are unassigned.
+ *
+ * @param current
+ * The current table-summary state of all the online tablet servers. Read-only. The TabletServerStatus for each server may be null if the tablet
+ * server has not yet responded to a recent request for status.
+ * @param unassigned
+ * A map from unassigned tablet to the last known tablet server. Read-only.
+ * @param assignments
+ * A map from tablet to assigned server. Write-only.
+ */
+ abstract public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
+ Map<KeyExtent,TServerInstance> assignments);
+
+ /**
+ * Ask the balancer if any migrations are necessary. This method will not be called when there are unassigned tablets.
+ *
+ * @param current
+ * The current table-summary state of all the online tablet servers. Read-only.
+ * @param migrations
+ * the current set of migrations. Read-only.
+ * @param migrationsOut
+ * new migrations to perform; should not contain tablets in the current set of migrations. Write-only.
+ * @return the time, in milliseconds, to wait before re-balancing.
+ */
+ public abstract long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut);
+
+ /**
+ * Fetch the tablets for the given table by asking the tablet server. Useful if your balance strategy needs details at the tablet level to decide what tablets
+ * to move.
+ * <p>
+ * Requires {@link #configuration} to have been set (see {@link #init}); it is dereferenced to obtain the client.
+ *
+ * @param tserver
+ * The tablet server to ask.
+ * @param tableId
+ * The table id
+ * @return a list of tablet statistics, or {@code null} if a transport error occurred while talking to the tablet server (the error is logged)
+ * @throws ThriftSecurityException
+ * tablet server disapproves of your internal System password.
+ * @throws TException
+ * any other problem
+ */
+ public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String tableId) throws ThriftSecurityException, TException {
+ log.debug("Scanning tablet server " + tserver + " for table " + tableId);
+ Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), tserver.getLocation(), configuration.getConfiguration());
+ try {
+ List<TabletStats> onlineTabletsForTable = client.getTabletStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), tableId);
+ return onlineTabletsForTable;
+ } catch (TTransportException e) {
+ // a transport failure is logged rather than propagated; the method then falls through to "return null"
+ log.error("Unable to connect to " + tserver + ": " + e);
+ } finally {
+ // the client is handed back to ThriftUtil on every path, success or failure
+ ThriftUtil.returnClient(client);
+ }
+ return null;
+ }
+
+ /**
+ * Utility to ensure that the migrations from balance() are consistent:
+ * <ul>
+ * <li>Tablet objects are not null
+ * <li>Source and destination tablet servers are not null and current
+ * </ul>
+ * Migrations that fail a check are logged at warn level and left out of the result; the input list is not modified.
+ *
- * @param current
- * @param migrations
+ * @param current
+ * the set of tablet servers considered current
+ * @param migrations
+ * the migrations proposed by the balancer
+ * @return A list of TabletMigration object that passed sanity checks.
+ */
+ public static List<TabletMigration> checkMigrationSanity(Set<TServerInstance> current, List<TabletMigration> migrations) {
+ List<TabletMigration> result = new ArrayList<TabletMigration>(migrations.size());
+ for (TabletMigration m : migrations) {
+ // the null checks come first so that the contains() lookups below never see a null server
+ if (m.tablet == null) {
+ log.warn("Balancer gave back a null tablet " + m);
+ continue;
+ }
+ if (m.newServer == null) {
+ log.warn("Balancer did not set the destination " + m);
+ continue;
+ }
+ if (m.oldServer == null) {
+ log.warn("Balancer did not set the source " + m);
+ continue;
+ }
+ if (!current.contains(m.oldServer)) {
+ log.warn("Balancer wants to move a tablet from a server that is not current: " + m);
+ continue;
+ }
+ if (!current.contains(m.newServer)) {
+ log.warn("Balancer wants to move a tablet to a server that is not current: " + m);
+ continue;
+ }
+ result.add(m);
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index f9f03bd,0000000..540ebc0
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@@ -1,87 -1,0 +1,81 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Interface for storing information about tablet assignments. There are three implementations:
+ *
+ * ZooTabletStateStore: information about the root tablet is stored in ZooKeeper. MetaDataStateStore: information about the other tablets is stored in the
+ * metadata table.
+ *
+ */
+public abstract class TabletStateStore implements Iterable<TabletLocationState> {
+
+ /**
+ * Identifying name for this tablet state store.
+ */
+ abstract public String name();
+
+ /**
+ * Scan the information about the tablets covered by this store
+ */
++ @Override
+ abstract public Iterator<TabletLocationState> iterator();
+
+ /**
+ * Store the assigned locations in the data store.
- *
- * @param assignments
- * @throws DistributedStoreException
+ */
+ abstract public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+
+ /**
+ * Tablet servers will update the data store with the location when they bring the tablet online
- *
- * @param assignments
- * @throws DistributedStoreException
+ */
+ abstract public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+
+ /**
+ * Mark the tablets as having no known or future location.
+ *
+ * @param tablets
+ * the tablets' current information
- * @throws DistributedStoreException
+ */
+ abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+
+ /**
+ * Convenience for unassigning a single tablet: picks a {@link ZooTabletStateStore} when the tablet is the root tablet and a {@link MetaDataStateStore}
+ * otherwise, then calls {@link #unassign(Collection)} on it with that one tablet.
+ *
+ * @param tls
+ * the tablet's current information
+ */
+ public static void unassign(TabletLocationState tls) throws DistributedStoreException {
+ TabletStateStore store;
+ if (tls.extent.isRootTablet()) {
+ store = new ZooTabletStateStore();
+ } else {
+ store = new MetaDataStateStore();
+ }
+ store.unassign(Collections.singletonList(tls));
+ }
+
+ /**
+ * Convenience for recording the location of a single tablet: picks a {@link ZooTabletStateStore} when the assignment is for the root tablet and a
+ * {@link MetaDataStateStore} otherwise, then calls {@link #setLocations(Collection)} on it with that one assignment.
+ *
+ * @param assignment
+ * the tablet and the location to record for it
+ */
+ public static void setLocation(Assignment assignment) throws DistributedStoreException {
+ TabletStateStore store;
+ if (assignment.tablet.isRootTablet()) {
+ store = new ZooTabletStateStore();
+ } else {
+ store = new MetaDataStateStore();
+ }
+ store.setLocations(Collections.singletonList(assignment));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
index 58a337f,0000000..45f6a60
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
@@@ -1,109 -1,0 +1,84 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.tableOps;
+
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.accumulo.fate.Repo;
+
+
+/**
+ * A {@link Repo} decorator that runs each step of the wrapped repo inside a trace span.
+ * <p>
+ * The trace information current at construction time is captured and reused: {@link #isReady}, {@link #call} and {@link #undo} each start a span named after
+ * the wrapped repo's description, delegate to the wrapped repo, and stop the span in a finally block. A non-null repo returned by the wrapped
+ * {@code call} is itself wrapped in a new TraceRepo.
+ */
+public class TraceRepo<T> implements Repo<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ // trace information captured when this wrapper was created
+ TInfo tinfo;
+ // the repo all calls are delegated to
+ Repo<T> repo;
+
+ public TraceRepo(Repo<T> repo) {
+ this.repo = repo;
+ tinfo = Tracer.traceInfo();
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.server.fate.Repo#isReady(long, java.lang.Object)
- */
+ @Override
+ public long isReady(long tid, T environment) throws Exception {
+ Span span = Trace.trace(tinfo, repo.getDescription());
+ try {
+ return repo.isReady(tid, environment);
+ } finally {
+ span.stop();
+ }
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.server.fate.Repo#call(long, java.lang.Object)
- */
+ @Override
+ public Repo<T> call(long tid, T environment) throws Exception {
+ Span span = Trace.trace(tinfo, repo.getDescription());
+ try {
+ Repo<T> result = repo.call(tid, environment);
+ // a null result is passed through unchanged; anything else is wrapped so that it is traced as well
+ if (result == null)
+ return result;
+ return new TraceRepo<T>(result);
+ } finally {
+ span.stop();
+ }
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.server.fate.Repo#undo(long, java.lang.Object)
- */
+ @Override
+ public void undo(long tid, T environment) throws Exception {
+ Span span = Trace.trace(tinfo, repo.getDescription());
+ try {
+ repo.undo(tid, environment);
+ } finally {
+ span.stop();
+ }
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.server.fate.Repo#getDescription()
- */
+ // plain delegation, no span
+ @Override
+ public String getDescription() {
+ return repo.getDescription();
+ }
+
- /*
- * (non-Javadoc)
- *
- * @see org.apache.accumulo.server.fate.Repo#getReturn()
- */
+ // plain delegation, no span
+ @Override
+ public String getReturn() {
+ return repo.getReturn();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
index 829d7bc,0000000..7e50754
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
+++ b/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
@@@ -1,70 -1,0 +1,66 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Output format for Accumulo write ahead logs. Each key/value pair is written to the task's default work file by serializing the key and then the value.
+ */
+public class LogFileOutputFormat extends FileOutputFormat<LogFileKey,LogFileValue> {
+
+ /**
+ * Record writer that appends each key followed by its value to a single output stream.
+ */
+ private static class LogFileRecordWriter extends RecordWriter<LogFileKey,LogFileValue> {
+
+   private final FSDataOutputStream out;
+
- /**
- * @param outputPath
- * @throws IOException
- */
+   /**
+    * Creates the output file.
+    *
+    * @param outputPath
+    *          the file to create
+    * @param conf
+    *          the job configuration; used to resolve the file system that owns outputPath, so that the job's settings and a non-default output file
+    *          system are honoured
+    * @throws IOException
+    *           if the file system cannot be obtained or the file cannot be created
+    */
+   public LogFileRecordWriter(Path outputPath, Configuration conf) throws IOException {
+     FileSystem fs = outputPath.getFileSystem(conf);
+
+     out = fs.create(outputPath);
+   }
+
+   @Override
+   public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+     out.close();
+   }
+
+   @Override
+   public void write(LogFileKey key, LogFileValue val) throws IOException, InterruptedException {
+     key.write(out);
+     val.write(out);
+   }
+
+ }
+
+ /**
+ * Returns a writer for the task's default work file (no extension), created with the task's own configuration.
+ */
+ @Override
+ public RecordWriter<LogFileKey,LogFileValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+   Path outputPath = getDefaultWorkFile(context, "");
+   return new LogFileRecordWriter(outputPath, context.getConfiguration());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
index 88f5cbe,0000000..0478d83
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
+++ b/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
@@@ -1,109 -1,0 +1,106 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
- import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Looks up and prints mutations indexed by IndexMeta
+ */
+public class PrintEvents {
+
+  /**
+   * Command line options: the table id and start time are required, the end row is optional.
+   */
+  static class Opts extends ClientOpts {
+    @Parameter(names={"-t", "--tableId"}, description="table id", required=true)
+    String tableId;
+    @Parameter(names={"-e", "--endRow"}, description="end row")
+    String endRow;
+    // "-t" already belongs to --tableId; declaring the same option name twice makes JCommander
+    // reject the option set, so the time is only available under its long name.
+    @Parameter(names={"--time"}, description="time, in milliseconds", required=true)
+    long time;
+  }
+
- /**
- * @param args
- */
+  /**
+   * Parses the command line, connects to the instance and prints the matching events.
+   *
+   * @param args
+   *          command line arguments, see {@link Opts}
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(PrintEvents.class.getName(), args);
+
+    Connector conn = opts.getConnector();
+
+    printEvents(conn, opts.tableId, opts.endRow, opts.time);
+  }
+
+  /**
+   * Scans the "tabletEvents" table for one tablet and prints every log name and mutation found, starting at the given time. Printing stops at the first
+   * mutation that updates the prev-row column, unless it is the very first mutation seen.
+   *
+   * @param conn
+   *          connector used to create the scanner
+   * @param tableId
+   *          id of the table the tablet belongs to
+   * @param endRow
+   *          end row of the tablet, or null to select the row named tableId + "&lt;"
+   * @param time
+   *          start time; it is zero padded to 20 digits to form the first column family scanned
+   */
+  private static void printEvents(Connector conn, String tableId, String endRow, long time) throws Exception {
+    Scanner scanner = conn.createScanner("tabletEvents", new Authorizations());
+    String metaRow = tableId + (endRow == null ? "<" : ";" + endRow);
+    // From (row, padded time) inclusive up to, but not including, the first key after this row.
+    scanner.setRange(new Range(new Key(metaRow, String.format("%020d", time)), true, new Key(metaRow).followingKey(PartialKey.ROW), false));
+    int count = 0;
+
+    String lastLog = null;
+
+    loop1: for (Entry<Key,Value> entry : scanner) {
+      String qualifier = entry.getKey().getColumnQualifier().toString();
+      if (qualifier.equals("log")) {
+        String logName = entry.getValue().toString();
+        // Only announce a log when it differs from the previous one.
+        if (lastLog == null || !lastLog.equals(logName))
+          System.out.println("Log : " + entry.getValue());
+        lastLog = logName;
+      } else if (qualifier.equals("mut")) {
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(entry.getValue().get()));
+        Mutation m = new Mutation();
+        m.readFields(dis);
+
+        // Wrap the mutation so that the log value formatter can be reused for printing.
+        LogFileValue lfv = new LogFileValue();
+        lfv.mutations = Collections.singletonList(m);
+
+        System.out.println(LogFileValue.format(lfv, 1));
+
+        List<ColumnUpdate> columnsUpdates = m.getUpdates();
+        for (ColumnUpdate cu : columnsUpdates) {
+          // A prev-row update on the first mutation (count == 0) does not stop the scan.
+          if (Constants.METADATA_PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && count > 0) {
+            System.out.println("Saw change to prevrow, stopping printing events.");
+            break loop1;
+          }
+        }
+        count++;
+      }
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
index 5a8ddec,0000000..d76c7a3
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
+++ b/server/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
@@@ -1,277 -1,0 +1,273 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.time.DateUtils;
+
+public abstract class AbstractMetricsImpl {
+
+  /**
+   * Statistics kept for one named metric: number of samples, a weighted running average, minimum and maximum. Negative samples are ignored by the
+   * average, minimum and maximum.
+   */
+  public class Metric {
+
+    private long count = 0;
+    private long avg = 0;
+    private long min = 0;
+    private long max = 0;
+    // Set once addMin has accepted a sample; until then min only holds its placeholder value 0.
+    private boolean minRecorded = false;
+
+    public long getCount() {
+      return count;
+    }
+
+    public long getAvg() {
+      return avg;
+    }
+
+    public long getMin() {
+      return min;
+    }
+
+    public long getMax() {
+      return max;
+    }
+
+    public void incCount() {
+      count++;
+    }
+
+    /**
+     * Folds a sample into the running average, weighting the previous average 80% and the new sample 20%.
+     */
+    public void addAvg(long a) {
+      if (a < 0)
+        return;
+      avg = (long) ((avg * .8) + (a * .2));
+    }
+
+    /**
+     * Records a sample for the minimum. The first accepted sample becomes the minimum as is.
+     */
+    public void addMin(long a) {
+      if (a < 0)
+        return;
+      // Comparing against the initial 0 would pin the minimum at 0 forever, because only
+      // non-negative samples are accepted. The first sample therefore replaces the placeholder.
+      min = minRecorded ? Math.min(min, a) : a;
+      minRecorded = true;
+    }
+
+    /**
+     * Records a sample for the maximum.
+     */
+    public void addMax(long a) {
+      if (a < 0)
+        return;
+      max = Math.max(max, a);
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString();
+    }
+
+  }
+
+  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class);
+
+  // Shared by all instances: metrics are looked up by name only.
+  private static ConcurrentHashMap<String,Metric> registry = new ConcurrentHashMap<String,Metric>();
+
+  private boolean currentlyLogging = false;
+
+  private File logDir = null;
+
+  private String metricsPrefix = null;
+
+  // Day of the current log file; a new file is started when a write happens on another day.
+  private Date today = new Date();
+
+  private File logFile = null;
+
+  private Writer logWriter = null;
+
+  private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+
+  // NOTE(review): "hh" is the 12-hour clock and the pattern has no am/pm marker, so morning and
+  // afternoon timestamps look alike; "HH" may have been intended. Left unchanged to keep the log format.
+  private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddhhmmssz");
+
+  private MetricsConfiguration config = null;
+
+  public AbstractMetricsImpl() {
+    this.metricsPrefix = getMetricsPrefix();
+    config = new MetricsConfiguration(metricsPrefix);
+  }
+
+  /**
+   * Registers a StandardMBean with the MBean Server
+   * under the name returned by {@link #getObjectName()} and then sets up metrics logging.
+   *
+   * @param mbean
+   *          the bean to register
+   * @throws IllegalArgumentException
+   *           if {@link #getObjectName()} returns null
- *
- * @throws Exception
+   */
+  public void register(StandardMBean mbean) throws Exception {
+    // Register this object with the MBeanServer
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    if (null == getObjectName())
+      throw new IllegalArgumentException("MBean object name must be set.");
+    mbs.registerMBean(mbean, getObjectName());
+
+    setupLogging();
+  }
+
+  /**
+   * Registers this MBean with the MBean Server
+   * under the name returned by {@link #getObjectName()} and then sets up metrics logging.
+   *
+   * @throws IllegalArgumentException
+   *           if {@link #getObjectName()} returns null
- *
- * @throws Exception
+   */
+  public void register() throws Exception {
+    // Register this object with the MBeanServer
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    if (null == getObjectName())
+      throw new IllegalArgumentException("MBean object name must be set.");
+    mbs.registerMBean(this, getObjectName());
+
+    setupLogging();
+  }
+
+  /**
+   * Creates a fresh metric under the given name, replacing any metric already stored under it.
+   */
+  public void createMetric(String name) {
+    registry.put(name, new Metric());
+  }
+
+  /**
+   * Returns the metric stored under the given name, or null if there is none.
+   */
+  public Metric getMetric(String name) {
+    return registry.get(name);
+  }
+
+  public long getMetricCount(String name) {
+    return registry.get(name).getCount();
+  }
+
+  public long getMetricAvg(String name) {
+    return registry.get(name).getAvg();
+  }
+
+  public long getMetricMin(String name) {
+    return registry.get(name).getMin();
+  }
+
+  public long getMetricMax(String name) {
+    return registry.get(name).getMax();
+  }
+
+  /**
+   * Starts metrics logging when it is enabled in the configuration and not already active. Does nothing when no metrics configuration is available.
+   */
+  private void setupLogging() throws IOException {
+    if (null == config.getMetricsConfiguration())
+      return;
+    // Only initialize when we are not logging yet and logging is switched on for this prefix
+    if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+      // Check to see if directory exists, else make it
+      String mDir = config.getMetricsConfiguration().getString("logging.dir");
+      if (null != mDir) {
+        File dir = new File(mDir);
+        if (!dir.isDirectory())
+          if (!dir.mkdir())
+            log.warn("Could not create log directory: " + dir);
+        logDir = dir;
+        // Create new log file
+        startNewLog();
+      }
+      currentlyLogging = true;
+    }
+  }
+
+  /**
+   * Closes the current log file, if any, and opens the log file for the current day in append mode.
+   */
+  private void startNewLog() throws IOException {
+    if (null != logWriter) {
+      // Drop the reference before closing so that a failure below never leaves a closed
+      // writer behind for writeToLog to append to.
+      Writer previous = logWriter;
+      logWriter = null;
+      try {
+        previous.flush();
+      } finally {
+        previous.close();
+      }
+    }
+    logFile = new File(logDir, metricsPrefix + "-" + formatter.format(today) + ".log");
+    if (!logFile.exists()) {
+      if (!logFile.createNewFile()) {
+        log.error("Unable to create new log file");
+        currentlyLogging = false;
+        return;
+      }
+    }
+    logWriter = new OutputStreamWriter(new FileOutputStream(logFile, true), Constants.UTF8);
+  }
+
+  /**
+   * Appends one line with the current state of the named metric to the log. Does nothing when no log file is open.
+   */
+  private void writeToLog(String name) throws IOException {
+    if (null == logWriter)
+      return;
+    // Increment the date if we have to
+    Date now = new Date();
+    if (!DateUtils.isSameDay(today, now)) {
+      today = now;
+      startNewLog();
+    }
+    logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n");
+  }
+
+  /**
+   * Records one sample for the named metric and, when logging is enabled, writes the updated metric to the log. Does nothing when metrics are disabled.
+   *
+   * @param name
+   *          name of a metric previously created with {@link #createMetric(String)}
+   * @param time
+   *          the sample value
+   * @throws NullPointerException
+   *           if metrics are enabled and no metric exists under the given name
+   */
+  public void add(String name, long time) {
+    if (isEnabled()) {
+      // Look the metric up once so that all four updates hit the same instance.
+      Metric metric = registry.get(name);
+      metric.incCount();
+      metric.addAvg(time);
+      metric.addMin(time);
+      metric.addMax(time);
+      // If we are not currently logging and should be, then initialize
+      if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+        try {
+          setupLogging();
+        } catch (IOException ioe) {
+          log.error("Error setting up log", ioe);
+        }
+      } else if (currentlyLogging && !config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+        // if we are currently logging and shouldn't be, then close logs
+        try {
+          // The writer is null when logging is active but no log directory is configured.
+          if (null != logWriter) {
+            logWriter.flush();
+            logWriter.close();
+          }
+        } catch (Exception e) {
+          log.error("Error stopping metrics logging", e);
+        } finally {
+          logWriter = null;
+          logFile = null;
+        }
+        currentlyLogging = false;
+      }
+      if (currentlyLogging) {
+        try {
+          writeToLog(name);
+        } catch (IOException ioe) {
+          log.error("Error writing to metrics log", ioe);
+        }
+      }
+    }
+  }
+
+  public boolean isEnabled() {
+    return config.isEnabled();
+  }
+
+  protected abstract ObjectName getObjectName();
+
+  protected abstract String getMetricsPrefix();
+
+  /**
+   * Closes the log writer, if one is still open, when this object is garbage collected.
+   */
+  @Override
+  protected void finalize() {
+    if (null != logWriter) {
+      try {
+        logWriter.close();
+      } catch (Exception e) {
+        // do nothing
+      } finally {
+        logWriter = null;
+      }
+    }
+    logFile = null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
index d51f3f9,0000000..3bda9d6
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
+++ b/server/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
@@@ -1,146 -1,0 +1,104 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+
+/**
+ * This is a Permission Handler implementation that doesn't actually do any security. Use at your own risk.
+ */
+public class InsecurePermHandler implements PermissionHandler {
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#initialize(java.lang.String)
- */
+  @Override
+  public void initialize(String instanceId, boolean initialize) {
+    // no-op: this handler keeps no state
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#validSecurityHandlers(org.apache.accumulo.server.security.handler.Authenticator, org.apache.accumulo.server.security.handler.Authorizor)
- */
+  @Override
+  public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+    // every combination of handlers is accepted
+    return true;
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#initializeSecurity(java.lang.String)
- */
+  @Override
+  public void initializeSecurity(TCredentials token, String rootuser) throws AccumuloSecurityException {
+    // no-op
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
- */
+  @Override
+  public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    // every user holds every system permission
+    return true;
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasCachedSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
- */
+  @Override
+  public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    // every user holds every system permission
+    return true;
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
- */
+  @Override
+  public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    // every user holds every table permission
+    return true;
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#hasCachedTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
- */
+  @Override
+  public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    // every user holds every table permission
+    return true;
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#grantSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
- */
+  @Override
+  public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    // no-op: nothing is recorded
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#revokeSystemPermission(java.lang.String, org.apache.accumulo.core.security.SystemPermission)
- */
+  @Override
+  public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    // no-op: nothing is recorded
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#grantTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
- */
+  @Override
+  public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    // no-op: nothing is recorded
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#revokeTablePermission(java.lang.String, java.lang.String, org.apache.accumulo.core.security.TablePermission)
- */
+  @Override
+  public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    // no-op: nothing is recorded
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#cleanTablePermissions(java.lang.String)
- */
+  @Override
+  public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException {
+    // no-op
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#initUser(java.lang.String)
- */
+  @Override
+  public void initUser(String user) throws AccumuloSecurityException {
+    // no-op
+  }
+
- /* (non-Javadoc)
- * @see org.apache.accumulo.server.security.handler.PermissionHandler#dropUser(java.lang.String)
- */
+  @Override
+  public void cleanUser(String user) throws AccumuloSecurityException {
+    // no-op
+  }
+
+  @Override
+  public void initTable(String table) throws AccumuloSecurityException {
+    // no-op
+  }
+
+}