You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2013/12/13 00:31:07 UTC
[1/3] git commit: ACCUMULO-1999 Backport changes from 1664 which
advertise the Master's random port
Updated Branches:
refs/heads/1.6.0-SNAPSHOT 7f37d96c3 -> 2610e821b
ACCUMULO-1999 Backport changes from 1664 which advertise the Master's random port
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/622fc7d2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/622fc7d2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/622fc7d2
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 622fc7d20744ae8a7e75ba0dc945777f94254b48
Parents: c2cd051
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 10 14:55:18 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 12 15:38:15 2013 -0500
----------------------------------------------------------------------
.../1GB/native-standalone/generic_logger.xml | 2 +-
conf/examples/1GB/standalone/generic_logger.xml | 2 +-
.../2GB/native-standalone/generic_logger.xml | 2 +-
conf/examples/2GB/standalone/generic_logger.xml | 2 +-
.../3GB/native-standalone/generic_logger.xml | 2 +-
conf/examples/3GB/standalone/generic_logger.xml | 2 +-
.../512MB/native-standalone/generic_logger.xml | 2 +-
.../512MB/standalone/generic_logger.xml | 2 +-
.../org/apache/accumulo/core/Constants.java | 3 +
.../accumulo/core/client/impl/MasterClient.java | 3 +
.../apache/accumulo/fate/zookeeper/ZooLock.java | 5 +
.../minicluster/MiniAccumuloCluster.java | 17 +-
.../org/apache/accumulo/server/Accumulo.java | 64 +++++++-
.../server/gc/SimpleGarbageCollector.java | 2 +-
.../apache/accumulo/server/master/Master.java | 10 +-
.../accumulo/server/monitor/LogService.java | 15 +-
.../apache/accumulo/server/monitor/Monitor.java | 12 +-
.../org/apache/accumulo/server/util/Info.java | 33 ++++
.../server/util/TNonblockingServerSocket.java | 157 +++++++++++++++++++
.../accumulo/server/util/TServerUtils.java | 6 +-
.../java/org/apache/accumulo/start/Main.java | 6 +-
.../accumulo/fate/zookeeper/ZooLockTest.java | 18 +++
22 files changed, 345 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/1GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/native-standalone/generic_logger.xml b/conf/examples/1GB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/1GB/native-standalone/generic_logger.xml
+++ b/conf/examples/1GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/1GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/standalone/generic_logger.xml b/conf/examples/1GB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/1GB/standalone/generic_logger.xml
+++ b/conf/examples/1GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/2GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/native-standalone/generic_logger.xml b/conf/examples/2GB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/2GB/native-standalone/generic_logger.xml
+++ b/conf/examples/2GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/2GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/standalone/generic_logger.xml b/conf/examples/2GB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/2GB/standalone/generic_logger.xml
+++ b/conf/examples/2GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/3GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/native-standalone/generic_logger.xml b/conf/examples/3GB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/3GB/native-standalone/generic_logger.xml
+++ b/conf/examples/3GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/3GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/standalone/generic_logger.xml b/conf/examples/3GB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/3GB/standalone/generic_logger.xml
+++ b/conf/examples/3GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/512MB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/native-standalone/generic_logger.xml b/conf/examples/512MB/native-standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/512MB/native-standalone/generic_logger.xml
+++ b/conf/examples/512MB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/conf/examples/512MB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/standalone/generic_logger.xml b/conf/examples/512MB/standalone/generic_logger.xml
index 5dc38ac..565bc82 100644
--- a/conf/examples/512MB/standalone/generic_logger.xml
+++ b/conf/examples/512MB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
<!-- Send all logging data to a centralized logger -->
<appender name="N1" class="org.apache.log4j.net.SocketAppender">
<param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/>
- <param name="port" value="4560"/>
+ <param name="port" value="${org.apache.accumulo.core.host.log.port}"/>
<param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
<param name="Threshold" value="WARN"/>
</appender>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 17faff6..9bb3419 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -63,6 +63,9 @@ public class Constants {
public static final String ZGC = "/gc";
public static final String ZGC_LOCK = ZGC + "/lock";
+ public static final String ZMONITOR = "/monitor";
+ public static final String ZMONITOR_LOG4J_PORT = ZMONITOR + "/log4j_port";
+
public static final String ZCONFIG = "/config";
public static final String ZTSERVERS = "/tservers";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index 81cc546..2a22c54 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -57,6 +57,9 @@ public class MasterClient {
}
String master = locations.get(0);
+ if (master.endsWith(":0"))
+ return null;
+
int portHint = instance.getConfiguration().getPort(Property.MASTER_CLIENTPORT);
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index fb2f3d8..8772a83 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -334,6 +334,11 @@ public class ZooLock implements Watcher {
return lock != null;
}
+ public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException {
+ if (getLockPath()!=null)
+ zooKeeper.getZooKeeper().setData(getLockPath(), b, -1);
+ }
+
@Override
public synchronized void process(WatchedEvent event) {
log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
index 3b4d8a3..01115ec 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
@@ -171,6 +171,15 @@ public class MiniAccumuloCluster {
if (!siteConfig.containsKey(key))
fileWriter.append("<property><name>" + key + "</name><value>" + value + "</value></property>\n");
}
+
+ /**
+ * Sets a given key with a random port for the value on the site config if it doesn't already exist.
+ */
+ private void mergePropWithRandomPort(Map<String,String> siteConfig, String key) {
+ if (!siteConfig.containsKey(key)) {
+ siteConfig.put(key, "0");
+ }
+ }
/**
*
@@ -228,8 +237,6 @@ public class MiniAccumuloCluster {
appendProp(fileWriter, Property.INSTANCE_DFS_DIR, accumuloDir.getAbsolutePath(), siteConfig);
appendProp(fileWriter, Property.INSTANCE_ZK_HOST, "localhost:" + zooKeeperPort, siteConfig);
appendProp(fileWriter, Property.INSTANCE_SECRET, INSTANCE_SECRET, siteConfig);
- appendProp(fileWriter, Property.MASTER_CLIENTPORT, "" + PortUtils.getRandomFreePort(), siteConfig);
- appendProp(fileWriter, Property.TSERV_CLIENTPORT, "" + PortUtils.getRandomFreePort(), siteConfig);
appendProp(fileWriter, Property.TSERV_PORTSEARCH, "true", siteConfig);
appendProp(fileWriter, Property.LOGGER_DIR, walogDir.getAbsolutePath(), siteConfig);
appendProp(fileWriter, Property.TSERV_DATACACHE_SIZE, "10M", siteConfig);
@@ -238,8 +245,12 @@ public class MiniAccumuloCluster {
appendProp(fileWriter, Property.TSERV_WALOG_MAX_SIZE, "100M", siteConfig);
appendProp(fileWriter, Property.TSERV_NATIVEMAP_ENABLED, "false", siteConfig);
appendProp(fileWriter, Property.TRACE_TOKEN_PROPERTY_PREFIX + ".password", config.getRootPassword(), siteConfig);
- appendProp(fileWriter, Property.TRACE_PORT, "" + PortUtils.getRandomFreePort(), siteConfig);
appendProp(fileWriter, Property.GC_CYCLE_DELAY, "30s", siteConfig);
+ mergePropWithRandomPort(siteConfig, Property.MASTER_CLIENTPORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.TRACE_PORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.TSERV_CLIENTPORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.MONITOR_PORT.getKey());
+ mergePropWithRandomPort(siteConfig, Property.GC_PORT.getKey());
// since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5
appendProp(fileWriter, Property.TSERV_MAJC_DELAY, "3", siteConfig);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index f8ca31a..33bb871 100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -39,10 +40,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
public class Accumulo {
@@ -82,6 +87,57 @@ public class Accumulo {
}
}
+ private static class LogMonitor extends FileWatchdog implements Watcher {
+ String path;
+
+ protected LogMonitor(String instance, String filename, int delay) {
+ super(filename);
+ setDelay(delay);
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+ }
+
+ private void setMonitorPort() {
+ try {
+ String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+ System.setProperty("org.apache.accumulo.core.host.log.port", port);
+ log.info("Changing monitor log4j port to "+port);
+ doOnChange();
+ } catch (Exception e) {
+ log.error("Error reading zookeeper data for monitor log4j port", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+ setMonitorPort();
+ log.info("Set watch for monitor log4j port");
+ } catch (Exception e) {
+ log.error("Unable to set watch for monitor log4j port " + path);
+ }
+ super.run();
+ }
+
+ @Override
+ protected void doOnChange() {
+ LogManager.resetConfiguration();
+ new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ setMonitorPort();
+ if (event.getPath() != null) {
+ try {
+ ZooReaderWriter.getInstance().exists(event.getPath(), this);
+ } catch (Exception ex) {
+ log.error("Unable to reset watch for monitor log4j port", ex);
+ }
+ }
+ }
+ }
+
public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
System.setProperty("org.apache.accumulo.core.application", application);
@@ -99,6 +155,9 @@ public class Accumulo {
else
System.setProperty("org.apache.accumulo.core.host.log", localhost);
+ int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+ System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+
// Use a specific log config, if it exists
String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
if (!new File(logConfig).exists()) {
@@ -109,7 +168,10 @@ public class Accumulo {
LogLog.setQuietMode(true);
// Configure logging
- DOMConfigurator.configureAndWatch(logConfig, 5000);
+ if (logPort==0)
+ new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+ else
+ DOMConfigurator.configureAndWatch(logConfig, 5000);
log.info(application + " starting");
log.info("Instance " + config.getInstance().getInstanceID());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
index 4278f5b..af8e308 100644
--- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
+++ b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
@@ -418,7 +418,7 @@ public class SimpleGarbageCollector implements Iface {
int port = instance.getConfiguration().getPort(Property.GC_PORT);
long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
try {
- TServerUtils.startTServer(port, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize);
+ port = TServerUtils.startTServer(port, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize).port;
} catch (Exception ex) {
log.fatal(ex, ex);
throw new RuntimeException(ex);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index 779673a..5cb928b 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -149,6 +149,7 @@ import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.util.SystemPropUtil;
import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerPort;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -2114,8 +2115,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
}
Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler()));
- clientService = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null,
- Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE).server;
+ ServerPort serverPort = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT, processor, "Master",
+ "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+ clientService = serverPort.server;
+ InetSocketAddress sock = org.apache.accumulo.core.util.AddressUtil.parseAddress(hostname, serverPort.port);
+ String address = org.apache.accumulo.core.util.AddressUtil.toString(sock);
+ log.info("Setting master lock data to " + address);
+ masterLock.replaceLockData(address.getBytes());
while (!clientService.isServing()) {
UtilWaitThread.sleep(100);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index a123e9f..ce5dab8 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -24,9 +24,13 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.net.SocketNode;
@@ -55,6 +59,10 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
}
}
+ public int getLocalPort() {
+ return server.getLocalPort();
+ }
+
public void run() {
try {
while (true) {
@@ -70,9 +78,12 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
}
}
- static void startLogListener(AccumuloConfiguration conf) {
+ static void startLogListener(AccumuloConfiguration conf, String instanceId) {
try {
- new Daemon(new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT))).start();
+ SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
+ Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE);
+ new Daemon(server).start();
} catch (Throwable t) {
log.info("Unable to listen to cluster-wide ports", t);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
index 3904088..c373610 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -478,9 +479,18 @@ public class Monitor {
server.addServlet(ShowTrace.class, "/trace/show");
if (server.isUsingSsl())
server.addServlet(ShellServlet.class, "/shell");
- LogService.startLogListener(Monitor.getSystemConfiguration());
server.start();
+ try {
+ String monitorAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, server.getPort()));
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(),
+ NodeExistsPolicy.OVERWRITE);
+ log.info("Set monitor address in zookeeper to " + monitorAddress);
+ } catch (Exception ex) {
+ log.error("Unable to set monitor address in zookeeper");
+ }
+ LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID());
+
new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start();
// need to regularly fetch data so plot data is updated
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/Info.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/Info.java b/server/src/main/java/org/apache/accumulo/server/util/Info.java
new file mode 100644
index 0000000..4f03d82
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/Info.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+public class Info {
+ public static void main(String[] args) throws Exception {
+ ZooReaderWriter zrw = ZooReaderWriter.getInstance();
+ Instance instance = HdfsZooInstance.getInstance();
+ System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, null)));
+ System.out.println("masters: " + instance.getMasterLocations());
+ System.out.println("zookeepers: " + instance.getZooKeepers());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
new file mode 100644
index 0000000..4154d9d
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.util;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ *
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * Apart from the logging and error-handling statements, the only change is the addition of the {@link #getPort()} method to retrieve the port used by the
+ * ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  // Log under this class's own name so messages are attributed to this copy rather than to the Thrift base class.
+  private static final Logger log = Logger.getLogger(TNonblockingServerSocket.class);
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object; null after {@link #close()} or when construction failed.
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket with a client timeout of 0.
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  /**
+   * Creates a server socket bound to the given address with a client timeout of 0.
+   */
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  /**
+   * Opens a non-blocking server channel and binds its socket to the given address.
+   *
+   * @param bindAddr
+   *          address to bind the server socket to
+   * @param clientTimeout
+   *          timeout set on every client socket returned by {@link #acceptImpl()}
+   * @throws TTransportException
+   *           if the channel cannot be opened, configured or bound; the underlying IOException is attached as the cause
+   */
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      // The channel may already be open (e.g. the bind failed because the port is taken); release it so its descriptor is not leaked.
+      closeChannelQuietly();
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr + ".", ioe);
+    }
+  }
+
+  /**
+   * Closes the channel after a failed construction, ignoring any secondary failure.
+   */
+  private void closeChannelQuietly() {
+    if (serverSocketChannel != null) {
+      try {
+        serverSocketChannel.close();
+      } catch (IOException ignored) {
+        // The failure that triggered this cleanup is the one reported to the caller.
+      }
+    }
+  }
+
+  /**
+   * Clears the accept timeout on the server socket. The socket itself is already bound by the constructor.
+   */
+  @Override
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        log.warn("Could not clear the accept timeout on the server socket", sx);
+      }
+    }
+  }
+
+  /**
+   * Accepts a pending connection from the non-blocking server channel.
+   *
+   * @return the accepted client socket with the client timeout applied, or null if the channel had no connection to accept
+   * @throws TTransportException
+   *           with type NOT_OPEN if the server socket is closed, or wrapping the IOException if accepting fails
+   */
+  @Override
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Registers the server socket channel with the selector, indicating an interest in accepting new connections.
+   */
+  @Override
+  public void registerSelector(Selector selector) {
+    try {
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // This shouldn't happen, ideally. Report it rather than hiding it: without the registration the selector is never told about new connections.
+      log.warn("Could not register the server socket channel with the selector because the channel is closed", e);
+    }
+  }
+
+  /**
+   * Closes the underlying server socket if it is still open; calling it again afterwards does nothing.
+   */
+  @Override
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  /**
+   * Interrupts this transport by closing it.
+   */
+  @Override
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  /**
+   * Returns the local port of the underlying server socket.
+   *
+   * @return the local port reported by the server socket, or -1 if the socket has been closed
+   */
+  public int getPort() {
+    // Read the field once: close() sets it to null.
+    ServerSocket socket = serverSocket_;
+    return socket == null ? -1 : socket.getLocalPort();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index be14023..ef25513 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -44,7 +44,6 @@ import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
@@ -111,7 +110,7 @@ public class TServerUtils {
for (int i = 0; i < portsToSearch; i++) {
int port = portHint + i;
- if (portHint == 0)
+ if (portHint != 0 && i > 0)
port = 1024 + random.nextInt(65535 - 1024);
if (port > 65535)
port = 1024 + port % (65535 - 1024);
@@ -213,6 +212,9 @@ public class TServerUtils {
public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
+ if (port == 0) {
+ port = transport.getPort();
+ }
THsHaServer.Args options = new THsHaServer.Args(transport);
options.protocolFactory(ThriftUtil.protocolFactory());
options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/start/src/main/java/org/apache/accumulo/start/Main.java
----------------------------------------------------------------------
diff --git a/start/src/main/java/org/apache/accumulo/start/Main.java b/start/src/main/java/org/apache/accumulo/start/Main.java
index a93a9d8..cbc90c6 100644
--- a/start/src/main/java/org/apache/accumulo/start/Main.java
+++ b/start/src/main/java/org/apache/accumulo/start/Main.java
@@ -72,9 +72,11 @@ public class Main {
} else if (args[0].equals("rfile-info")) {
runTMP = cl.loadClass("org.apache.accumulo.core.file.rfile.PrintInfo");
} else if (args[0].equals("login-info")) {
- runTMP = cl.loadClass("org.apache.accumulo.core.util.LoginProperties");
+ runTMP = cl.loadClass("org.apache.accumulo.server.util.LoginProperties");
} else if (args[0].equals("zookeeper")) {
runTMP = cl.loadClass("org.apache.accumulo.server.util.ZooKeeperMain");
+ } else if (args[0].equals("info")) {
+ runTMP = cl.loadClass("org.apache.accumulo.server.util.Info");
} else {
try {
runTMP = cl.loadClass(args[0]);
@@ -118,6 +120,6 @@ public class Main {
}
private static void printUsage() {
- System.out.println("accumulo init | master | tserver | monitor | shell | admin | gc | classpath | rfile-info | login-info | tracer | proxy | zookeeper | <accumulo class> args");
+ System.out.println("accumulo init | master | tserver | monitor | shell | admin | gc | classpath | rfile-info | login-info | tracer | proxy | zookeeper | info | version | <accumulo class> args");
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/622fc7d2/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
index 8a2a6ef..0c57250 100644
--- a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
+++ b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
@@ -328,6 +328,24 @@ public class ZooLockTest {
zl.unlock();
}
+  /**
+   * Checks that the data passed to lockAsync is stored at the lock path and that replaceLockData overwrites it.
+   */
+  @Test(timeout = 10000)
+  public void testChangeData() throws Exception {
+    // Create the parent node through a separate ZooKeeper client, which is also used to read the lock data back.
+    String lockParent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    ZooKeeper keeper = new ZooKeeper(accumulo.getZooKeepers(), 1000, null);
+    keeper.addAuthInfo("digest", "secret".getBytes());
+    keeper.create(lockParent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+    ZooLock lock = new ZooLock(accumulo.getZooKeepers(), 1000, "digest", "secret".getBytes(), lockParent);
+    TestALW watcher = new TestALW();
+
+    // The data supplied when locking must be readable from the lock path.
+    lock.lockAsync(watcher, "test1".getBytes());
+    byte[] initialData = keeper.getData(lock.getLockPath(), null, null);
+    Assert.assertEquals("test1", new String(initialData));
+
+    // Replacing the lock data must be visible at the same path.
+    lock.replaceLockData("test2".getBytes());
+    byte[] replacedData = keeper.getData(lock.getLockPath(), null, null);
+    Assert.assertEquals("test2", new String(replacedData));
+  }
+
@AfterClass
public static void tearDownMiniCluster() throws Exception {
accumulo.stop();
[2/3] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by el...@apache.org.
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db8addb9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db8addb9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db8addb9
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: db8addb9fea71d1920c0328e4e5263343fe79b53
Parents: c04c9de 622fc7d
Author: Josh Elser <el...@apache.org>
Authored: Thu Dec 12 16:46:19 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 12 16:46:19 2013 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[3/3] git commit: Merge branch '1.6.0-SNAPSHOT' of
http://git-wip-us.apache.org/repos/asf/accumulo into 1.6.0-SNAPSHOT
Posted by el...@apache.org.
Merge branch '1.6.0-SNAPSHOT' of http://git-wip-us.apache.org/repos/asf/accumulo into 1.6.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2610e821
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2610e821
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2610e821
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 2610e821bf2b3b4a7dd9b891ea248dbd1ec55008
Parents: db8addb 7f37d96
Author: Josh Elser <el...@apache.org>
Authored: Thu Dec 12 18:31:10 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Dec 12 18:31:10 2013 -0500
----------------------------------------------------------------------
test/pom.xml | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------