You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/06/04 03:01:03 UTC

[3/5] accumulo git commit: ACCUMULO-3887 Support optional sessionId in tserver stop.

ACCUMULO-3887 Support optional sessionId in tserver stop.

Also added more debug logging inside the master around the FATE
ShutdownTServer op. New unit tests as well.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c5aa060e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c5aa060e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c5aa060e

Branch: refs/heads/master
Commit: c5aa060ee90fe5fa1fe9923973394d4b204f7f62
Parents: d764d27
Author: Josh Elser <el...@apache.org>
Authored: Wed Jun 3 20:08:46 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jun 3 21:00:06 2015 -0400

----------------------------------------------------------------------
 .../accumulo/server/master/LiveTServerSet.java  | 27 +++++-
 .../org/apache/accumulo/server/util/Admin.java  | 45 +++++++++-
 .../server/master/LiveTServerSetTest.java       | 52 +++++++++++
 .../apache/accumulo/server/util/AdminTest.java  | 94 ++++++++++++++++++++
 .../master/MasterClientServiceHandler.java      |  5 ++
 .../master/tserverOps/ShutdownTServer.java      |  2 +-
 6 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 0ea6b41..0c0cceb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -367,10 +367,44 @@ public class LiveTServerSet implements Watcher {
   }
 
   public synchronized TServerInstance find(String tabletServer) {
-    HostAndPort addr = AddressUtil.parseAddress(tabletServer, false);
-    for (Entry<String,TServerInfo> entry : current.entrySet()) {
-      if (entry.getValue().instance.getLocation().equals(addr))
-        return entry.getValue().instance;
+    return find(current, tabletServer);
+  }
+
+  /**
+   * Find the tablet server in {@code servers} whose location matches {@code tabletServer}.
+   *
+   * @param servers
+   *          The tablet servers to search
+   * @param tabletServer
+   *          A {@code host:port} address, optionally followed by a session ID in square brackets ({@code host:port[session]})
+   * @return The matching instance, or null when no server has that address or, if a session ID was given, no server at that
+   *         address reports that session ID
+   * @throws IllegalArgumentException
+   *           if {@code tabletServer} ends with ']' but contains no '['
+   */
+  TServerInstance find(Map<String,TServerInfo> servers, String tabletServer) {
+    HostAndPort addr;
+    String sessionId = null;
+    // endsWith() instead of charAt(length() - 1), which throws StringIndexOutOfBoundsException on an empty string
+    if (tabletServer.endsWith("]")) {
+      // Use the last '[' so a bracketed IPv6 host (e.g. "[::1]:9997[session]") keeps its own brackets
+      int index = tabletServer.lastIndexOf('[');
+      if (-1 == index) {
+        throw new IllegalArgumentException("Could not parse tabletserver '" + tabletServer + "'");
+      }
+      addr = AddressUtil.parseAddress(tabletServer.substring(0, index), false);
+      // Everything between that '[' and the trailing ']' is the session ID
+      sessionId = tabletServer.substring(index + 1, tabletServer.length() - 1);
+    } else {
+      addr = AddressUtil.parseAddress(tabletServer, false);
+    }
+    for (Entry<String,TServerInfo> entry : servers.entrySet()) {
+      if (entry.getValue().instance.getLocation().equals(addr)) {
+        // Return the instance if we have no desired session ID, or we match the desired session ID
+        if (null == sessionId || sessionId.equals(entry.getValue().instance.getSession())) {
+          return entry.getValue().instance;
+        }
+      }
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index bdbc7f0..c7f9739 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
@@ -54,12 +55,17 @@ import org.apache.accumulo.core.security.SystemPermission;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +73,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
 
@@ -362,9 +369,12 @@ public class Admin implements KeywordExecutable {
       log.info("No masters running. Not attempting safe unload of tserver.");
       return;
     }
+    final Instance instance = context.getInstance();
+    final String zTServerRoot = getTServersZkPath(instance);
+    final ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
     for (String server : servers) {
       HostAndPort address = AddressUtil.parseAddress(server, context.getConfiguration().getPort(Property.TSERV_CLIENTPORT));
-      final String finalServer = address.toString();
+      final String finalServer = qualifyWithZooKeeperSessionId(zTServerRoot, zc, address.toString());
       log.info("Stopping server " + finalServer);
       MasterClient.execute(context, new ClientExec<MasterClientService.Client>() {
         @Override
@@ -375,6 +385,53 @@ public class Admin implements KeywordExecutable {
     }
   }
 
+  /**
+   * Get the parent ZNode for tservers for the given instance
+   *
+   * @param instance
+   *          The Instance
+   * @return The tservers znode for the instance
+   * @throws NullPointerException
+   *           if {@code instance} is null
+   */
+  static String getTServersZkPath(Instance instance) {
+    Preconditions.checkNotNull(instance);
+    final String instanceRoot = ZooUtil.getRoot(instance);
+    return instanceRoot + Constants.ZTSERVERS;
+  }
+
+  /**
+   * Look up the TabletServer in ZooKeeper and try to find a session ID for this server reference
+   *
+   * @param zTServerRoot
+   *          The tservers znode, as returned by {@link #getTServersZkPath(Instance)}
+   * @param zooCache
+   *          The cache used to read the TabletServer's lock node
+   * @param hostAndPort
+   *          The host and port for a TabletServer
+   * @return The host and port with the hex-encoded session ID in square-brackets appended, or the original value if no
+   *         session ID could be found.
+   */
+  static String qualifyWithZooKeeperSessionId(String zTServerRoot, ZooCache zooCache, String hostAndPort) {
+    try {
+      long sessionId = ZooLock.getSessionId(zooCache, zTServerRoot + "/" + hostAndPort);
+      if (0 == sessionId) {
+        return hostAndPort;
+      }
+      // LiveTServerSet.find compares this with TServerInstance.getSession(), the hex form of the ZooKeeper session
+      // (Long.toHexString); a decimal value would never match and the master would find no server to stop.
+      return hostAndPort + "[" + Long.toHexString(sessionId) + "]";
+    } catch (InterruptedException e) {
+      // Restore the interrupt status instead of swallowing it
+      Thread.currentThread().interrupt();
+      log.warn("Interrupted while looking up session ID for TabletServer {}", hostAndPort, e);
+      return hostAndPort;
+    } catch (KeeperException e) {
+      log.warn("Failed to communicate with ZooKeeper to find session ID for TabletServer {}", hostAndPort, e);
+      return hostAndPort;
+    }
+  }
+
   private static final String ACCUMULO_SITE_BACKUP_FILE = "accumulo-site.xml.bak";
   private static final String NS_FILE_SUFFIX = "_ns.cfg";
   private static final String USER_FILE_SUFFIX = "_user.cfg";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java
new file mode 100644
index 0000000..bcdb832
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerInfo;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class LiveTServerSetTest {
+
+  @Test
+  public void testSessionIds() {
+    Map<String,TServerInfo> servers = new HashMap<>();
+    TServerConnection mockConn = EasyMock.createMock(TServerConnection.class);
+
+    TServerInfo server1 = new TServerInfo(new TServerInstance(HostAndPort.fromParts("localhost", 1234), "5555"), mockConn);
+    servers.put("server1", server1);
+
+    LiveTServerSet tservers = new LiveTServerSet(EasyMock.createMock(ClientContext.class), EasyMock.createMock(Listener.class));
+
+    assertEquals(server1.instance, tservers.find(servers, "localhost:1234"));
+    assertNull(tservers.find(servers, "localhost:4321"));
+    assertEquals(server1.instance, tservers.find(servers, "localhost:1234[5555]"));
+    assertNull(tservers.find(servers, "localhost:1234[55755]"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
new file mode 100644
index 0000000..9685b28
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.zookeeper.data.Stat;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+public class AdminTest {
+
+  @Test
+  public void testZooKeeperTserverPath() {
+    Instance instance = EasyMock.createMock(Instance.class);
+    String instanceId = UUID.randomUUID().toString();
+
+    EasyMock.expect(instance.getInstanceID()).andReturn(instanceId);
+
+    EasyMock.replay(instance);
+
+    assertEquals(Constants.ZROOT + "/" + instanceId + Constants.ZTSERVERS, Admin.getTServersZkPath(instance));
+
+    EasyMock.verify(instance);
+  }
+
+  @Test
+  public void testQualifySessionId() {
+    ZooCache zc = EasyMock.createMock(ZooCache.class);
+
+    String root = "/accumulo/id/tservers";
+    String server = "localhost:12345";
+    final long session = 123456789L;
+
+    String serverPath = root + "/" + server;
+    EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList("child"));
+    EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(Stat.class))).andAnswer(new IAnswer<byte[]>() {
+
+      @Override
+      public byte[] answer() throws Throwable {
+        Stat stat = (Stat) EasyMock.getCurrentArguments()[1];
+        stat.setEphemeralOwner(session);
+        return new byte[0];
+      }
+
+    });
+
+    EasyMock.replay(zc);
+
+    // The session ID is rendered in hex, the same form LiveTServerSet.find compares against
+    assertEquals(server + "[" + Long.toHexString(session) + "]", Admin.qualifyWithZooKeeperSessionId(root, zc, server));
+
+    EasyMock.verify(zc);
+  }
+
+  @Test
+  public void testCannotQualifySessionId() {
+    ZooCache zc = EasyMock.createMock(ZooCache.class);
+
+    String root = "/accumulo/id/tservers";
+    String server = "localhost:12345";
+
+    String serverPath = root + "/" + server;
+    EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.<String> emptyList());
+
+    EasyMock.replay(zc);
+
+    // A server that isn't in ZooKeeper. Can't qualify it, should return the original
+    assertEquals(server, Admin.qualifyWithZooKeeperSessionId(root, zc, server));
+
+    EasyMock.verify(zc);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 592d9ae..e65dcec 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -295,9 +295,14 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
     }
 
     long tid = master.fate.startTransaction();
+
+    log.debug("Seeding FATE op to shutdown " + tabletServer + " with tid " + tid);
+
     master.fate.seedTransaction(tid, new TraceRepo<Master>(new ShutdownTServer(doomed, force)), false);
     master.fate.waitForCompletion(tid);
     master.fate.delete(tid);
+
+    log.debug("FATE op shutting down " + tabletServer + " finished");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
index 5330197..11cd91b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java
@@ -77,7 +77,7 @@ public class ShutdownTServer extends MasterRepo {
           log.error("Error talking to tablet server " + server + ": " + ex);
         }
 
-        // If the connection was non-null and we could coomunicate with it
+        // If the connection was non-null and we could communicate with it
         // give the master some more time to tell it to stop and for the
         // tserver to ack the request and stop itself.
         return 1000;