You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/10 16:15:39 UTC
[4/7] git commit: ACCUMULO-2160 back-port real bugs found by findbugs
ACCUMULO-2160 back-port real bugs found by findbugs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e135e039
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e135e039
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e135e039
Branch: refs/heads/master
Commit: e135e039a33c1e3917bed87c16efcf6535418d15
Parents: 2d51e32 cb50a74
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 15:50:51 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 15:50:51 2014 -0500
----------------------------------------------------------------------
server/base/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e135e039/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index 83f54b0,0000000..c546590
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,264 -1,0 +1,264 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class Accumulo {
+
+ private static final Logger log = Logger.getLogger(Accumulo.class);
+
+ public static synchronized void updateAccumuloVersion(VolumeManager fs) {
+ try {
+ if (getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
+ fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.DATA_VERSION));
+ fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.PREV_DATA_VERSION));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
+ int dataVersion;
+ try {
+ FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation());
+ if (files == null || files.length == 0) {
+ dataVersion = -1; // assume it is 0.5 or earlier
+ } else {
+ dataVersion = Integer.parseInt(files[0].getPath().getName());
+ }
+ return dataVersion;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static void enableTracing(String address, String application) {
+ try {
+ DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
+ } catch (Exception ex) {
+ log.error("creating remote sink for trace spans", ex);
+ }
+ }
+
+ private static class LogMonitor extends FileWatchdog implements Watcher {
+ String path;
+
+ protected LogMonitor(String instance, String filename, int delay) {
+ super(filename);
+ setDelay(delay);
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+ }
+
+ private void setMonitorPort() {
+ try {
+ String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+ System.setProperty("org.apache.accumulo.core.host.log.port", port);
+ log.info("Changing monitor log4j port to "+port);
+ doOnChange();
+ } catch (Exception e) {
+ log.error("Error reading zookeeper data for monitor log4j port", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+ setMonitorPort();
+ log.info("Set watch for monitor log4j port");
+ } catch (Exception e) {
+ log.error("Unable to set watch for monitor log4j port " + path);
+ }
+ super.run();
+ }
+
+ @Override
+ protected void doOnChange() {
+ LogManager.resetConfiguration();
+ new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ setMonitorPort();
+ if (event.getPath() != null) {
+ try {
+ ZooReaderWriter.getInstance().exists(event.getPath(), this);
+ } catch (Exception ex) {
+ log.error("Unable to reset watch for monitor log4j port", ex);
+ }
+ }
+ }
+ }
+
+ public static void init(VolumeManager fs, ServerConfiguration config, String application) throws UnknownHostException {
+
+ System.setProperty("org.apache.accumulo.core.application", application);
+
+ if (System.getenv("ACCUMULO_LOG_DIR") != null)
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
+ else
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
+
+ String localhost = InetAddress.getLocalHost().getHostName();
+ System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
+
+ if (System.getenv("ACCUMULO_LOG_HOST") != null)
+ System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
+ else
+ System.setProperty("org.apache.accumulo.core.host.log", localhost);
+
+ int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+ System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+
+ // Use a specific log config, if it exists
- String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
++ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ if (!new File(logConfig).exists()) {
+ // otherwise, use the generic config
+ logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ }
+ // Turn off messages about not being able to reach the remote logger... we protect against that.
+ LogLog.setQuietMode(true);
+
+ // Configure logging
+ if (logPort==0)
+ new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+ else
+ DOMConfigurator.configureAndWatch(logConfig, 5000);
+
+ // Read the auditing config
+ String auditConfig = String.format("%s/auditLog.xml", System.getenv("ACCUMULO_CONF_DIR"));
+
+ DOMConfigurator.configureAndWatch(auditConfig, 5000);
+
+ log.info(application + " starting");
+ log.info("Instance " + config.getInstance().getInstanceID());
+ int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
+ log.info("Data Version " + dataVersion);
+ Accumulo.waitForZookeeperAndHdfs(fs);
+
+ Version codeVersion = new Version(Constants.VERSION);
+ if (dataVersion != ServerConstants.DATA_VERSION && dataVersion != ServerConstants.PREV_DATA_VERSION) {
+ throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
+ }
+
+ TreeMap<String,String> sortedProps = new TreeMap<String,String>();
+ for (Entry<String,String> entry : config.getConfiguration())
+ sortedProps.put(entry.getKey(), entry.getValue());
+
+ for (Entry<String,String> entry : sortedProps.entrySet()) {
+ String key = entry.getKey();
+ log.info(key + " = " + (Property.isSensitive(key) ? "<hidden>" : entry.getValue()));
+ }
+
+ monitorSwappiness();
+ }
+
+ /**
+ *
+ */
+ public static void monitorSwappiness() {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String procFile = "/proc/sys/vm/swappiness";
+ File swappiness = new File(procFile);
+ if (swappiness.exists() && swappiness.canRead()) {
+ InputStream is = new FileInputStream(procFile);
+ try {
+ byte[] buffer = new byte[10];
+ int bytes = is.read(buffer);
+ String setting = new String(buffer, 0, bytes);
+ setting = setting.trim();
+ if (bytes > 0 && Integer.parseInt(setting) > 10) {
+ log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
+ + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
+ }
+ } finally {
+ is.close();
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ }
+ }, 1000, 10 * 60 * 1000);
+ }
+
+ public static void waitForZookeeperAndHdfs(VolumeManager fs) {
+ log.info("Attempting to talk to zookeeper");
+ while (true) {
+ try {
+ ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
+ break;
+ } catch (InterruptedException e) {
+ // ignored
+ } catch (KeeperException ex) {
+ log.info("Waiting for accumulo to be initialized");
+ UtilWaitThread.sleep(1000);
+ }
+ }
+ log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
+ long sleep = 1000;
+ while (true) {
+ try {
+ if (fs.isReady())
+ break;
+ log.warn("Waiting for the NameNode to leave safemode");
+ } catch (IOException ex) {
+ log.warn("Unable to connect to HDFS");
+ }
+ log.info("Sleeping " + sleep / 1000. + " seconds");
+ UtilWaitThread.sleep(sleep);
+ sleep = Math.min(60 * 1000, sleep * 2);
+ }
+ log.info("Connected to HDFS");
+ }
+
+}