You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/06/04 03:01:03 UTC
[3/5] accumulo git commit: ACCUMULO-3887 Support optional sessionId
in tserver stop.
ACCUMULO-3887 Support optional sessionId in tserver stop.
Also added more debugging inside the master around the FATE
shutdownTServer op. New unit tests as well.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c5aa060e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c5aa060e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c5aa060e
Branch: refs/heads/master
Commit: c5aa060ee90fe5fa1fe9923973394d4b204f7f62
Parents: d764d27
Author: Josh Elser <el...@apache.org>
Authored: Wed Jun 3 20:08:46 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jun 3 21:00:06 2015 -0400
----------------------------------------------------------------------
.../accumulo/server/master/LiveTServerSet.java | 27 +++++-
.../org/apache/accumulo/server/util/Admin.java | 45 +++++++++-
.../server/master/LiveTServerSetTest.java | 52 +++++++++++
.../apache/accumulo/server/util/AdminTest.java | 94 ++++++++++++++++++++
.../master/MasterClientServiceHandler.java | 5 ++
.../master/tserverOps/ShutdownTServer.java | 2 +-
6 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 0ea6b41..0c0cceb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -367,10 +367,29 @@ public class LiveTServerSet implements Watcher {
}
public synchronized TServerInstance find(String tabletServer) {
- HostAndPort addr = AddressUtil.parseAddress(tabletServer, false);
- for (Entry<String,TServerInfo> entry : current.entrySet()) {
- if (entry.getValue().instance.getLocation().equals(addr))
- return entry.getValue().instance;
+ return find(current, tabletServer);
+ }
+
+ TServerInstance find(Map<String,TServerInfo> servers, String tabletServer) {
+ HostAndPort addr;
+ String sessionId = null;
+ if (']' == tabletServer.charAt(tabletServer.length() - 1)) {
+ int index = tabletServer.indexOf('[');
+ if (-1 == index) {
+ throw new IllegalArgumentException("Could not parse tabletserver '" + tabletServer + "'");
+ }
+ addr = AddressUtil.parseAddress(tabletServer.substring(0, index), false);
+ // Strip off the last bracket
+ sessionId = tabletServer.substring(index + 1, tabletServer.length() - 1);
+ } else {
+ addr = AddressUtil.parseAddress(tabletServer, false);
+ }
+ for (Entry<String,TServerInfo> entry : servers.entrySet()) {
+ if (entry.getValue().instance.getLocation().equals(addr)) {
+ // Return the instance if we have no desired session ID, or we match the desired session ID
+ if (null == sessionId || sessionId.equals(entry.getValue().instance.getSession()))
+ return entry.getValue().instance;
+ }
}
return null;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index bdbc7f0..c7f9739 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
@@ -54,12 +55,17 @@ import org.apache.accumulo.core.security.SystemPermission;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +73,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
@@ -362,9 +369,12 @@ public class Admin implements KeywordExecutable {
log.info("No masters running. Not attempting safe unload of tserver.");
return;
}
+ final Instance instance = context.getInstance();
+ final String zTServerRoot = getTServersZkPath(instance);
+ final ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
for (String server : servers) {
HostAndPort address = AddressUtil.parseAddress(server, context.getConfiguration().getPort(Property.TSERV_CLIENTPORT));
- final String finalServer = address.toString();
+ final String finalServer = qualifyWithZooKeeperSessionId(zTServerRoot, zc, address.toString());
log.info("Stopping server " + finalServer);
MasterClient.execute(context, new ClientExec<MasterClientService.Client>() {
@Override
@@ -375,6 +385,39 @@ public class Admin implements KeywordExecutable {
}
}
+ /**
+ * Get the parent ZNode for tservers for the given instance
+ *
+ * @param instance
+ * The Instance
+ * @return The tservers znode for the instance
+ */
+ static String getTServersZkPath(Instance instance) {
+ Preconditions.checkNotNull(instance);
+ final String instanceRoot = ZooUtil.getRoot(instance);
+ return instanceRoot + Constants.ZTSERVERS;
+ }
+
+ /**
+ * Look up the TabletServers in ZooKeeper and try to find a sessionID for this server reference
+ *
+ * @param hostAndPort
+ * The host and port for a TabletServer
+ * @return The host and port with the session ID in square-brackets appended, or the original value.
+ */
+ static String qualifyWithZooKeeperSessionId(String zTServerRoot, ZooCache zooCache, String hostAndPort) {
+ try {
+ long sessionId = ZooLock.getSessionId(zooCache, zTServerRoot + "/" + hostAndPort);
+ if (0 == sessionId) {
+ return hostAndPort;
+ }
+ return hostAndPort + "[" + sessionId + "]";
+ } catch (InterruptedException | KeeperException e) {
+ log.warn("Failed to communicate with ZooKeeper to find session ID for TabletServer.");
+ return hostAndPort;
+ }
+ }
+
private static final String ACCUMULO_SITE_BACKUP_FILE = "accumulo-site.xml.bak";
private static final String NS_FILE_SUFFIX = "_ns.cfg";
private static final String USER_FILE_SUFFIX = "_user.cfg";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java
new file mode 100644
index 0000000..bcdb832
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerInfo;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class LiveTServerSetTest {
+
+ @Test
+ public void testSessionIds() {
+ Map<String,TServerInfo> servers = new HashMap<>();
+ TServerConnection mockConn = EasyMock.createMock(TServerConnection.class);
+
+ TServerInfo server1 = new TServerInfo(new TServerInstance(HostAndPort.fromParts("localhost", 1234), "5555"), mockConn);
+ servers.put("server1", server1);
+
+ LiveTServerSet tservers = new LiveTServerSet(EasyMock.createMock(ClientContext.class), EasyMock.createMock(Listener.class));
+
+ assertEquals(server1.instance, tservers.find(servers, "localhost:1234"));
+ assertNull(tservers.find(servers, "localhost:4321"));
+ assertEquals(server1.instance, tservers.find(servers, "localhost:1234[5555]"));
+ assertNull(tservers.find(servers, "localhost:1234[55755]"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
new file mode 100644
index 0000000..9685b28
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.zookeeper.data.Stat;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+public class AdminTest {
+
+ @Test
+ public void testZooKeeperTserverPath() {
+ Instance instance = EasyMock.createMock(Instance.class);
+ String instanceId = UUID.randomUUID().toString();
+
+ EasyMock.expect(instance.getInstanceID()).andReturn(instanceId);
+
+ EasyMock.replay(instance);
+
+ assertEquals(Constants.ZROOT + "/" + instanceId + Constants.ZTSERVERS, Admin.getTServersZkPath(instance));
+
+ EasyMock.verify(instance);
+ }
+
+ @Test
+ public void testQualifySessionId() {
+ ZooCache zc = EasyMock.createMock(ZooCache.class);
+
+ String root = "/accumulo/id/tservers";
+ String server = "localhost:12345";
+ final long session = 123456789l;
+
+ String serverPath = root + "/" + server;
+ EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList("child"));
+ EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(Stat.class))).andAnswer(new IAnswer<byte[]>() {
+
+ @Override
+ public byte[] answer() throws Throwable {
+ Stat stat = (Stat) EasyMock.getCurrentArguments()[1];
+ stat.setEphemeralOwner(session);
+ return new byte[0];
+ }
+
+ });
+
+ EasyMock.replay(zc);
+
+ assertEquals(server + "[" + session + "]", Admin.qualifyWithZooKeeperSessionId(root, zc, server));
+
+ EasyMock.verify(zc);
+ }
+
+ @Test
+ public void testCannotQualifySessionId() {
+ ZooCache zc = EasyMock.createMock(ZooCache.class);
+
+ String root = "/accumulo/id/tservers";
+ String server = "localhost:12345";
+
+ String serverPath = root + "/" + server;
+ EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.<String> emptyList());
+
+ EasyMock.replay(zc);
+
+ // A server that isn't in ZooKeeper. Can't qualify it, should return the original
+ assertEquals(server, Admin.qualifyWithZooKeeperSessionId(root, zc, server));
+
+ EasyMock.verify(zc);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 592d9ae..e65dcec 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -295,9 +295,14 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
}
long tid = master.fate.startTransaction();
+
+ log.debug("Seeding FATE op to shutdown " + tabletServer + " with tid " + tid);
+
master.fate.seedTransaction(tid, new TraceRepo<Master>(new ShutdownTServer(doomed, force)), false);
master.fate.waitForCompletion(tid);
master.fate.delete(tid);
+
+ log.debug("FATE op shutting down " + tabletServer + " finished");
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
index 5330197..11cd91b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
@@ -77,7 +77,7 @@ public class ShutdownTServer extends MasterRepo {
log.error("Error talking to tablet server " + server + ": " + ex);
}
- // If the connection was non-null and we could coomunicate with it
+ // If the connection was non-null and we could communicate with it
// give the master some more time to tell it to stop and for the
// tserver to ack the request and stop itself.
return 1000;