You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:58:07 UTC
[36/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
index efadfae,0000000..09fbbd2
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java
@@@ -1,279 -1,0 +1,278 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.Socket;
+import java.net.URLEncoder;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.net.SocketFactory;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.math.LongRange;
+import org.apache.log4j.Category;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.Filter;
+import org.apache.log4j.spi.LocationInfo;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+import org.apache.log4j.varia.LevelRangeFilter;
+import org.apache.log4j.xml.XMLLayout;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+public class SendLogToChainsaw extends XMLLayout {
+
+ private static Pattern logPattern = Pattern.compile(
+ "^(\\d\\d)\\s(\\d\\d):(\\d\\d):(\\d\\d),(\\d\\d\\d)\\s\\[(.*)\\]\\s(TRACE|DEBUG|INFO|WARN|FATAL|ERROR)\\s*?:(.*)$", Pattern.UNIX_LINES);
+
+ private File[] logFiles = null;
+
+ private SocketFactory factory = SocketFactory.getDefault();
+
+ private WildcardFileFilter fileFilter = null;
+
+ private Socket socket = null;
+
+ private Pattern lineFilter = null;
+
+ private LongRange dateFilter = null;
+
+ private LevelRangeFilter levelFilter = null;
+
+ public SendLogToChainsaw(String directory, String fileNameFilter, String host, int port, Date start, Date end, String regex, String level) throws Exception {
+
+ // Set up the file name filter
+ if (null != fileNameFilter) {
+ fileFilter = new WildcardFileFilter(fileNameFilter);
+ } else {
+ fileFilter = new WildcardFileFilter("*");
+ }
+
+ // Get the list of files that match
+ File dir = new File(directory);
+ if (dir.isDirectory()) {
+ logFiles = dir.listFiles((FilenameFilter) fileFilter);
+ } else {
+ throw new IllegalArgumentException(directory + " is not a directory or is not readable.");
+ }
+
+ if (logFiles.length == 0) {
+ throw new IllegalArgumentException("No files match the supplied filter.");
+ }
+
+ socket = factory.createSocket(host, port);
+
+ lineFilter = Pattern.compile(regex);
+
+ // Create Date Filter
+ if (null != start) {
+ if (end == null)
+ end = new Date(System.currentTimeMillis());
+ dateFilter = new LongRange(start.getTime(), end.getTime());
+ }
+
+ if (null != level) {
+ Level base = Level.toLevel(level.toUpperCase());
+ levelFilter = new LevelRangeFilter();
+ levelFilter.setAcceptOnMatch(true);
+ levelFilter.setLevelMin(base);
+ levelFilter.setLevelMax(Level.FATAL);
+ }
+ }
+
+ public void processLogFiles() throws IOException {
+ String line = null;
+ String out = null;
+ InputStreamReader isReader = null;
+ BufferedReader reader = null;
+ try {
+ for (File log : logFiles) {
+ // Parse the server type and name from the log file name
+ String threadName = log.getName().substring(0, log.getName().indexOf("."));
+ try {
+ isReader = new InputStreamReader(new FileInputStream(log), Constants.UTF8);
+ } catch (FileNotFoundException e) {
+ System.out.println("Unable to find file: " + log.getAbsolutePath());
+ throw e;
+ }
+ reader = new BufferedReader(isReader);
+
+ try {
+ line = reader.readLine();
+ while (null != line) {
+ out = convertLine(line, threadName);
+ if (null != out) {
+ if (socket != null && socket.isConnected())
+ socket.getOutputStream().write(out.getBytes(Constants.UTF8));
+ else
+ System.err.println("Unable to send data to transport");
+ }
+ line = reader.readLine();
+ }
+ } catch (IOException e) {
+ System.out.println("Error processing line: " + line + ". Output was " + out);
+ throw e;
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ if (isReader != null) {
+ isReader.close();
+ }
+ }
+ }
+ } finally {
+ if (socket != null && socket.isConnected()) {
+ socket.close();
+ }
+ }
+ }
+
+ private String convertLine(String line, String threadName) throws UnsupportedEncodingException {
+ String result = null;
+ Matcher m = logPattern.matcher(line);
+ if (m.matches()) {
+
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(new Date(System.currentTimeMillis()));
+ Integer date = Integer.parseInt(m.group(1));
+ Integer hour = Integer.parseInt(m.group(2));
+ Integer min = Integer.parseInt(m.group(3));
+ Integer sec = Integer.parseInt(m.group(4));
+ Integer ms = Integer.parseInt(m.group(5));
+ String clazz = m.group(6);
+ String level = m.group(7);
+ String message = m.group(8);
+ // Apply the regex filter if supplied
+ if (null != lineFilter) {
+ Matcher match = lineFilter.matcher(message);
+ if (!match.matches())
+ return null;
+ }
+ // URL encode the message
+ message = URLEncoder.encode(message, "UTF-8");
+ // Assume that we are processing logs from today.
+ // If the date in the line is greater than today, then it must be
+ // from the previous month.
+ cal.set(Calendar.DATE, date);
+ cal.set(Calendar.HOUR_OF_DAY, hour);
+ cal.set(Calendar.MINUTE, min);
+ cal.set(Calendar.SECOND, sec);
+ cal.set(Calendar.MILLISECOND, ms);
+ if (date > cal.get(Calendar.DAY_OF_MONTH)) {
+ cal.add(Calendar.MONTH, -1);
+ }
+ long ts = cal.getTimeInMillis();
+ // If this event is not between the start and end dates, then skip it.
+ if (null != dateFilter && !dateFilter.containsLong(ts))
+ return null;
+ Category c = Logger.getLogger(clazz);
+ Level l = Level.toLevel(level);
+ LoggingEvent event = new LoggingEvent(clazz, c, ts, l, message, threadName, (ThrowableInformation) null, (String) null, (LocationInfo) null,
+ (Map<?,?>) null);
+ // Check the log level filter
+ if (null != levelFilter && (levelFilter.decide(event) == Filter.DENY)) {
+ return null;
+ }
+ result = format(event);
+ }
+ return result;
+ }
+
+ private static class DateConverter implements IStringConverter<Date> {
+ @Override
+ public Date convert(String value) {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
+ try {
+ return formatter.parse(value);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private static class Opts extends Help {
+
+ @Parameter(names={"-d", "--logDirectory"}, description="ACCUMULO log directory path", required=true)
+ String dir;
+
+ @Parameter(names={"-f", "--fileFilter"}, description="filter to apply to names of logs")
+ String filter;
+
+ @Parameter(names={"-h", "--host"}, description="host where chainsaw is running", required=true)
+ String hostname;
+
+ @Parameter(names={"-p", "--port"}, description="port where XMLSocketReceiver is listening", required=true)
+ int portnum;
+
+ @Parameter(names={"-s", "--start"}, description="start date filter (yyyyMMddHHmmss)", required=true, converter=DateConverter.class)
+ Date startDate;
+
+ @Parameter(names={"-e", "--end"}, description="end date filter (yyyyMMddHHmmss)", required=true, converter=DateConverter.class)
+ Date endDate;
+
+ @Parameter(names={"-l", "--level"}, description="filter log level")
+ String level;
+
+ @Parameter(names={"-m", "--messageFilter"}, description="regex filter for log messages")
+ String regex;
+ }
+
+
+
+
+ /**
+ *
+ * @param args
+ * 0: path to log directory parameter
+ * 1: filter to apply for logs to include (uses wildcards (i.e. logger* and IS case sensitive) parameter
+ * 2: chainsaw host parameter
+ * 3: chainsaw port parameter
+ * 4: start date filter parameter
+ * 5: end date filter parameter
+ * 6: optional regex filter to match on each log4j message parameter
+ * 7: optional level filter
- * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(SendLogToChainsaw.class.getName(), args);
+
+ SendLogToChainsaw c = new SendLogToChainsaw(opts.dir, opts.filter, opts.hostname, opts.portnum, opts.startDate, opts.endDate, opts.regex, opts.level);
+ c.processLogFiles();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 50476a2,0000000..fa4de30
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@@ -1,313 -1,0 +1,314 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.thrift.metrics.ThriftMetrics;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class TServerUtils {
+ private static final Logger log = Logger.getLogger(TServerUtils.class);
+
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ public static class ServerPort {
+ public final TServer server;
+ public final int port;
+
+ public ServerPort(TServer server, int port) {
+ this.server = server;
+ this.port = port;
+ }
+ }
+
+ /**
+ * Start a server, at the given port, or higher, if that port is not available.
+ *
+ * @param portHintProperty
+ * the port to attempt to open, can be zero, meaning "any available port"
+ * @param processor
+ * the service to be started
+ * @param serverName
+ * the name of the class that is providing the service
+ * @param threadName
+ * name this service's thread for better debugging
- * @param portSearchProperty
- * @param minThreadProperty
- * @param timeBetweenThreadChecksProperty
+ * @return the server object created, and the port actually used
+ * @throws UnknownHostException
+ * when we don't know our own address
+ */
+ public static ServerPort startServer(AccumuloConfiguration conf, Property portHintProperty, TProcessor processor, String serverName, String threadName,
+ Property portSearchProperty,
+ Property minThreadProperty,
+ Property timeBetweenThreadChecksProperty,
+ Property maxMessageSizeProperty) throws UnknownHostException {
+ int portHint = conf.getPort(portHintProperty);
+ int minThreads = 2;
+ if (minThreadProperty != null)
+ minThreads = conf.getCount(minThreadProperty);
+ long timeBetweenThreadChecks = 1000;
+ if (timeBetweenThreadChecksProperty != null)
+ timeBetweenThreadChecks = conf.getTimeInMillis(timeBetweenThreadChecksProperty);
+ long maxMessageSize = 10 * 1000 * 1000;
+ if (maxMessageSizeProperty != null)
+ maxMessageSize = conf.getMemoryInBytes(maxMessageSizeProperty);
+ boolean portSearch = false;
+ if (portSearchProperty != null)
+ portSearch = conf.getBoolean(portSearchProperty);
+ Random random = new Random();
+ for (int j = 0; j < 100; j++) {
+
+ // Are we going to slide around, looking for an open port?
+ int portsToSearch = 1;
+ if (portSearch)
+ portsToSearch = 1000;
+
+ for (int i = 0; i < portsToSearch; i++) {
+ int port = portHint + i;
+ if (portHint != 0 && i > 0)
+ port = 1024 + random.nextInt(65535 - 1024);
+ if (port > 65535)
+ port = 1024 + port % (65535 - 1024);
+ try {
+ return TServerUtils.startTServer(port, processor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize);
+ } catch (Exception ex) {
+ log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+ UtilWaitThread.sleep(250);
+ }
+ }
+ }
+ throw new UnknownHostException("Unable to find a listen port");
+ }
+
+ public static class TimedProcessor implements TProcessor {
+
+ final TProcessor other;
+ ThriftMetrics metrics = null;
+ long idleStart = 0;
+
+ TimedProcessor(TProcessor next, String serverName, String threadName) {
+ this.other = next;
+ // Register the metrics MBean
+ try {
+ metrics = new ThriftMetrics(serverName, threadName);
+ metrics.register();
+ } catch (Exception e) {
+ log.error("Exception registering MBean with MBean Server", e);
+ }
+ idleStart = System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean process(TProtocol in, TProtocol out) throws TException {
+ long now = 0;
+ if (metrics.isEnabled()) {
+ now = System.currentTimeMillis();
+ metrics.add(ThriftMetrics.idle, (now - idleStart));
+ }
+ try {
+ try {
+ return other.process(in, out);
+ } catch (NullPointerException ex) {
+ // THRIFT-1447 - remove with thrift 0.9
+ return true;
+ }
+ } finally {
+ if (metrics.isEnabled()) {
+ idleStart = System.currentTimeMillis();
+ metrics.add(ThriftMetrics.execute, idleStart - now);
+ }
+ }
+ }
+ }
+
+ public static class ClientInfoProcessorFactory extends TProcessorFactory {
+
+ public ClientInfoProcessorFactory(TProcessor processor) {
+ super(processor);
+ }
+
++ @Override
+ public TProcessor getProcessor(TTransport trans) {
+ if (trans instanceof TBufferedSocket) {
+ TBufferedSocket tsock = (TBufferedSocket) trans;
+ clientAddress.set(tsock.getClientString());
+ }
+ return super.getProcessor(trans);
+ }
+ }
+
+ public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
+ public THsHaServer(Args args) {
+ super(args);
+ }
+
++ @Override
+ protected Runnable getRunnable(FrameBuffer frameBuffer) {
+ return new Invocation(frameBuffer);
+ }
+
+ private class Invocation implements Runnable {
+
+ private final FrameBuffer frameBuffer;
+
+ public Invocation(final FrameBuffer frameBuffer) {
+ this.frameBuffer = frameBuffer;
+ }
+
++ @Override
+ public void run() {
+ if (frameBuffer.trans_ instanceof TNonblockingSocket) {
+ TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
+ Socket sock = tsock.getSocketChannel().socket();
+ clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+ }
+ frameBuffer.invoke();
+ }
+ }
+ }
+
+ public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
+ long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+ TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
+ if (port == 0) {
+ port = transport.getPort();
+ }
+ THsHaServer.Args options = new THsHaServer.Args(transport);
+ options.protocolFactory(ThriftUtil.protocolFactory());
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ options.stopTimeoutVal(5);
+ /*
+ * Create our own very special thread pool.
+ */
+ final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+ // periodically adjust the number of threads we need by checking how busy our threads are
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+ pool.setMaximumPoolSize(larger);
+ pool.setCorePoolSize(larger);
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+ if (smaller != pool.getCorePoolSize()) {
+ // there is a race condition here... the active count could be higher by the time
+ // we decrease the core pool size... so the active count could end up higher than
+ // the core pool size, in which case everything will be queued... the increase case
+ // should handle this and prevent deadlock
+ log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+ pool.setCorePoolSize(smaller);
+ }
+ }
+ }
+ }
+ }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+ options.executorService(pool);
+ processor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
+ options.processorFactory(new TProcessorFactory(processor));
+ return new ServerPort(new THsHaServer(options), port);
+ }
+
+ public static ServerPort startThreadPoolServer(int port, TProcessor processor, String serverName, String threadName, int numThreads)
+ throws TTransportException {
+
+ // if port is zero, then we must bind to get the port number
+ ServerSocket sock;
+ try {
+ sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(port));
+ port = sock.getLocalPort();
+ } catch (IOException ex) {
+ throw new TTransportException(ex);
+ }
+ TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.protocolFactory(ThriftUtil.protocolFactory());
+ options.transportFactory(ThriftUtil.transportFactory());
+ processor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
+ options.processorFactory(new ClientInfoProcessorFactory(processor));
+ return new ServerPort(new TThreadPoolServer(options), port);
+ }
+
+ public static ServerPort startTServer(int port, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
+ throws TTransportException {
+ ServerPort result = startHsHaServer(port, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+ // ServerPort result = startThreadPoolServer(port, processor, serverName, threadName, -1);
+ final TServer finalServer = result.server;
+ Runnable serveTask = new Runnable() {
++ @Override
+ public void run() {
+ try {
+ finalServer.serve();
+ } catch (Error e) {
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+ }
+ }
+ };
+ serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+ Thread thread = new Daemon(serveTask, threadName);
+ thread.start();
+ return result;
+ }
+
+ // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
+ public static void stopTServer(TServer s) {
+ if (s == null)
+ return;
+ s.stop();
+ try {
+ Field f = s.getClass().getDeclaredField("executorService_");
+ f.setAccessible(true);
+ ExecutorService es = (ExecutorService) f.get(s);
+ es.shutdownNow();
+ } catch (Exception e) {
+ TServerUtils.log.error("Unable to call shutdownNow", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
index c3f4a72,0000000..92e0674
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
@@@ -1,48 -1,0 +1,45 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.beust.jcommander.Parameter;
+
+public class TableDiskUsage {
+
+ static class Opts extends ClientOpts {
+ @Parameter(description=" <table> { <table> ... } ")
+ List<String> tables = new ArrayList<String>();
+ }
+
- /**
- * @param args
- */
+ public static void main(String[] args) throws Exception {
+ FileSystem fs = FileSystem.get(new Configuration());
+ Opts opts = new Opts();
+ opts.parseArgs(TableDiskUsage.class.getName(), args);
+ Connector conn = opts.getConnector();
+ org.apache.accumulo.core.util.TableDiskUsage.printDiskUsage(DefaultConfiguration.getInstance(), opts.tables, fs, conn, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
index 2fc0bd3,0000000..34c2151
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java
@@@ -1,75 -1,0 +1,72 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+import com.beust.jcommander.Parameter;
+
+public class TabletServerLocks {
+
+ static class Opts extends Help {
+ @Parameter(names="-list")
+ boolean list = false;
+ @Parameter(names="-delete")
+ String delete = null;
+ }
- /**
- * @param args
- */
+ public static void main(String[] args) throws Exception {
+
+ Instance instance = HdfsZooInstance.getInstance();
+ String tserverPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+ Opts opts = new Opts();
+ opts.parseArgs(TabletServerLocks.class.getName(), args);
+
+ ZooCache cache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+
+ if (opts.list) {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+
+ List<String> tabletServers = zoo.getChildren(tserverPath);
+
+ for (String tabletServer : tabletServers) {
+ byte[] lockData = ZooLock.getLockData(cache, tserverPath + "/" + tabletServer, null);
+ String holder = null;
+ if (lockData != null) {
+ holder = new String(lockData, Constants.UTF8);
+ }
+
+ System.out.printf("%32s %16s%n", tabletServer, holder);
+ }
+ } else if (opts.delete != null) {
+ ZooLock.deleteLock(tserverPath + "/" + args[1]);
+ } else {
+ System.out.println("Usage : " + TabletServerLocks.class.getName() + " -list|-delete <tserver lock>");
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
index 5f7fd5e,0000000..1e2b4b5
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/AccumuloClassLoader.java
@@@ -1,255 -1,0 +1,252 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.start.classloader;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.log4j.Logger;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/**
+ *
+ */
+public class AccumuloClassLoader {
+
+ public static final String CLASSPATH_PROPERTY_NAME = "general.classpaths";
+
+ public static final String ACCUMULO_CLASSPATH_VALUE =
+ "$ACCUMULO_CONF_DIR,\n" +
+ "$ACCUMULO_HOME/lib/[^.].*.jar,\n" +
+ "$ZOOKEEPER_HOME/zookeeper[^.].*.jar,\n" +
+ "$HADOOP_CONF_DIR,\n" +
+ "$HADOOP_PREFIX/[^.].*.jar,\n" +
+ "$HADOOP_PREFIX/lib/[^.].*.jar,\n" +
+ "$HADOOP_PREFIX/share/hadoop/common/.*.jar,\n" +
+ "$HADOOP_PREFIX/share/hadoop/common/lib/.*.jar,\n" +
+ "$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar,\n" +
+ "$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar,\n" +
+ "/usr/lib/hadoop/[^.].*.jar,\n" +
+ "/usr/lib/hadoop/lib/[^.].*.jar,\n" +
+ "/usr/lib/hadoop-hdfs/[^.].*.jar,\n" +
+ "/usr/lib/hadoop-mapreduce/[^.].*.jar,\n" +
+ "/usr/lib/hadoop-yarn/[^.].*.jar,\n"
+ ;
+
+ private static String SITE_CONF;
+
+ private static URLClassLoader classloader;
+
+ private static Logger log = Logger.getLogger(AccumuloClassLoader.class);
+
+ static {
+ String configFile = System.getProperty("org.apache.accumulo.config.file", "accumulo-site.xml");
+ if (System.getenv("ACCUMULO_CONF_DIR") != null) {
+ // accumulo conf dir should be set
+ SITE_CONF = System.getenv("ACCUMULO_CONF_DIR") + "/" + configFile;
+ } else if (System.getenv("ACCUMULO_HOME") != null) {
+ // if no accumulo conf dir, try accumulo home default
+ SITE_CONF = System.getenv("ACCUMULO_HOME") + "/conf/" + configFile;
+ } else {
+ SITE_CONF = null;
+ }
+ }
+
+ /**
+ * Parses and XML Document for a property node for a <name> with the value propertyName if it finds one the function return that property's value for its
+ * <value> node. If not found the function will return null
+ *
+ * @param d
+ * XMLDocument to search through
+ * @param propertyName
+ */
+ private static String getAccumuloClassPathStrings(Document d, String propertyName) {
+ NodeList pnodes = d.getElementsByTagName("property");
+ for (int i = pnodes.getLength() - 1; i >= 0; i--) {
+ Element current_property = (Element) pnodes.item(i);
+ Node cname = current_property.getElementsByTagName("name").item(0);
+ if (cname != null && cname.getTextContent().compareTo(propertyName) == 0) {
+ Node cvalue = current_property.getElementsByTagName("value").item(0);
+ if (cvalue != null) {
+ return cvalue.getTextContent();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Looks for the site configuration file for Accumulo and if it has a property for propertyName return it otherwise returns defaultValue Should throw an
+ * exception if the default configuration can not be read;
+ *
+ * @param propertyName
+ * Name of the property to pull
+ * @param defaultValue
+ * Value to default to if not found.
+ * @return site or default class path String
+ */
+
+ public static String getAccumuloString(String propertyName, String defaultValue) {
+
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = dbf.newDocumentBuilder();
+ String site_classpath_string = null;
+ try {
+ Document site_conf = db.parse(SITE_CONF);
+ site_classpath_string = getAccumuloClassPathStrings(site_conf, propertyName);
+ } catch (Exception e) {
+ /* we don't care because this is optional and we can use defaults */
+ }
+ if (site_classpath_string != null)
+ return site_classpath_string;
+ return defaultValue;
+ } catch (Exception e) {
+ throw new IllegalStateException("ClassPath Strings Lookup failed", e);
+ }
+ }
+
+ /**
+ * Replace environment variables in the classpath string with their actual value
- *
- * @param classpath
- * @param env
+ */
+ public static String replaceEnvVars(String classpath, Map<String,String> env) {
+ Pattern envPat = Pattern.compile("\\$[A-Za-z][a-zA-Z0-9_]*");
+ Matcher envMatcher = envPat.matcher(classpath);
+ while (envMatcher.find(0)) {
+ // name comes after the '$'
+ String varName = envMatcher.group().substring(1);
+ String varValue = env.get(varName);
+ if (varValue == null) {
+ varValue = "";
+ }
+ classpath = (classpath.substring(0, envMatcher.start()) + varValue + classpath.substring(envMatcher.end()));
+ envMatcher.reset(classpath);
+ }
+ return classpath;
+ }
+
+ /**
+ * Populate the list of URLs with the items in the classpath string
+ *
+ * @param classpath
+ * @param urls
+ * @throws MalformedURLException
+ */
+ private static void addUrl(String classpath, ArrayList<URL> urls) throws MalformedURLException {
+ classpath = classpath.trim();
+ if (classpath.length() == 0)
+ return;
+
+ classpath = replaceEnvVars(classpath, System.getenv());
+
+ // Try to make a URI out of the classpath
+ URI uri = null;
+ try {
+ uri = new URI(classpath);
+ } catch (URISyntaxException e) {
+ // Not a valid URI
+ }
+
+ if (null == uri || !uri.isAbsolute() || (null != uri.getScheme() && uri.getScheme().equals("file://"))) {
+ // Then treat this URI as a File.
+ // This checks to see if the url string is a dir if it expand and get all jars in that directory
+ final File extDir = new File(classpath);
+ if (extDir.isDirectory())
+ urls.add(extDir.toURI().toURL());
+ else {
+ if (extDir.getParentFile() != null) {
+ File[] extJars = extDir.getParentFile().listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.matches("^" + extDir.getName());
+ }
+ });
+ if (extJars != null && extJars.length > 0) {
+ for (File jar : extJars)
+ urls.add(jar.toURI().toURL());
+ } else {
+ log.debug("ignoring classpath entry " + classpath);
+ }
+ } else {
+ log.debug("ignoring classpath entry " + classpath);
+ }
+ }
+ } else {
+ urls.add(uri.toURL());
+ }
+
+ }
+
+ private static ArrayList<URL> findAccumuloURLs() throws IOException {
+ String cp = getAccumuloString(AccumuloClassLoader.CLASSPATH_PROPERTY_NAME, AccumuloClassLoader.ACCUMULO_CLASSPATH_VALUE);
+ if (cp == null)
+ return new ArrayList<URL>();
+ String[] cps = replaceEnvVars(cp, System.getenv()).split(",");
+ ArrayList<URL> urls = new ArrayList<URL>();
+ for (String classpath : cps) {
+ if (!classpath.startsWith("#")) {
+ addUrl(classpath, urls);
+ }
+ }
+ return urls;
+ }
+
+ public static synchronized ClassLoader getClassLoader() throws IOException {
+ if (classloader == null) {
+ ArrayList<URL> urls = findAccumuloURLs();
+
+ ClassLoader parentClassLoader = AccumuloClassLoader.class.getClassLoader();
+
+ log.debug("Create 2nd tier ClassLoader using URLs: " + urls.toString());
+ URLClassLoader aClassLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), parentClassLoader) {
+ @Override
+ protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+
+ if (name.startsWith("org.apache.accumulo.start.classloader.vfs")) {
+ Class<?> c = findLoadedClass(name);
+ if (c == null) {
+ try {
+ // try finding this class here instead of parent
+ findClass(name);
+ } catch (ClassNotFoundException e) {}
+ }
+ }
+ return super.loadClass(name, resolve);
+ }
+ };
+ classloader = aClassLoader;
+ }
+
+ return classloader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
index 7742cbe,0000000..e1ff55e
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/ContextManager.java
@@@ -1,207 -1,0 +1,205 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.start.classloader.vfs;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.commons.vfs2.FileSystemManager;
+
+public class ContextManager {
+
+ // there is a lock per context so that one context can initialize w/o blocking another context
+ private class Context {
+ AccumuloReloadingVFSClassLoader loader;
+ ContextConfig cconfig;
+ boolean closed = false;
+
+ Context(ContextConfig cconfig) {
+ this.cconfig = cconfig;
+ }
+
+ synchronized ClassLoader getClassLoader() throws FileSystemException {
+ if (closed)
+ return null;
+
+ if (loader == null) {
+ loader = new AccumuloReloadingVFSClassLoader(cconfig.uris, vfs, parent, cconfig.preDelegation);
+ }
+
+ return loader.getClassLoader();
+ }
+
+ synchronized void close() {
+ closed = true;
+ loader.close();
+ loader = null;
+ }
+ }
+
+ private Map<String,Context> contexts = new HashMap<String,Context>();
+
+ private volatile ContextsConfig config;
+ private FileSystemManager vfs;
+ private ReloadingClassLoader parent;
+
+ ContextManager(FileSystemManager vfs, ReloadingClassLoader parent) {
+ this.vfs = vfs;
+ this.parent = parent;
+ }
+
+ public static class ContextConfig {
+ String uris;
+ boolean preDelegation;
+
+ public ContextConfig(String uris, boolean preDelegation) {
+ this.uris = uris;
+ this.preDelegation = preDelegation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ContextConfig) {
+ ContextConfig oc = (ContextConfig) o;
+
+ return uris.equals(oc.uris) && preDelegation == oc.preDelegation;
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return uris.hashCode() + (preDelegation ? Boolean.TRUE : Boolean.FALSE).hashCode();
+ }
+ }
+
+ public interface ContextsConfig {
+ ContextConfig getContextConfig(String context);
+ }
+
+ public static class DefaultContextsConfig implements ContextsConfig {
+
+ private Iterable<Entry<String,String>> config;
+
+ public DefaultContextsConfig(Iterable<Entry<String,String>> config) {
+ this.config = config;
+ }
+
+ @Override
+ public ContextConfig getContextConfig(String context) {
+
+ String key = AccumuloVFSClassLoader.VFS_CONTEXT_CLASSPATH_PROPERTY + context;
+
+ String uris = null;
+ boolean preDelegate = true;
+
+ Iterator<Entry<String,String>> iter = config.iterator();
+ while (iter.hasNext()) {
+ Entry<String,String> entry = iter.next();
+ if (entry.getKey().equals(key)) {
+ uris = entry.getValue();
+ }
+
+ if (entry.getKey().equals(key + ".delegation") && entry.getValue().trim().equalsIgnoreCase("post")) {
+ preDelegate = false;
+ }
+ }
+
+ if (uris != null)
+ return new ContextConfig(uris, preDelegate);
+
+ return null;
+ }
+ }
+
+ /**
+ * configuration must be injected for ContextManager to work
- *
- * @param config
+ */
+ public synchronized void setContextConfig(ContextsConfig config) {
+ if (this.config != null)
+ throw new IllegalStateException("Context manager config already set");
+ this.config = config;
+ }
+
+ public ClassLoader getClassLoader(String contextName) throws FileSystemException {
+
+ ContextConfig cconfig = config.getContextConfig(contextName);
+
+ if (cconfig == null)
+ throw new IllegalArgumentException("Unknown context " + contextName);
+
+ Context context = null;
+ Context contextToClose = null;
+
+ synchronized (this) {
+ // only manipulate internal data structs in this sync block... avoid creating or closing classloader, reading config, etc... basically avoid operations
+ // that may block
+ context = contexts.get(contextName);
+
+ if (context == null) {
+ context = new Context(cconfig);
+ contexts.put(contextName, context);
+ } else if (!context.cconfig.equals(cconfig)) {
+ contextToClose = context;
+ context = new Context(cconfig);
+ contexts.put(contextName, context);
+ }
+ }
+
+ if (contextToClose != null)
+ contextToClose.close();
+
+ ClassLoader loader = context.getClassLoader();
+ if (loader == null) {
+ // ooppss, context was closed by another thread, try again
+ return getClassLoader(contextName);
+ }
+
+ return loader;
+
+ }
+
+ public <U> Class<? extends U> loadClass(String context, String classname, Class<U> extension) throws ClassNotFoundException {
+ try {
+ return getClassLoader(context).loadClass(classname).asSubclass(extension);
+ } catch (IOException e) {
+ throw new ClassNotFoundException("IO Error loading class " + classname, e);
+ }
+ }
+
+ public void removeUnusedContexts(Set<String> inUse) {
+
+ Map<String,Context> unused;
+
+ synchronized (this) {
+ unused = new HashMap<String,Context>(contexts);
+ unused.keySet().removeAll(inUse);
+ contexts.keySet().removeAll(unused.keySet());
+ }
+
+ for (Context context : unused.values()) {
+ // close outside of lock
+ context.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
index 0a6931f,0000000..277c741
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/PostDelegatingVFSClassLoader.java
@@@ -1,52 -1,0 +1,47 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.start.classloader.vfs;
+
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.commons.vfs2.FileSystemManager;
+import org.apache.commons.vfs2.impl.VFSClassLoader;
+
+/**
+ *
+ */
+public class PostDelegatingVFSClassLoader extends VFSClassLoader {
+
- /**
- * @param files
- * @param manager
- * @param parent
- * @throws FileSystemException
- */
+ public PostDelegatingVFSClassLoader(FileObject[] files, FileSystemManager manager, ClassLoader parent) throws FileSystemException {
+ super(files, manager, parent);
+ }
+
++ @Override
+ protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ Class<?> c = findLoadedClass(name);
+ if (c == null) {
+ try {
+ // try finding this class here instead of parent
+ findClass(name);
+ } catch (ClassNotFoundException e) {
+
+ }
+ }
+ return super.loadClass(name, resolve);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
----------------------------------------------------------------------
diff --cc start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
index 92b2720,0000000..104ea09
mode 100644,000000..100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/providers/HdfsFileSystem.java
@@@ -1,164 -1,0 +1,159 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.start.classloader.vfs.providers;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.vfs2.CacheStrategy;
+import org.apache.commons.vfs2.Capability;
+import org.apache.commons.vfs2.FileName;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.commons.vfs2.FileSystemOptions;
+import org.apache.commons.vfs2.provider.AbstractFileName;
+import org.apache.commons.vfs2.provider.AbstractFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A VFS FileSystem that interacts with HDFS.
+ *
+ * @since 2.1
+ */
+public class HdfsFileSystem extends AbstractFileSystem
+{
+ private static final Log log = LogFactory.getLog(HdfsFileSystem.class);
+
+ private FileSystem fs;
+
- /**
- *
- * @param rootName
- * @param fileSystemOptions
- */
+ protected HdfsFileSystem(final FileName rootName, final FileSystemOptions fileSystemOptions)
+ {
+ super(rootName, null, fileSystemOptions);
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#addCapabilities(java.util.Collection)
+ */
+ @Override
+ protected void addCapabilities(final Collection<Capability> capabilities)
+ {
+ capabilities.addAll(HdfsFileProvider.CAPABILITIES);
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#close()
+ */
+ @Override
+ public void close()
+ {
+ try
+ {
+ if (null != fs)
+ {
+ fs.close();
+ }
+ }
+ catch (final IOException e)
+ {
+ throw new RuntimeException("Error closing HDFS client", e);
+ }
+ super.close();
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#createFile(org.apache.commons.vfs2.provider.AbstractFileName)
+ */
+ @Override
+ protected FileObject createFile(final AbstractFileName name) throws Exception
+ {
+ throw new FileSystemException("Operation not supported");
+ }
+
+ /**
+ * @see org.apache.commons.vfs2.provider.AbstractFileSystem#resolveFile(org.apache.commons.vfs2.FileName)
+ */
+ @Override
+ public FileObject resolveFile(final FileName name) throws FileSystemException
+ {
+
+ synchronized (this)
+ {
+ if (null == this.fs)
+ {
+ final String hdfsUri = name.getRootURI();
+ final Configuration conf = new Configuration(true);
+ conf.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
+ this.fs = null;
+ try
+ {
+ fs = org.apache.hadoop.fs.FileSystem.get(conf);
+ }
+ catch (final IOException e)
+ {
+ log.error("Error connecting to filesystem " + hdfsUri, e);
+ throw new FileSystemException("Error connecting to filesystem " + hdfsUri, e);
+ }
+ }
+ }
+
+ boolean useCache = (null != getContext().getFileSystemManager().getFilesCache());
+ FileObject file;
+ if (useCache)
+ {
+ file = this.getFileFromCache(name);
+ }
+ else
+ {
+ file = null;
+ }
+ if (null == file)
+ {
+ String path = null;
+ try
+ {
+ path = URLDecoder.decode(name.getPath(), "UTF-8");
+ }
+ catch (final UnsupportedEncodingException e)
+ {
+ path = name.getPath();
+ }
+ final Path filePath = new Path(path);
+ file = new HdfsFileObject((AbstractFileName) name, this, fs, filePath);
+ if (useCache)
+ {
+ this.putFileToCache(file);
+ }
+
+ }
+
+ /**
+ * resync the file information if requested
+ */
+ if (getFileSystemManager().getCacheStrategy().equals(CacheStrategy.ON_RESOLVE)) {
+ file.refresh();
+ }
+
+ return file;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
index 65cf80c,0000000..7b1313a
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
@@@ -1,129 -1,0 +1,120 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
- import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.core.client.impl.MasterClient;
- import org.apache.accumulo.core.master.MasterNotRunningException;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.RecoveryStatus;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.monitor.Monitor;
+import org.apache.accumulo.server.security.SecurityConstants;
- import org.apache.thrift.transport.TTransportException;
+
+public class GetMasterStats {
- /**
- * @param args
- * @throws MasterNotRunningException
- * @throws IOException
- * @throws TTransportException
- */
+ public static void main(String[] args) throws Exception {
+ MasterClientService.Iface client = null;
+ MasterMonitorInfo stats = null;
+ try {
+ client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
+ stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
+ } finally {
+ if (client != null)
+ MasterClient.close(client);
+ }
+ out(0, "State: " + stats.state.name());
+ out(0, "Goal State: " + stats.goalState.name());
+ if (stats.serversShuttingDown != null && stats.serversShuttingDown.size() > 0) {
+ out(0, "Servers to shutdown");
+ for (String server : stats.serversShuttingDown) {
+ out(1, "%s", server);
+ }
+ }
+ out(0, "Unassigned tablets: %d", stats.unassignedTablets);
+ if (stats.badTServers != null && stats.badTServers.size() > 0) {
+ out(0, "Bad servers");
+
+ for (Entry<String,Byte> entry : stats.badTServers.entrySet()) {
+ out(1, "%s: %d", entry.getKey(), (int) entry.getValue());
+ }
+ }
+ if (stats.tableMap != null && stats.tableMap.size() > 0) {
+ out(0, "Tables");
+ for (Entry<String,TableInfo> entry : stats.tableMap.entrySet()) {
+ TableInfo v = entry.getValue();
+ out(1, "%s", entry.getKey());
+ out(2, "Records: %d", v.recs);
+ out(2, "Records in Memory: %d", v.recsInMemory);
+ out(2, "Tablets: %d", v.tablets);
+ out(2, "Online Tablets: %d", v.onlineTablets);
+ out(2, "Ingest Rate: %.2f", v.ingestRate);
+ out(2, "Query Rate: %.2f", v.queryRate);
+ }
+ }
+ if (stats.tServerInfo != null && stats.tServerInfo.size() > 0) {
+ out(0, "Tablet Servers");
+ long now = System.currentTimeMillis();
+ for (TabletServerStatus server : stats.tServerInfo) {
+ TableInfo summary = Monitor.summarizeTableStats(server);
+ out(1, "Name: %s", server.name);
+ out(2, "Ingest: %.2f", summary.ingestRate);
+ out(2, "Last Contact: %s", server.lastContact);
+ out(2, "OS Load Average: %.2f", server.osLoad);
+ out(2, "Queries: %.2f", summary.queryRate);
+ out(2, "Time Difference: %.1f", ((now - server.lastContact) / 1000.));
+ out(2, "Total Records: %d", summary.recs);
+ out(2, "Lookups: %d", server.lookups);
+ if (server.holdTime > 0)
+ out(2, "Hold Time: %d", server.holdTime);
+ if (server.tableMap != null && server.tableMap.size() > 0) {
+ out(2, "Tables");
+ for (Entry<String,TableInfo> status : server.tableMap.entrySet()) {
+ TableInfo info = status.getValue();
+ out(3, "Table: %s", status.getKey());
+ out(4, "Tablets: %d", info.onlineTablets);
+ out(4, "Records: %d", info.recs);
+ out(4, "Records in Memory: %d", info.recsInMemory);
+ out(4, "Ingest: %.2f", info.ingestRate);
+ out(4, "Queries: %.2f", info.queryRate);
+ out(4, "Major Compacting: %d", info.majors == null ? 0 : info.majors.running);
+ out(4, "Queued for Major Compaction: %d", info.majors == null ? 0 : info.majors.queued);
+ out(4, "Minor Compacting: %d", info.minors == null ? 0 : info.minors.running);
+ out(4, "Queued for Minor Compaction: %d", info.minors == null ? 0 : info.minors.queued);
+ }
+ }
+ out(2, "Recoveries: %d", server.logSorts.size());
+ for (RecoveryStatus sort : server.logSorts) {
+ out(3, "File: %s", sort.name);
+ out(3, "Progress: %.2f%%", sort.progress * 100);
+ out(3, "Time running: %s", sort.runtime / 1000.);
+ }
+ }
+ }
+ }
+
+ private static void out(int indent, String string, Object... args) {
+ for (int i = 0; i < indent; i++) {
+ System.out.print(" ");
+ }
+ System.out.println(String.format(string, args));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
index 73b73f4,0000000..16e7a98
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
@@@ -1,198 -1,0 +1,195 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.tabletserver.NativeMap;
+import org.apache.hadoop.io.Text;
+
+public class NativeMapPerformanceTest {
+
+ private static final byte ROW_PREFIX[] = new byte[] {'r'};
+ private static final byte COL_PREFIX[] = new byte[] {'c'};
+
+ static Key nk(int r, int c) {
+ return new Key(new Text(FastFormat.toZeroPaddedString(r, 9, 10, ROW_PREFIX)), new Text(FastFormat.toZeroPaddedString(c, 6, 10, COL_PREFIX)));
+ }
+
+ static Mutation nm(int r) {
+ return new Mutation(new Text(FastFormat.toZeroPaddedString(r, 9, 10, ROW_PREFIX)));
+ }
+
+ static Text ET = new Text();
+
+ private static void pc(Mutation m, int c, Value v) {
+ m.put(new Text(FastFormat.toZeroPaddedString(c, 6, 10, COL_PREFIX)), ET, Long.MAX_VALUE, v);
+ }
+
+ static void runPerformanceTest(int numRows, int numCols, int numLookups, String mapType) {
+
+ SortedMap<Key,Value> tm = null;
+ NativeMap nm = null;
+
+ if (mapType.equals("SKIP_LIST"))
+ tm = new ConcurrentSkipListMap<Key,Value>();
+ else if (mapType.equals("TREE_MAP"))
+ tm = Collections.synchronizedSortedMap(new TreeMap<Key,Value>());
+ else if (mapType.equals("NATIVE_MAP"))
+ nm = new NativeMap();
+ else
+ throw new IllegalArgumentException(" map type must be SKIP_LIST, TREE_MAP, or NATIVE_MAP");
+
+ Random rand = new Random(19);
+
+ // puts
+ long tps = System.currentTimeMillis();
+
+ if (nm != null) {
+ for (int i = 0; i < numRows; i++) {
+ int row = rand.nextInt(1000000000);
+ Mutation m = nm(row);
+ for (int j = 0; j < numCols; j++) {
+ int col = rand.nextInt(1000000);
+ Value val = new Value("test".getBytes(Constants.UTF8));
+ pc(m, col, val);
+ }
+ nm.mutate(m, i);
+ }
+ } else {
+ for (int i = 0; i < numRows; i++) {
+ int row = rand.nextInt(1000000000);
+ for (int j = 0; j < numCols; j++) {
+ int col = rand.nextInt(1000000);
+ Key key = nk(row, col);
+ Value val = new Value("test".getBytes(Constants.UTF8));
+ tm.put(key, val);
+ }
+ }
+ }
+
+ long tpe = System.currentTimeMillis();
+
+ // Iteration
+ Iterator<Entry<Key,Value>> iter;
+ if (nm != null) {
+ iter = nm.iterator();
+ } else {
+ iter = tm.entrySet().iterator();
+ }
+
+ long tis = System.currentTimeMillis();
+
+ while (iter.hasNext()) {
+ iter.next();
+ }
+
+ long tie = System.currentTimeMillis();
+
+ rand = new Random(19);
+ int rowsToLookup[] = new int[numLookups];
+ int colsToLookup[] = new int[numLookups];
+ for (int i = 0; i < Math.min(numLookups, numRows); i++) {
+ int row = rand.nextInt(1000000000);
+ int col = -1;
+ for (int j = 0; j < numCols; j++) {
+ col = rand.nextInt(1000000);
+ }
+
+ rowsToLookup[i] = row;
+ colsToLookup[i] = col;
+ }
+
+ // get
+
+ long tgs = System.currentTimeMillis();
+ if (nm != null) {
+ for (int i = 0; i < numLookups; i++) {
+ Key key = nk(rowsToLookup[i], colsToLookup[i]);
+ if (nm.get(key) == null) {
+ throw new RuntimeException("Did not find " + rowsToLookup[i] + " " + colsToLookup[i] + " " + i);
+ }
+ }
+ } else {
+ for (int i = 0; i < numLookups; i++) {
+ Key key = nk(rowsToLookup[i], colsToLookup[i]);
+ if (tm.get(key) == null) {
+ throw new RuntimeException("Did not find " + rowsToLookup[i] + " " + colsToLookup[i] + " " + i);
+ }
+ }
+ }
+ long tge = System.currentTimeMillis();
+
+ long memUsed = 0;
+ if (nm != null) {
+ memUsed = nm.getMemoryUsed();
+ }
+
+ int size = (nm == null ? tm.size() : nm.size());
+
+ // delete
+ long tds = System.currentTimeMillis();
+
+ if (nm != null)
+ nm.delete();
+
+ long tde = System.currentTimeMillis();
+
+ if (tm != null)
+ tm.clear();
+
+ System.gc();
+ System.gc();
+ System.gc();
+ System.gc();
+
+ UtilWaitThread.sleep(3000);
+
+ System.out.printf("mapType:%10s put rate:%,6.2f scan rate:%,6.2f get rate:%,6.2f delete time : %6.2f mem : %,d%n", "" + mapType, (numRows * numCols)
+ / ((tpe - tps) / 1000.0), (size) / ((tie - tis) / 1000.0), numLookups / ((tge - tgs) / 1000.0), (tde - tds) / 1000.0, memUsed);
+
+ }
+
- /**
- * @param args
- */
+ public static void main(String[] args) {
+
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Usage : " + NativeMapPerformanceTest.class.getName() + " <map type> <rows> <columns>");
+ }
+
+ String mapType = args[0];
+ int rows = Integer.parseInt(args[1]);
+ int cols = Integer.parseInt(args[2]);
+
+ runPerformanceTest(rows, cols, 10000, mapType);
+ runPerformanceTest(rows, cols, 10000, mapType);
+ runPerformanceTest(rows, cols, 10000, mapType);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
index 8411c86,0000000..5115541
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java
@@@ -1,277 -1,0 +1,274 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.server.tabletserver.NativeMap;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class NativeMapStressTest {
+
+ private static final Logger log = Logger.getLogger(NativeMapStressTest.class);
+
- /**
- * @param args
- */
+ public static void main(String[] args) {
+ testLotsOfMapDeletes(true);
+ testLotsOfMapDeletes(false);
+ testLotsOfOverwrites();
+ testLotsOfGetsAndScans();
+ }
+
+ private static void put(NativeMap nm, String row, String val, int mc) {
+ Mutation m = new Mutation(new Text(row));
+ m.put(new Text(), new Text(), Long.MAX_VALUE, new Value(val.getBytes(Constants.UTF8)));
+ nm.mutate(m, mc);
+ }
+
+ private static void testLotsOfGetsAndScans() {
+
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+
+ final int numThreads = 8;
+ final int totalGets = 100000000;
+ final int mapSizePerThread = (int) (4000000 / (double) numThreads);
+ final int getsPerThread = (int) (totalGets / (double) numThreads);
+
+ for (int tCount = 0; tCount < numThreads; tCount++) {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ NativeMap nm = new NativeMap();
+
+ Random r = new Random();
+
+ OpTimer opTimer = new OpTimer(log, Level.INFO);
+
+ opTimer.start("Creating map of size " + mapSizePerThread);
+
+ for (int i = 0; i < mapSizePerThread; i++) {
+ String row = String.format("r%08d", i);
+ String val = row + "v";
+ put(nm, row, val, i);
+ }
+
+ opTimer.stop("Created map of size " + nm.size() + " in %DURATION%");
+
+ opTimer.start("Doing " + getsPerThread + " gets()");
+
+ for (int i = 0; i < getsPerThread; i++) {
+ String row = String.format("r%08d", r.nextInt(mapSizePerThread));
+ String val = row + "v";
+
+ Value value = nm.get(new Key(new Text(row)));
+ if (value == null || !value.toString().equals(val)) {
+ log.error("nm.get(" + row + ") failed");
+ }
+ }
+
+ opTimer.stop("Finished " + getsPerThread + " gets in %DURATION%");
+
+ int scanned = 0;
+
+ opTimer.start("Doing " + getsPerThread + " random iterations");
+
+ for (int i = 0; i < getsPerThread; i++) {
+ int startRow = r.nextInt(mapSizePerThread);
+ String row = String.format("r%08d", startRow);
+
+ Iterator<Entry<Key,Value>> iter = nm.iterator(new Key(new Text(row)));
+
+ int count = 0;
+
+ while (iter.hasNext() && count < 10) {
+ String row2 = String.format("r%08d", startRow + count);
+ String val2 = row2 + "v";
+
+ Entry<Key,Value> entry = iter.next();
+ if (!entry.getValue().toString().equals(val2) || !entry.getKey().equals(new Key(new Text(row2)))) {
+ log.error("nm.iter(" + row2 + ") failed row = " + row + " count = " + count + " row2 = " + row + " val2 = " + val2);
+ }
+
+ count++;
+ }
+
+ scanned += count;
+ }
+
+ opTimer.stop("Finished " + getsPerThread + " random iterations (scanned = " + scanned + ") in %DURATION%");
+
+ nm.delete();
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.start();
+
+ threads.add(t);
+ }
+
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static void testLotsOfMapDeletes(final boolean doRemoves) {
+ final int numThreads = 8;
+ final int rowRange = 10000;
+ final int mapsPerThread = 50;
+ final int totalInserts = 100000000;
+ final int insertsPerMapPerThread = (int) (totalInserts / (double) numThreads / mapsPerThread);
+
+ System.out.println("insertsPerMapPerThread " + insertsPerMapPerThread);
+
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+
+ for (int i = 0; i < numThreads; i++) {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+
+ int inserts = 0;
+ int removes = 0;
+
+ for (int i = 0; i < mapsPerThread; i++) {
+
+ NativeMap nm = new NativeMap();
+
+ for (int j = 0; j < insertsPerMapPerThread; j++) {
+ String row = String.format("r%08d", j % rowRange);
+ String val = row + "v";
+ put(nm, row, val, j);
+ inserts++;
+ }
+
+ if (doRemoves) {
+ Iterator<Entry<Key,Value>> iter = nm.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ iter.remove();
+ removes++;
+ }
+ }
+
+ nm.delete();
+ }
+
+ System.out.println("inserts " + inserts + " removes " + removes + " " + Thread.currentThread().getName());
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.start();
+
+ threads.add(t);
+ }
+
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static void testLotsOfOverwrites() {
+ final Map<Integer,NativeMap> nativeMaps = new HashMap<Integer,NativeMap>();
+
+ int numThreads = 8;
+ final int insertsPerThread = (int) (100000000 / (double) numThreads);
+ final int rowRange = 10000;
+ final int numMaps = 50;
+
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+
+ for (int i = 0; i < numThreads; i++) {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ Random r = new Random();
+ int inserts = 0;
+
+ for (int i = 0; i < insertsPerThread / 100.0; i++) {
+ int map = r.nextInt(numMaps);
+
+ NativeMap nm;
+
+ synchronized (nativeMaps) {
+ nm = nativeMaps.get(map);
+ if (nm == null) {
+ nm = new NativeMap();
+ nativeMaps.put(map, nm);
+
+ }
+ }
+
+ synchronized (nm) {
+ for (int j = 0; j < 100; j++) {
+ String row = String.format("r%08d", r.nextInt(rowRange));
+ String val = row + "v";
+ put(nm, row, val, j);
+ inserts++;
+ }
+ }
+ }
+
+ System.out.println("inserts " + inserts + " " + Thread.currentThread().getName());
+ }
+ };
+
+ Thread t = new Thread(r);
+ t.start();
+
+ threads.add(t);
+ }
+
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ Set<Entry<Integer,NativeMap>> es = nativeMaps.entrySet();
+ for (Entry<Integer,NativeMap> entry : es) {
+ entry.getValue().delete();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
index a35ca66,0000000..0d52f12
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java
@@@ -1,181 -1,0 +1,180 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.continuous;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.util.reflection.CounterUtils;
+import org.apache.accumulo.test.continuous.ContinuousIngest.BaseOpts;
+import org.apache.accumulo.test.continuous.ContinuousIngest.ShortConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to
+ * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
+ *
+ */
+public class ContinuousMoru extends Configured implements Tool {
+ private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
+ private static final String MAX_CQ = PREFIX + "MAX_CQ";
+ private static final String MAX_CF = PREFIX + "MAX_CF";
+ private static final String MAX = PREFIX + "MAX";
+ private static final String MIN = PREFIX + "MIN";
+ private static final String CI_ID = PREFIX + "CI_ID";
+
+ static enum Counts {
+ SELF_READ;
+ }
+
+ public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
+
+ private short max_cf;
+ private short max_cq;
+ private Random random;
+ private String ingestInstanceId;
+ private byte[] iiId;
+ private long count;
+
+ private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
+
+ @Override
+ public void setup(Context context) throws IOException, InterruptedException {
+ int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
+ int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
+
+ if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE)
+ throw new IllegalArgumentException();
+
+ this.max_cf = (short) max_cf;
+ this.max_cq = (short) max_cq;
+
+ random = new Random();
+ ingestInstanceId = context.getConfiguration().get(CI_ID);
+ iiId = ingestInstanceId.getBytes(Constants.UTF8);
+
+ count = 0;
+ }
+
+ @Override
+ public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+
+ ContinuousWalk.validate(key, data);
+
+ if (WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) != 0) {
+ // only rewrite data not written by this M/R job
+ byte[] val = data.get();
+
+ int offset = ContinuousWalk.getPrevRowOffset(val);
+ if (offset > 0) {
+ long rowLong = Long.parseLong(new String(val, offset, 16, Constants.UTF8), 16);
+ Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData()
+ .toArray(), random, true);
+ context.write(null, m);
+ }
+
+ } else {
+ CounterUtils.increment(context.getCounter(Counts.SELF_READ));
+ }
+ }
+ }
+
+ static class Opts extends BaseOpts {
+ @Parameter(names = "--maxColF", description = "maximum column family value to use", converter=ShortConverter.class)
+ short maxColF = Short.MAX_VALUE;
+
+ @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter=ShortConverter.class)
+ short maxColQ = Short.MAX_VALUE;
+
+ @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
+ int maxMaps = 0;
+ }
+
+ @Override
+ public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
+ Opts opts = new Opts();
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ opts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts);
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ opts.setAccumuloConfigs(job);
+
+ // set up ranges
+ try {
+ Set<Range> ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+ AccumuloInputFormat.setRanges(job, ranges);
+ AccumuloInputFormat.setAutoAdjustRanges(job, false);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ job.setMapperClass(CMapper.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
+
+ Configuration conf = job.getConfiguration();
+ conf.setLong(MIN, opts.min);
+ conf.setLong(MAX, opts.max);
+ conf.setInt(MAX_CF, opts.maxColF);
+ conf.setInt(MAX_CQ, opts.maxColQ);
+ conf.set(CI_ID, UUID.randomUUID().toString());
+
+ job.waitForCompletion(true);
+ opts.stopTracing();
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ /**
+ *
+ * @param args
+ * instanceName zookeepers username password table columns outputpath
- * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args);
+ if (res != 0)
+ System.exit(res);
+ }
+}