Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/01/23 08:36:40 UTC
[22/23] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e7e5c009
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e7e5c009
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e7e5c009
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: e7e5c009737b3249cee667f75b48de3be4adb2fc
Parents: b353a24 c4cd3b1
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Jan 22 23:50:23 2014 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Wed Jan 22 23:50:23 2014 -0600
----------------------------------------------------------------------
.../apache/accumulo/core/util/AddressUtil.java | 2 +-
.../apache/accumulo/core/zookeeper/ZooUtil.java | 10 ++-
.../apache/accumulo/fate/util/AddressUtil.java | 60 +++++++++++++
.../accumulo/fate/zookeeper/ZooSession.java | 14 +--
.../accumulo/fate/util/AddressUtilTest.java | 95 ++++++++++++++++++++
.../org/apache/accumulo/server/Accumulo.java | 22 ++++-
.../accumulo/server/fs/VolumeManagerImpl.java | 3 +
.../accumulo/server/util/TabletOperations.java | 10 ++-
.../accumulo/master/tableOps/DeleteTable.java | 8 ++
.../org/apache/accumulo/tracer/TraceServer.java | 67 ++++++++++----
.../org/apache/accumulo/tserver/Compactor.java | 2 +
11 files changed, 264 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
index 46510ee,af9a1a6..bd9a5ca
--- a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
@@@ -16,25 -16,36 +16,25 @@@
*/
package org.apache.accumulo.core.util;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.io.Text;
-import org.apache.thrift.transport.TSocket;
+import com.google.common.net.HostAndPort;
- public class AddressUtil {
+ public class AddressUtil extends org.apache.accumulo.fate.util.AddressUtil {
- static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
- String[] parts = address.split(":", 2);
- if (address.contains("+"))
- parts = address.split("\\+", 2);
- if (parts.length == 2) {
- if (parts[1].isEmpty())
- return new InetSocketAddress(parts[0], defaultPort);
- return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
- }
- return new InetSocketAddress(address, defaultPort);
- }
-
- static public InetSocketAddress parseAddress(Text address, int defaultPort) {
- return parseAddress(address.toString(), defaultPort);
+
+ static public HostAndPort parseAddress(String address) throws NumberFormatException {
+ return parseAddress(address, false);
}
-
- static public TSocket createTSocket(String address, int defaultPort) {
- InetSocketAddress addr = parseAddress(address, defaultPort);
- return new TSocket(addr.getHostName(), addr.getPort());
+
+ static public HostAndPort parseAddress(String address, boolean ignoreMissingPort) throws NumberFormatException {
+ address = address.replace('+', ':');
+ HostAndPort hap = HostAndPort.fromString(address);
+ if (!ignoreMissingPort && !hap.hasPort())
+ throw new IllegalArgumentException("Address was expected to contain port. address=" + address);
+
+ return hap;
}
-
- static public String toString(InetSocketAddress addr) {
- return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+
+ public static HostAndPort parseAddress(String address, int defaultPort) {
+ return parseAddress(address, true).withDefaultPort(defaultPort);
}
-
}
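
For reference, a minimal sketch of how the reworked parser behaves after this merge. The demo class and host name below are illustrative, not part of the commit:

import com.google.common.net.HostAndPort;
import org.apache.accumulo.core.util.AddressUtil;

public class ParseAddressSketch {
  public static void main(String[] args) {
    // Legacy '+'-separated addresses are normalized to ':' before parsing.
    HostAndPort hap = AddressUtil.parseAddress("tserver1+9997");
    System.out.println(hap.getHostText() + ":" + hap.getPort()); // tserver1:9997

    // The two-argument form tolerates a missing port and fills in the default.
    System.out.println(AddressUtil.parseAddress("tserver1", 9997).getPort()); // 9997

    // The one-argument form now insists on an explicit port.
    try {
      AddressUtil.parseAddress("tserver1");
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}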
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
index 9206beb,86dc4d2..fa0bdf6
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
@@@ -16,23 -16,10 +16,24 @@@
*/
package org.apache.accumulo.core.zookeeper;
+import java.io.FileNotFoundException;
+import java.io.IOException;
++import java.net.UnknownHostException;
+
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
public class ZooUtil extends org.apache.accumulo.fate.zookeeper.ZooUtil {
+
+ private static final Logger log = Logger.getLogger(ZooUtil.class);
+
public static String getRoot(final Instance instance) {
return getRoot(instance.getInstanceID());
}
@@@ -40,35 -27,4 +41,42 @@@
public static String getRoot(final String instanceId) {
return Constants.ZROOT + "/" + instanceId;
}
+
+ /**
+ * Utility to support certain client side utilities to minimize command-line options.
+ */
+
+ public static String getInstanceIDFromHdfs(Path instanceDirectory) {
+ try {
+
+ @SuppressWarnings("deprecation")
+ FileSystem fs = FileUtil.getFileSystem(instanceDirectory.toString(), CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration());
+ FileStatus[] files = null;
+ try {
+ files = fs.listStatus(instanceDirectory);
+ } catch (FileNotFoundException ex) {
+ // ignored
+ }
+ log.debug("Trying to read instance id from " + instanceDirectory);
+ if (files == null || files.length == 0) {
+ log.error("unable obtain instance id at " + instanceDirectory);
+ throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory);
+ } else if (files.length != 1) {
+ log.error("multiple potential instances in " + instanceDirectory);
+ throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory);
+ } else {
+ String result = files[0].getPath().getName();
+ return result;
+ }
+ } catch (IOException e) {
- throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e);
++ log.error("Problem reading instance id out of hdfs at " + instanceDirectory, e);
++ throw new RuntimeException("Can't tell if Accumulo is initialized; can't read instance id at " + instanceDirectory, e);
++ } catch (IllegalArgumentException exception) {
++ /* HDFS throws this when there's a UnknownHostException due to DNS troubles. */
++ if (exception.getCause() instanceof UnknownHostException) {
++ log.error("Problem reading instance id out of hdfs at " + instanceDirectory, exception);
++ }
++ throw exception;
+ }
+ }
}
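
A short usage sketch of getInstanceIDFromHdfs as merged here; the path below assumes the conventional /accumulo base directory, an illustrative default rather than something taken from this commit:

import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.hadoop.fs.Path;

public class InstanceIdSketch {
  public static void main(String[] args) {
    // Lists the instance_id directory and returns its single child's name.
    // The merge distinguishes the failure modes: no children or multiple
    // children raise descriptive RuntimeExceptions, IOExceptions are wrapped
    // with a "can't tell if Accumulo is initialized" message, and DNS-driven
    // IllegalArgumentExceptions are logged before being rethrown.
    String id = ZooUtil.getInstanceIDFromHdfs(new Path("/accumulo/instance_id"));
    System.out.println("instance id: " + id);
  }
}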
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index 83f54b0,0000000..15e157d
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,264 -1,0 +1,282 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.trace.DistributedTrace;
++import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class Accumulo {
+
+ private static final Logger log = Logger.getLogger(Accumulo.class);
+
+ public static synchronized void updateAccumuloVersion(VolumeManager fs) {
+ try {
+ if (getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
+ fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.DATA_VERSION));
+ fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.PREV_DATA_VERSION));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
+ int dataVersion;
+ try {
+ FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation());
+ if (files == null || files.length == 0) {
+ dataVersion = -1; // assume it is 0.5 or earlier
+ } else {
+ dataVersion = Integer.parseInt(files[0].getPath().getName());
+ }
+ return dataVersion;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static void enableTracing(String address, String application) {
+ try {
+ DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
+ } catch (Exception ex) {
+ log.error("creating remote sink for trace spans", ex);
+ }
+ }
+
+ private static class LogMonitor extends FileWatchdog implements Watcher {
+ String path;
+
+ protected LogMonitor(String instance, String filename, int delay) {
+ super(filename);
+ setDelay(delay);
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+ }
+
+ private void setMonitorPort() {
+ try {
+ String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+ System.setProperty("org.apache.accumulo.core.host.log.port", port);
+ log.info("Changing monitor log4j port to "+port);
+ doOnChange();
+ } catch (Exception e) {
+ log.error("Error reading zookeeper data for monitor log4j port", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+ setMonitorPort();
+ log.info("Set watch for monitor log4j port");
+ } catch (Exception e) {
+ log.error("Unable to set watch for monitor log4j port " + path);
+ }
+ super.run();
+ }
+
+ @Override
+ protected void doOnChange() {
+ LogManager.resetConfiguration();
+ new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ setMonitorPort();
+ if (event.getPath() != null) {
+ try {
+ ZooReaderWriter.getInstance().exists(event.getPath(), this);
+ } catch (Exception ex) {
+ log.error("Unable to reset watch for monitor log4j port", ex);
+ }
+ }
+ }
+ }
+
+ public static void init(VolumeManager fs, ServerConfiguration config, String application) throws UnknownHostException {
+
+ System.setProperty("org.apache.accumulo.core.application", application);
+
+ if (System.getenv("ACCUMULO_LOG_DIR") != null)
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
+ else
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
+
+ String localhost = InetAddress.getLocalHost().getHostName();
+ System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
+
+ if (System.getenv("ACCUMULO_LOG_HOST") != null)
+ System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
+ else
+ System.setProperty("org.apache.accumulo.core.host.log", localhost);
+
+ int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+ System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+
+ // Use a specific log config, if it exists
+ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
+ if (!new File(logConfig).exists()) {
+ // otherwise, use the generic config
+ logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ }
+ // Turn off messages about not being able to reach the remote logger... we protect against that.
+ LogLog.setQuietMode(true);
+
+ // Configure logging
+ if (logPort==0)
+ new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+ else
+ DOMConfigurator.configureAndWatch(logConfig, 5000);
+
+ // Read the auditing config
+ String auditConfig = String.format("%s/auditLog.xml", System.getenv("ACCUMULO_CONF_DIR"));
+
+ DOMConfigurator.configureAndWatch(auditConfig, 5000);
+
+ log.info(application + " starting");
+ log.info("Instance " + config.getInstance().getInstanceID());
+ int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
+ log.info("Data Version " + dataVersion);
+ Accumulo.waitForZookeeperAndHdfs(fs);
+
+ Version codeVersion = new Version(Constants.VERSION);
+ if (dataVersion != ServerConstants.DATA_VERSION && dataVersion != ServerConstants.PREV_DATA_VERSION) {
+ throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
+ }
+
+ TreeMap<String,String> sortedProps = new TreeMap<String,String>();
+ for (Entry<String,String> entry : config.getConfiguration())
+ sortedProps.put(entry.getKey(), entry.getValue());
+
+ for (Entry<String,String> entry : sortedProps.entrySet()) {
+ String key = entry.getKey();
+ log.info(key + " = " + (Property.isSensitive(key) ? "<hidden>" : entry.getValue()));
+ }
+
+ monitorSwappiness();
+ }
+
+ /**
+ * Periodically checks /proc/sys/vm/swappiness and warns if it is set above
+ * 10, since high swappiness can delay time-sensitive lock maintenance.
+ */
+ public static void monitorSwappiness() {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String procFile = "/proc/sys/vm/swappiness";
+ File swappiness = new File(procFile);
+ if (swappiness.exists() && swappiness.canRead()) {
+ InputStream is = new FileInputStream(procFile);
+ try {
+ byte[] buffer = new byte[10];
+ int bytes = is.read(buffer);
+ String setting = new String(buffer, 0, bytes);
+ setting = setting.trim();
+ if (bytes > 0 && Integer.parseInt(setting) > 10) {
+ log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
+ + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
+ }
+ } finally {
+ is.close();
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ }
+ }, 1000, 10 * 60 * 1000);
+ }
+
+ public static void waitForZookeeperAndHdfs(VolumeManager fs) {
+ log.info("Attempting to talk to zookeeper");
+ while (true) {
+ try {
+ ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
+ break;
+ } catch (InterruptedException e) {
+ // ignored
+ } catch (KeeperException ex) {
+ log.info("Waiting for accumulo to be initialized");
+ UtilWaitThread.sleep(1000);
+ }
+ }
+ log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
+ long sleep = 1000;
++ int unknownHostTries = 3;
+ while (true) {
+ try {
+ if (fs.isReady())
+ break;
+ log.warn("Waiting for the NameNode to leave safemode");
+ } catch (IOException ex) {
- log.warn("Unable to connect to HDFS");
++ log.warn("Unable to connect to HDFS", ex);
++ } catch (IllegalArgumentException exception) {
++ /* Unwrap the UnknownHostException so we can deal with it directly */
++ if (exception.getCause() instanceof UnknownHostException) {
++ if (unknownHostTries > 0) {
++ log.warn("Unable to connect to HDFS, will retry. cause: " + exception.getCause());
++ /* We need to make sure our sleep period is long enough to avoid getting a cached failure of the host lookup. */
++ sleep = Math.max(sleep, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException)(exception.getCause()))+1)*1000);
++ } else {
++ log.error("Unable to connect to HDFS and have exceeded max number of retries.", exception);
++ throw exception;
++ }
++ unknownHostTries--;
++ } else {
++ throw exception;
++ }
+ }
- log.info("Sleeping " + sleep / 1000. + " seconds");
++ log.info("Backing off due to failure; current sleep period is " + sleep / 1000. + " seconds");
+ UtilWaitThread.sleep(sleep);
++ /* Back off to give transient failures more time to clear. */
+ sleep = Math.min(60 * 1000, sleep * 2);
+ }
+ log.info("Connected to HDFS");
+ }
+
+}
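
waitForZookeeperAndHdfs now sizes its backoff using AddressUtil.getAddressCacheNegativeTtl, added in fate/util/AddressUtil.java (listed in the diffstat but not shown in this mail). A plausible sketch of that helper, assuming it consults the standard JVM security property; the real implementation may differ:

import java.net.UnknownHostException;
import java.security.Security;

public class NegativeTtlSketch {
  /**
   * Seconds the JVM caches failed host lookups (networkaddress.cache.negative.ttl;
   * Oracle JVMs document a default of 10). Sleeping at least this long plus one
   * second lets a retry see a fresh DNS lookup instead of the cached failure.
   */
  static int getAddressCacheNegativeTtl(UnknownHostException originalException) {
    try {
      return Integer.parseInt(Security.getProperty("networkaddress.cache.negative.ttl"));
    } catch (NumberFormatException unsetOrUnparseable) {
      return 10; // property missing or unparseable; use the documented default
    }
  }

  public static void main(String[] args) {
    long sleep = 1000;
    // Mirrors the backoff adjustment in waitForZookeeperAndHdfs.
    sleep = Math.max(sleep, (getAddressCacheNegativeTtl(null) + 1) * 1000L);
    System.out.println("backing off " + sleep / 1000. + " seconds");
  }
}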
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index eb7a330,0000000..034bc92
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@@ -1,503 -1,0 +1,506 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+public class VolumeManagerImpl implements VolumeManager {
+
+ private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
+
+ Map<String,? extends FileSystem> volumes;
+ String defaultVolume;
+ AccumuloConfiguration conf;
+ VolumeChooser chooser;
+
+ protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
+ this.volumes = volumes;
+ this.defaultVolume = defaultVolume;
+ this.conf = conf;
+ ensureSyncIsEnabled();
+ chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
+ }
+
+ public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
+ return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "",
+ DefaultConfiguration.getDefaultConfiguration());
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException ex = null;
+ for (FileSystem fs : volumes.values()) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ ex = e;
+ }
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ @Override
+ public boolean closePossiblyOpenFile(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ try {
+ return dfs.recoverLease(path);
+ } catch (FileNotFoundException ex) {
+ throw ex;
+ }
+ } else if (fs instanceof LocalFileSystem) {
+ // ignore
+ } else {
+ throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+ }
+ fs.append(path).close();
+ log.info("Recovered lease on " + path.toString() + " using append");
+ return true;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.create(path);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.create(path, overwrite);
+ }
+
+ private static long correctBlockSize(Configuration conf, long blockSize) {
+ if (blockSize <= 0)
+ blockSize = conf.getLong("dfs.block.size", 67108864);
+
+ int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+ blockSize -= blockSize % checkSum;
+ blockSize = Math.max(blockSize, checkSum);
+ return blockSize;
+ }
+
+ private static int correctBufferSize(Configuration conf, int bufferSize) {
+ if (bufferSize <= 0)
+ bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ return bufferSize;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ if (bufferSize == 0) {
+ fs.getConf().getInt("io.file.buffer.size", 4096);
+ }
+ return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+ }
+
+ @Override
+ public boolean createNewFile(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.createNewFile(path);
+ }
+
+ @Override
+ public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(logPath);
+ blockSize = correctBlockSize(fs.getConf(), blockSize);
+ bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+ try {
+ // This...
+ // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+ // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+ // Becomes this:
+ Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+ List<Enum<?>> flags = new ArrayList<Enum<?>>();
+ if (createFlags.isEnum()) {
+ for (Object constant : createFlags.getEnumConstants()) {
+ if (constant.toString().equals("SYNC_BLOCK")) {
+ flags.add((Enum<?>) constant);
+ log.debug("Found synch enum " + constant);
+ }
+ if (constant.toString().equals("CREATE")) {
+ flags.add((Enum<?>) constant);
+ log.debug("Found CREATE enum " + constant);
+ }
+ }
+ }
+ Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+ log.debug("CreateFlag set: " + set);
+ Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+ log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+ return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+ } catch (ClassNotFoundException ex) {
+ // Expected in hadoop 1.0
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ } catch (Exception ex) {
+ log.debug(ex, ex);
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ }
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+ return getFileSystemByPath(path).delete(path, false);
+ }
+
+ @Override
+ public boolean deleteRecursively(Path path) throws IOException {
+ return getFileSystemByPath(path).delete(path, true);
+ }
+
+ protected void ensureSyncIsEnabled() {
+ for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
+ final String volumeName = entry.getKey();
+ final FileSystem fs = entry.getValue();
+
+ if (fs instanceof DistributedFileSystem) {
+ final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
+ final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
+ // Check to make sure that we have proper defaults configured
+ try {
+ // If the default is off (0.20.205.x or 1.0.x)
+ DFSConfigKeys configKeys = new DFSConfigKeys();
+
+ // Can't use the final constant itself as Java will inline it at compile time
+ Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
+ boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
+
+ if (!dfsSupportAppendDefaultValue) {
+ // See if the user did the correct override
+ if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) {
+ String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
+ log.fatal(msg);
+ throw new RuntimeException(msg);
+ }
+ }
+ } catch (NoSuchFieldException e) {
+ // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
+ // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled.
+ } catch (Exception e) {
+ log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e);
+ }
+
+ // If either of these parameters are configured to be false, fail.
+ // This is a sign that someone is writing bad configuration.
+ if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) {
+ String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage;
+ log.fatal(msg);
+ throw new RuntimeException(msg);
+ }
+
+ try {
+ // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be everything >=1.1.1 and the 0.23 line)
+ Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys");
+ dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY");
+
+ // Everything else
+ if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+ log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss is possible on system reset or power loss");
+ }
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0.X or hadoop 1.1.0
+ } catch (SecurityException e) {
+ // hadoop 1.0.X or hadoop 1.1.0
+ } catch (NoSuchFieldException e) {
+ // hadoop 1.0.X or hadoop 1.1.0
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ return getFileSystemByPath(path).exists(path);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return getFileSystemByPath(path).getFileStatus(path);
+ }
+
+ @Override
+ public FileSystem getFileSystemByPath(Path path) {
+ if (path.toString().contains(":")) {
+ try {
+ return path.getFileSystem(CachedConfiguration.getInstance());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return volumes.get(defaultVolume);
+ }
+
+ @Override
+ public Map<String,? extends FileSystem> getFileSystems() {
+ return volumes;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return getFileSystemByPath(path).listStatus(path);
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ return getFileSystemByPath(path).mkdirs(path);
+ }
+
+ @Override
+ public FSDataInputStream open(Path path) throws IOException {
+ return getFileSystemByPath(path).open(path);
+ }
+
+ @Override
+ public boolean rename(Path path, Path newPath) throws IOException {
+ FileSystem source = getFileSystemByPath(path);
+ FileSystem dest = getFileSystemByPath(newPath);
+ if (source != dest) {
+ throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
+ }
+ return source.rename(path, newPath);
+ }
+
+ @Override
+ public boolean moveToTrash(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ Trash trash = new Trash(fs, fs.getConf());
+ return trash.moveToTrash(path);
+ }
+
+ @Override
+ public short getDefaultReplication(Path path) {
+ FileSystem fs = getFileSystemByPath(path);
+ try {
+ // try calling hadoop 2 method
+ Method method = fs.getClass().getMethod("getDefaultReplication", Path.class);
+ return ((Short) method.invoke(fs, path)).shortValue();
+ } catch (NoSuchMethodException e) {
+ // ignore
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+
+ @SuppressWarnings("deprecation")
+ short rep = fs.getDefaultReplication();
+ return rep;
+ }
+
+ @Override
+ public boolean isFile(Path path) throws IOException {
+ return getFileSystemByPath(path).isFile(path);
+ }
+
+ public static VolumeManager get() throws IOException {
+ AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+ return get(conf);
+ }
+
+ static private final String DEFAULT = "";
+
+ public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
+ Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
+ Configuration hadoopConf = CachedConfiguration.getInstance();
+ fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
+ String ns = conf.get(Property.INSTANCE_VOLUMES);
+ if (ns != null && !ns.isEmpty()) {
+ for (String space : ns.split(",")) {
+ if (space.equals(DEFAULT))
+ throw new IllegalArgumentException();
+
+ if (space.contains(":")) {
+ fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+ } else {
+ fileSystems.put(space, FileSystem.get(hadoopConf));
+ }
+ }
+ }
+ return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
+ }
+
+ @Override
+ public boolean isReady() throws IOException {
+ for (FileSystem fs : getFileSystems().values()) {
+ if (!(fs instanceof DistributedFileSystem))
+ continue;
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+ // Becomes this:
+ Class<?> safeModeAction;
+ try {
+ // hadoop 2.0
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ try {
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot figure out the right class for Constants");
+ }
+ }
+ Object get = null;
+ for (Object obj : safeModeAction.getEnumConstants()) {
+ if (obj.toString().equals("SAFEMODE_GET"))
+ get = obj;
+ }
+ if (get == null) {
+ throw new RuntimeException("cannot find SAFEMODE_GET");
+ }
+ try {
+ Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+ boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+ if (inSafeMode) {
+ return false;
+ }
++ } catch (IllegalArgumentException exception) {
++ /* Send IAEs back as-is, so that those that wrap UnknownHostException can be handled in the same place as similar sources of failure. */
++ throw exception;
+ } catch (Exception ex) {
+ throw new RuntimeException("cannot find method setSafeMode");
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public FileSystem getDefaultVolume() {
+ return volumes.get(defaultVolume);
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+ }
+
+ @Override
+ public Path getFullPath(Key key) {
+ // TODO sanity check col fam
+ String relPath = key.getColumnQualifierData().toString();
+ byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+ return getFullPath(new String(tableId), relPath);
+ }
+
+ @Override
+ public Path matchingFileSystem(Path source, String[] options) {
+ try {
+ if (ViewFSUtils.isViewFS(source, CachedConfiguration.getInstance())) {
+ return ViewFSUtils.matchingFileSystem(source, options, CachedConfiguration.getInstance());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ URI uri1 = source.toUri();
+ for (String option : options) {
+ URI uri3 = URI.create(option);
+ if (uri1.getScheme().equals(uri3.getScheme())) {
+ String a1 = uri1.getAuthority();
+ String a2 = uri3.getAuthority();
+ if ((a1 == null && a2 == null) || (a1 != null && a1.equals(a2)))
+ return new Path(option);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Path getFullPath(String tableId, String path) {
+ if (path.contains(":"))
+ return new Path(path);
+
+ if (path.startsWith("../"))
+ path = path.substring(2);
+ else if (path.startsWith("/"))
+ path = "/" + tableId + path;
+ else
+ throw new IllegalArgumentException("Unexpected path prefix " + path);
+
+ return getFullPath(FileType.TABLE, path);
+ }
+
+ @Override
+ public Path getFullPath(FileType fileType, String path) {
+ if (path.contains(":"))
+ return new Path(path);
+
+ // normalize the path
+ Path fullPath = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
+ if (path.startsWith("/"))
+ path = path.substring(1);
+ fullPath = new Path(fullPath, path);
+
+ FileSystem fs = getFileSystemByPath(fullPath);
+ return fs.makeQualified(fullPath);
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path dir) throws IOException {
+ return getFileSystemByPath(dir).getContentSummary(dir);
+ }
+
+ @Override
+ public String choose(String[] options) {
+ return chooser.choose(options);
+ }
+
+}
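
The diff's own comment in isReady() spells out what the reflection stands in for. Compiled directly against Hadoop 2 it reduces to a single call; a sketch of that Hadoop-2-only form, with an illustrative class name:

import java.io.IOException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;

public class SafeModeSketch {
  /** Hadoop 2 only: ask the NameNode whether it is still in safe mode. */
  static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
    return dfs.setSafeMode(SafeModeAction.SAFEMODE_GET);
  }
}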
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
index 14cf37b,0000000..b237cd0
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
@@@ -1,83 -1,0 +1,91 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
++import java.net.UnknownHostException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class TabletOperations {
+
+ private static final Logger log = Logger.getLogger(TabletOperations.class);
+
+ public static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
+ String lowDirectory;
+
+ UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+ String volume = fs.choose(ServerConstants.getTablesDirs());
+
+ while (true) {
+ try {
+ if (endRow == null) {
+ lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
+ Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+ if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath)) {
+ FileSystem pathFs = fs.getFileSystemByPath(lowDirectoryPath);
+ return lowDirectoryPath.makeQualified(pathFs.getUri(), pathFs.getWorkingDirectory()).toString();
+ }
+ log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
+ } else {
+ lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
+ Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+ if (fs.exists(lowDirectoryPath))
+ throw new IllegalStateException("Dir exists when it should not: " + lowDirectoryPath);
+ if (fs.mkdirs(lowDirectoryPath)) {
+ FileSystem lowDirectoryFs = fs.getFileSystemByPath(lowDirectoryPath);
+ return lowDirectoryPath.makeQualified(lowDirectoryFs.getUri(), lowDirectoryFs.getWorkingDirectory()).toString();
+ }
+ }
+ } catch (IOException e) {
+ log.warn(e);
+ }
+
+ log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + " + will retry ...");
+ UtilWaitThread.sleep(3000);
+
+ }
+ }
+
+ public static String createTabletDirectory(String tableDir, Text endRow) {
+ while (true) {
+ try {
+ VolumeManager fs = VolumeManagerImpl.get();
+ return createTabletDirectory(fs, tableDir, endRow);
+ } catch (IOException e) {
- log.warn(e);
++ log.warn("problem creating tablet directory", e);
++ } catch (IllegalArgumentException exception) {
++ /* thrown in some edge cases of DNS failure by Hadoop instead of UnknownHostException */
++ if (exception.getCause() instanceof UnknownHostException) {
++ log.warn("problem creating tablet directory", exception);
++ } else {
++ throw exception;
++ }
+ }
+ UtilWaitThread.sleep(3000);
+ }
+ }
+}
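
The catch blocks added here, in DeleteTable below, and in Accumulo.waitForZookeeperAndHdfs all follow one rule: an IllegalArgumentException is retryable (or at least loggable) only when it wraps an UnknownHostException, and fatal otherwise. A compact standalone sketch of that rule; the helper and class names are illustrative:

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

public class DnsAwareRetrySketch {
  /** True when Hadoop has wrapped a DNS lookup failure in an IllegalArgumentException. */
  static boolean isWrappedDnsFailure(RuntimeException exception) {
    return exception instanceof IllegalArgumentException
        && exception.getCause() instanceof UnknownHostException;
  }

  /** Retry doWork on transient HDFS or DNS trouble; rethrow anything else. */
  static <T> T retry(Callable<T> doWork) throws Exception {
    while (true) {
      try {
        return doWork.call();
      } catch (IOException transientTrouble) {
        // fall through to the sleep and retry
      } catch (RuntimeException exception) {
        if (!isWrappedDnsFailure(exception))
          throw exception; // not the DNS edge case; propagate
      }
      Thread.sleep(3000);
    }
  }
}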
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index aaeaac5,0000000..166ec89
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@@ -1,251 -1,0 +1,259 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.IOException;
+import java.util.Arrays;
++import java.net.UnknownHostException;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.GrepIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+class CleanUp extends MasterRepo {
+
+ final private static Logger log = Logger.getLogger(CleanUp.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId, namespaceId;
+
+ private long creationTime;
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ /*
+ * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine
+ *
+ * if the new machine has time in the future, that will work ok w/ hasCycled
+ */
+ if (System.currentTimeMillis() < creationTime) {
+ creationTime = System.currentTimeMillis();
+ }
+
+ }
+
+ public CleanUp(String tableId, String namespaceId) {
+ this.tableId = tableId;
+ this.namespaceId = namespaceId;
+ creationTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+ if (!master.hasCycled(creationTime)) {
+ return 50;
+ }
+
+ boolean done = true;
+ Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+ Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ MetaDataTableScanner.configureScanner(scanner, master);
+ scanner.setRange(tableRange);
+
+ KeyExtent prevExtent = null;
+ for (Entry<Key,Value> entry : scanner) {
+ TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+ if (!locationState.extent.isPreviousExtent(prevExtent)) {
+ log.debug("Still waiting for table to be deleted: " + tableId + " saw inconsistency" + prevExtent + " " + locationState.extent);
+ done = false;
+ break;
+ }
+ prevExtent = locationState.extent;
+
+ TabletState state = locationState.getState(master.onlineTabletServers());
+ if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) {
+ log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState);
+ done = false;
+ break;
+ }
+ }
+
+ if (!done)
+ return 50;
+
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+
+ master.clearMigrations(tableId);
+
+ int refCount = 0;
+
+ try {
+ // look for other tables that references this table's files
+ Connector conn = master.getConnector();
+ BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8);
+ try {
+ Range allTables = MetadataSchema.TabletsSection.getRange();
+ Range tableRange = MetadataSchema.TabletsSection.getRange(tableId);
+ Range beforeTable = new Range(allTables.getStartKey(), true, tableRange.getStartKey(), false);
+ Range afterTable = new Range(tableRange.getEndKey(), false, allTables.getEndKey(), true);
+ bs.setRanges(Arrays.asList(beforeTable, afterTable));
+ bs.fetchColumnFamily(DataFileColumnFamily.NAME);
+ IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
+ GrepIterator.setTerm(cfg, "/" + tableId + "/");
+ bs.addScanIterator(cfg);
+
+ for (Entry<Key,Value> entry : bs) {
+ if (entry.getKey().getColumnQualifier().toString().contains("/" + tableId + "/")) {
+ refCount++;
+ }
+ }
+ } finally {
+ bs.close();
+ }
+
+ } catch (Exception e) {
+ refCount = -1;
+ log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e);
+ }
+
+ // remove metadata table entries
+ try {
+ // Intentionally do not pass master lock. If master loses lock, this operation may complete before master can kill itself.
+ // If the master lock passed to deleteTable, it is possible that the delete mutations will be dropped. If the delete operations
+ // are dropped and the operation completes, then the deletes will not be repeated.
+ MetadataTableUtil.deleteTable(tableId, refCount != 0, SystemCredentials.get(), null);
+ } catch (Exception e) {
+ log.error("error deleting " + tableId + " from metadata table", e);
+ }
+
+ // remove any problem reports the table may have
+ try {
+ ProblemReports.getInstance().deleteProblemReports(tableId);
+ } catch (Exception e) {
+ log.error("Failed to delete problem reports for table " + tableId, e);
+ }
+
+ if (refCount == 0) {
+ // delete the map files
+ try {
+ VolumeManager fs = master.getFileSystem();
+ for (String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir, tableId));
+ }
+ } catch (IOException e) {
+ log.error("Unable to remove deleted table directory", e);
++ } catch (IllegalArgumentException exception) {
++ if (exception.getCause() instanceof UnknownHostException) {
++ /* Thrown if HDFS encounters a DNS problem in some edge cases */
++ log.error("Unable to remove deleted table directory", exception);
++ } else {
++ throw exception;
++ }
+ }
+ }
+
+ // remove table from zookeeper
+ try {
+ TableManager.getInstance().removeTable(tableId);
+ Tables.clearCache(master.getInstance());
+ } catch (Exception e) {
+ log.error("Failed to find table id in zookeeper", e);
+ }
+
+ // remove any permissions associated with this table
+ try {
+ AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(master.getInstance()), tableId);
+ } catch (ThriftSecurityException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ Utils.unreserveTable(tableId, tid, true);
+ Utils.unreserveNamespace(namespaceId, tid, false);
+
+ Logger.getLogger(CleanUp.class).debug("Deleted table " + tableId);
+
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ // nothing to do
+ }
+
+}
+
+public class DeleteTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId, namespaceId;
+
+ public DeleteTable(String tableId) {
+ this.tableId = tableId;
+ Instance inst = HdfsZooInstance.getInstance();
+ this.namespaceId = Tables.getNamespace(inst, tableId);
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+
+ return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE)
+ + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
+ environment.getEventCoordinator().event("deleting table %s ", tableId);
+ return new CleanUp(tableId, namespaceId);
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ Utils.unreserveNamespace(namespaceId, tid, false);
+ Utils.unreserveTable(tableId, tid, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index d513ebc,0000000..32898f4
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,291 -1,0 +1,322 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tracer;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.trace.TraceFormatter;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.thrift.RemoteSpan;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TraceServer implements Watcher {
+
+ final private static Logger log = Logger.getLogger(TraceServer.class);
+ final private ServerConfiguration serverConfiguration;
+ final private TServer server;
- private BatchWriter writer = null;
- private Connector connector;
++ final private AtomicReference<BatchWriter> writer;
++ final private Connector connector;
+ final String table;
+
+ private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
+ m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
+ }
+
+ static class ByteArrayTransport extends TTransport {
+ TByteArrayOutputStream out = new TByteArrayOutputStream();
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public int read(byte[] buf, int off, int len) {
+ return 0;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ out.write(buf, off, len);
+ }
+
+ public byte[] get() {
+ return out.get();
+ }
+
+ public int len() {
+ return out.len();
+ }
+ }
+
+ class Receiver implements Iface {
+ @Override
+ public void span(RemoteSpan s) throws TException {
+ String idString = Long.toHexString(s.traceId);
+ String startString = Long.toHexString(s.start);
+ Mutation spanMutation = new Mutation(new Text(idString));
+ Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
+ long diff = s.stop - s.start;
+ indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
+ ByteArrayTransport transport = new ByteArrayTransport();
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ s.write(protocol);
+ String parentString = Long.toHexString(s.parentId);
+ if (s.parentId == Span.ROOT_SPAN_ID)
+ parentString = "";
+ put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
+ // Map the root span to time so we can look up traces by time
+ Mutation timeMutation = null;
+ if (s.parentId == Span.ROOT_SPAN_ID) {
+ timeMutation = new Mutation(new Text("start:" + startString));
+ put(timeMutation, "id", idString, transport.get(), transport.len());
+ }
+ try {
- if (writer == null)
- resetWriter();
- if (writer == null)
++ final BatchWriter writer = TraceServer.this.writer.get();
++ /* Check for null, because we expect spans to arrive much faster than flush calls.
++ If the writer has failed, we would rather discard spans than log a flood of NPEs.
++ */
++ if (null == writer) {
++ log.warn("writer is not ready; discarding span.");
+ return;
++ }
+ writer.addMutation(spanMutation);
+ writer.addMutation(indexMutation);
+ if (timeMutation != null)
+ writer.addMutation(timeMutation);
- } catch (Exception ex) {
- log.error("Unable to write mutation to table: " + spanMutation, ex);
++ } catch (MutationsRejectedException exception) {
++ log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
++ if (log.isDebugEnabled()) {
++ log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
++ }
++ /* XXX this could be e.g. an IllegalArgumentException if we're trying to write this mutation to a writer that has been closed since we retrieved it */
++ } catch (RuntimeException exception) {
++ log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
++ log.debug("unable to write mutation to table due to exception.", exception);
+ }
+ }
+
+ }
+
+ public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
+ this.serverConfiguration = serverConfiguration;
+ AccumuloConfiguration conf = serverConfiguration.getConfiguration();
+ table = conf.get(Property.TRACE_TABLE);
++ Connector connector = null;
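+ // Retry until we can authenticate and the trace table exists; our dependencies may not be up yet at startup.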
+ while (true) {
+ try {
+ String principal = conf.get(Property.TRACE_USER);
+ AuthenticationToken at;
+ Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
+ if (loginMap.isEmpty()) {
+ Property p = Property.TRACE_PASSWORD;
+ at = new PasswordToken(conf.get(p).getBytes());
+ } else {
+ Properties props = new Properties();
+ AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
+ .newInstance();
+
+ int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
+ for (Entry<String,String> entry : loginMap.entrySet()) {
+ props.put(entry.getKey().substring(prefixLength), entry.getValue());
+ }
+
+ token.init(props);
+
+ at = token;
+ }
+
+ connector = serverConfiguration.getInstance().getConnector(principal, at);
+ if (!connector.tableOperations().exists(table)) {
+ connector.tableOperations().create(table);
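+ // Age off trace entries after one week.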
+ IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+ AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000L);
+ connector.tableOperations().attachIterator(table, setting);
+ }
+ connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+ break;
+ } catch (Exception ex) {
+ log.info("Waiting to checking/create the trace table.", ex);
+ UtilWaitThread.sleep(1000);
+ }
+ }
++ this.connector = connector;
++ // Null out the local so the rest of the constructor uses the final field.
++ connector = null;
+
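+ // Bind the Thrift SpanReceiver service to the configured trace port.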
+ int port = conf.getPort(Property.TRACE_PORT);
+ final ServerSocket sock = ServerSocketChannel.open().socket();
+ sock.setReuseAddress(true);
+ sock.bind(new InetSocketAddress(hostname, port));
+ final TServerTransport transport = new TServerSocket(sock);
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.processor(new Processor<Iface>(new Receiver()));
+ server = new TThreadPoolServer(options);
+ registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
- writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
++ writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
+ }
+
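+ /* Serve span RPCs, flushing the writer once a second so spans reach the trace table promptly. */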
+ public void run() throws Exception {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ flush();
+ }
+ }, 1000, 1000);
+ server.serve();
+ }
+
+ private void flush() {
+ try {
- writer.flush();
- } catch (Exception e) {
- log.error("Error flushing traces", e);
++ final BatchWriter writer = this.writer.get();
++ if (null != writer) {
++ writer.flush();
++ }
++ } catch (MutationsRejectedException exception) {
++ log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
++ log.debug("flushing traces failed due to exception", exception);
++ resetWriter();
++ /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
++ } catch (RuntimeException exception) {
++ log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
++ log.debug("flushing traces failed due to exception", exception);
+ resetWriter();
+ }
+ }
+
- synchronized private void resetWriter() {
++ private void resetWriter() {
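++ /* Build a replacement writer first, then swap it in and close whatever was there before. */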
++ BatchWriter writer = null;
+ try {
- if (writer != null)
- writer.close();
++ writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
+ } catch (Exception ex) {
- log.error("Error closing batch writer", ex);
++ log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
++ log.debug("batch writer creation failed with exception.", ex);
+ } finally {
- writer = null;
++ /* Trade in the new writer (even if null) for the one we need to close. */
++ writer = this.writer.getAndSet(writer);
+ try {
- writer = connector.createBatchWriter(table, new BatchWriterConfig());
++ if (null != writer) {
++ writer.close();
++ }
+ } catch (Exception ex) {
- log.error("Unable to create a batch writer: " + ex);
++ log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
++ log.debug("batch writer close failed with exception", ex);
+ }
+ }
+ }
+
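+ /* Advertise this tracer in ZooKeeper with an ephemeral sequential node, and watch it so we notice if the registration goes away. */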
+ private void registerInZooKeeper(String name) throws Exception {
+ String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
+ zoo.exists(path, this);
+ }
+
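+ /* Entry point: perform the server login, initialize configuration and the volume manager, then run the tracer. */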
+ public static void main(String[] args) throws Exception {
+ SecurityUtil.serverLogin();
+ ServerOpts opts = new ServerOpts();
+ opts.parseArgs("tracer", args);
+ Instance instance = HdfsZooInstance.getInstance();
+ ServerConfiguration conf = new ServerConfiguration(instance);
+ VolumeManager fs = VolumeManagerImpl.get();
+ Accumulo.init(fs, conf, "tracer");
+ String hostname = opts.getAddress();
+ TraceServer server = new TraceServer(conf, hostname);
+ Accumulo.enableTracing(hostname, "tserver");
+ server.run();
+ log.info("tracer stopping");
+ }
+
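+ /* Stop the server when our ZooKeeper session expires or our registration node is deleted; otherwise re-establish the watch. */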
+ @Override
+ public void process(WatchedEvent event) {
+ log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+ if (event.getState() == KeeperState.Expired) {
+ log.warn("Trace server lost zookeeper registration at " + event.getPath());
+ server.stop();
+ } else if (event.getType() == EventType.NodeDeleted) {
+ log.warn("Trace server zookeeper entry lost " + event.getPath());
+ server.stop();
+ }
+ if (event.getPath() != null) {
+ try {
+ if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
+ return;
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ log.warn("Trace server unable to reset watch on zookeeper registration");
+ server.stop();
+ }
+ }
+
+}