You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2012/07/13 22:34:49 UTC
svn commit: r1361382 [14/16] - in /accumulo/branches/ACCUMULO-259: ./ bin/
conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/
conf/examples/2GB/native-standalone/ conf/examples/2GB/standalone/
conf/examples/3GB/native-standalone/ conf/e...
Added: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java?rev=1361382&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java (added)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java Fri Jul 13 20:34:44 2012
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * {@link PermissionHandler} implementation that keeps user permissions in ZooKeeper.
+ * <p>
+ * Each user has a node under <code>&lt;instance path&gt;/users/&lt;user&gt;</code>. System permissions are stored as the data of its <code>/System</code>
+ * child and the permissions for a table as the data of its <code>/Tables/&lt;table&gt;</code> child, both serialized by {@link ZKSecurityTool}.
+ * <p>
+ * The <code>has*Permission</code> methods read ZooKeeper directly, while the <code>hasCached*Permission</code> methods and the read step of the
+ * grant/revoke methods go through a {@link ZooCache}. {@link #initialize(String)} must be called before any other method, because it is the only place
+ * the user path is set.
+ */
+public class ZKPermHandler implements PermissionHandler {
+  private static final Logger log = Logger.getLogger(ZKPermHandler.class);
+  private static PermissionHandler zkPermHandlerInstance = null;
+
+  private String ZKUserPath;
+  private final ZooCache zooCache;
+  private final String ZKUserSysPerms = "/System";
+  private final String ZKUserTablePerms = "/Tables";
+
+  /**
+   * @return the lazily created, shared handler instance
+   */
+  public static synchronized PermissionHandler getInstance() {
+    if (zkPermHandlerInstance == null)
+      zkPermHandlerInstance = new ZKPermHandler();
+    return zkPermHandlerInstance;
+  }
+
+  /**
+   * Points this handler at the user nodes of the given instance.
+   * 
+   * @param instanceId
+   *          id of the instance whose <code>/users</code> node holds the permissions
+   */
+  public void initialize(String instanceId) {
+    ZKUserPath = ZKSecurityTool.getInstancePath(instanceId) + "/users";
+  }
+
+  public ZKPermHandler() {
+    zooCache = new ZooCache();
+  }
+
+  /**
+   * @return the ZooKeeper path holding the serialized system permissions of <code>user</code>
+   */
+  private String sysPermPath(String user) {
+    return ZKUserPath + "/" + user + ZKUserSysPerms;
+  }
+
+  /**
+   * @return the ZooKeeper path holding the serialized permissions of <code>user</code> on <code>table</code>
+   */
+  private String tablePermPath(String user, String table) {
+    return ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table;
+  }
+
+  /**
+   * Checks a table permission by reading ZooKeeper directly (not the cache). Fails closed: a missing node, a ZooKeeper error or an interrupt all yield
+   * <code>false</code>.
+   */
+  @Override
+  public boolean hasTablePermission(String user, String table, TablePermission permission) {
+    byte[] serializedPerms;
+    try {
+      serializedPerms = ZooReaderWriter.getRetryingInstance().getData(tablePermPath(user, table), null);
+    } catch (KeeperException e) {
+      if (e.code() == Code.NONODE) {
+        // no node means no permissions were ever granted on this table
+        return false;
+      }
+      log.warn("Unhandled KeeperException, failing closed for table permission check", e);
+      return false;
+    } catch (InterruptedException e) {
+      // the exception is swallowed here, so restore the flag for callers that check it
+      Thread.currentThread().interrupt();
+      log.warn("Unhandled InterruptedException, failing closed for table permission check", e);
+      return false;
+    }
+    if (serializedPerms != null) {
+      return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission);
+    }
+    return false;
+  }
+
+  /**
+   * Checks a table permission using the cached copy of the user's table permission node; returns <code>false</code> when nothing is cached for it.
+   */
+  @Override
+  public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    byte[] serializedPerms = zooCache.get(tablePermPath(user, table));
+    if (serializedPerms != null) {
+      return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission);
+    }
+    return false;
+  }
+
+  /**
+   * Adds <code>permission</code> to the user's system permissions. Nothing is written when the user already holds it.
+   * 
+   * @throws AccumuloSecurityException
+   *           with CONNECTION_ERROR if ZooKeeper reports an error
+   */
+  @Override
+  public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    try {
+      byte[] permBytes = zooCache.get(sysPermPath(user));
+      Set<SystemPermission> perms;
+      if (permBytes == null) {
+        perms = new TreeSet<SystemPermission>();
+      } else {
+        perms = ZKSecurityTool.convertSystemPermissions(permBytes);
+      }
+
+      if (perms.add(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear();
+          ZooReaderWriter.getRetryingInstance().putPersistentData(sysPermPath(user), ZKSecurityTool.convertSystemPermissions(perms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Adds <code>permission</code> to the user's permissions on <code>table</code>. Nothing is written when the user already holds it.
+   * 
+   * @throws AccumuloSecurityException
+   *           with CONNECTION_ERROR if ZooKeeper reports an error
+   */
+  @Override
+  public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException {
+    Set<TablePermission> tablePerms;
+    byte[] serializedPerms = zooCache.get(tablePermPath(user, table));
+    if (serializedPerms != null)
+      tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms);
+    else
+      tablePerms = new TreeSet<TablePermission>();
+
+    try {
+      if (tablePerms.add(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear(tablePermPath(user, table));
+          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          zoo.putPersistentData(tablePermPath(user, table), ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Removes <code>permission</code> from the user's system permissions. Does nothing when the user has no cached system permission node or does not
+   * hold the permission.
+   * 
+   * @throws AccumuloSecurityException
+   *           with CONNECTION_ERROR if ZooKeeper reports an error
+   */
+  @Override
+  public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] sysPermBytes = zooCache.get(sysPermPath(user));
+
+    // User had no system permission, nothing to revoke.
+    if (sysPermBytes == null)
+      return;
+
+    Set<SystemPermission> sysPerms = ZKSecurityTool.convertSystemPermissions(sysPermBytes);
+
+    try {
+      if (sysPerms.remove(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear();
+          ZooReaderWriter.getRetryingInstance().putPersistentData(sysPermPath(user), ZKSecurityTool.convertSystemPermissions(sysPerms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Removes <code>permission</code> from the user's permissions on <code>table</code>. When the last permission is removed the table node is deleted
+   * instead of being rewritten empty.
+   * 
+   * @throws AccumuloSecurityException
+   *           with CONNECTION_ERROR if ZooKeeper reports an error
+   */
+  @Override
+  public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException {
+    byte[] serializedPerms = zooCache.get(tablePermPath(user, table));
+
+    // User had no table permission, nothing to revoke.
+    if (serializedPerms == null)
+      return;
+
+    Set<TablePermission> tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms);
+    try {
+      if (tablePerms.remove(permission)) {
+        // clear the cache and write under the same lock as every other mutator in this class
+        synchronized (zooCache) {
+          zooCache.clear();
+          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          if (tablePerms.isEmpty())
+            zoo.recursiveDelete(tablePermPath(user, table), NodeMissingPolicy.SKIP);
+          else
+            zoo.putPersistentData(tablePermPath(user, table), ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes the permission node for <code>table</code> from every user listed under the user path.
+   * 
+   * @throws AccumuloSecurityException
+   *           with CONNECTION_ERROR if ZooKeeper reports an error
+   */
+  @Override
+  public void cleanTablePermissions(String table) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        zooCache.clear();
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        for (String user : zooCache.getChildren(ZKUserPath))
+          zoo.recursiveDelete(tablePermPath(user, table), NodeMissingPolicy.SKIP);
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException("unknownUser", SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the root user's permission nodes: every {@link SystemPermission}, plus ALTER_TABLE on the metadata table. Both writes use
+   * {@link NodeExistsPolicy#FAIL}.
+   */
+  @Override
+  public void initializeSecurity(String rootuser) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+    // create the root user with all system privileges, no table privileges, and no record-level authorizations
+    Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
+    for (SystemPermission p : SystemPermission.values())
+      rootPerms.add(p);
+    Map<String,Set<TablePermission>> tablePerms = new HashMap<String,Set<TablePermission>>();
+    // Allow the root user to flush the !METADATA table
+    tablePerms.put(Constants.METADATA_TABLE_ID, Collections.singleton(TablePermission.ALTER_TABLE));
+
+    try {
+      initUser(rootuser);
+      zoo.putPersistentData(sysPermPath(rootuser), ZKSecurityTool.convertSystemPermissions(rootPerms), NodeExistsPolicy.FAIL);
+      for (Entry<String,Set<TablePermission>> entry : tablePerms.entrySet())
+        createTablePerm(rootuser, entry.getKey(), entry.getValue());
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the user's node and its <code>/Tables</code> child with empty data, leaving either one untouched if it already exists.
+   * 
+   * @param user
+   *          name of the user to set up
+   * @throws AccumuloSecurityException
+   *           with CONNECTION_ERROR if ZooKeeper reports an error
+   */
+  public void initUser(String user) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    try {
+      zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
+      zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms, new byte[0], NodeExistsPolicy.SKIP);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sets up a new table configuration for the provided user/table. No checking for existence is done here, it should be done before calling.
+   */
+  private void createTablePerm(String user, String table, Set<TablePermission> perms) throws KeeperException, InterruptedException {
+    synchronized (zooCache) {
+      zooCache.clear();
+      ZooReaderWriter.getRetryingInstance().putPersistentData(tablePermPath(user, table), ZKSecurityTool.convertTablePermissions(perms),
+          NodeExistsPolicy.FAIL);
+    }
+  }
+
+  /**
+   * Deletes the user's <code>/System</code> and <code>/Tables</code> nodes and drops the user's entry from the cache. The user node itself is not
+   * deleted here.
+   * 
+   * @throws AccumuloSecurityException
+   *           with USER_DOESNT_EXIST if ZooKeeper reports NONODE, otherwise with CONNECTION_ERROR
+   */
+  @Override
+  public void dropUser(String user) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        zoo.recursiveDelete(sysPermPath(user), NodeMissingPolicy.SKIP);
+        zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP);
+        zooCache.clear(ZKUserPath + "/" + user);
+      }
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      if (e.code().equals(KeeperException.Code.NONODE))
+        throw new AccumuloSecurityException(user, SecurityErrorCode.USER_DOESNT_EXIST, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+
+    }
+  }
+
+  /**
+   * Checks a system permission by reading ZooKeeper directly (not the cache). Fails closed: a missing node, a ZooKeeper error or an interrupt all yield
+   * <code>false</code>.
+   */
+  @Override
+  public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] perms;
+    try {
+      perms = ZooReaderWriter.getRetryingInstance().getData(sysPermPath(user), null);
+    } catch (KeeperException e) {
+      if (e.code() == Code.NONODE) {
+        // no node means no system permissions were ever granted
+        return false;
+      }
+      log.warn("Unhandled KeeperException, failing closed for system permission check", e);
+      return false;
+    } catch (InterruptedException e) {
+      // the exception is swallowed here, so restore the flag for callers that check it
+      Thread.currentThread().interrupt();
+      log.warn("Unhandled InterruptedException, failing closed for system permission check", e);
+      return false;
+    }
+
+    if (perms == null)
+      return false;
+    return ZKSecurityTool.convertSystemPermissions(perms).contains(permission);
+  }
+
+  /**
+   * Checks a system permission using the cached copy of the user's system permission node; returns <code>false</code> when nothing is cached for it.
+   */
+  @Override
+  public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] perms = zooCache.get(sysPermPath(user));
+    if (perms == null)
+      return false;
+    return ZKSecurityTool.convertSystemPermissions(perms).contains(permission);
+  }
+
+  /**
+   * Accepts any authenticator/authorizor pairing; this handler places no restriction on them.
+   */
+  @Override
+  public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+    return true;
+  }
+}
Added: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java?rev=1361382&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java (added)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java Fri Jul 13 20:34:44 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.log4j.Logger;
+
+/**
+ * All the static too methods used for this class, so that we can separate out stuff that isn't using ZooKeeper. That way, we can check the synchronization
+ * model more easily, as we only need to check to make sure zooCache is cleared when things are written to ZooKeeper in methods that might use it. These
+ * won't, and so don't need to be checked.
+ */
+class ZKSecurityTool {
+ private static final Logger log = Logger.getLogger(ZKSecurityTool.class);
+ private static final int SALT_LENGTH = 8;
+
+ // Generates a byte array salt of length SALT_LENGTH
+ private static byte[] generateSalt() {
+ final SecureRandom random = new SecureRandom();
+ byte[] salt = new byte[SALT_LENGTH];
+ random.nextBytes(salt);
+ return salt;
+ }
+
+ private static byte[] hash(byte[] raw) throws NoSuchAlgorithmException {
+ MessageDigest md = MessageDigest.getInstance(Constants.PW_HASH_ALGORITHM);
+ md.update(raw);
+ return md.digest();
+ }
+
+ public static boolean checkPass(byte[] password, byte[] zkData) {
+ if (zkData == null)
+ return false;
+
+ byte[] salt = new byte[SALT_LENGTH];
+ System.arraycopy(zkData, 0, salt, 0, SALT_LENGTH);
+ byte[] passwordToCheck;
+ try {
+ passwordToCheck = convertPass(password, salt);
+ } catch (NoSuchAlgorithmException e) {
+ log.error("Count not create hashed password", e);
+ return false;
+ }
+ return java.util.Arrays.equals(passwordToCheck, zkData);
+ }
+
+ public static byte[] createPass(byte[] password) throws AccumuloException {
+ byte[] salt = generateSalt();
+ try {
+ return convertPass(password, salt);
+ } catch (NoSuchAlgorithmException e) {
+ log.error("Count not create hashed password", e);
+ throw new AccumuloException("Count not create hashed password", e);
+ }
+ }
+
+ private static byte[] convertPass(byte[] password, byte[] salt) throws NoSuchAlgorithmException {
+ byte[] plainSalt = new byte[password.length + SALT_LENGTH];
+ System.arraycopy(password, 0, plainSalt, 0, password.length);
+ System.arraycopy(salt, 0, plainSalt, password.length, SALT_LENGTH);
+ byte[] hashed = hash(plainSalt);
+ byte[] saltedHash = new byte[SALT_LENGTH + hashed.length];
+ System.arraycopy(salt, 0, saltedHash, 0, SALT_LENGTH);
+ System.arraycopy(hashed, 0, saltedHash, SALT_LENGTH, hashed.length);
+ return saltedHash; // contains salt+hash(password+salt)
+ }
+
+ public static Authorizations convertAuthorizations(byte[] authorizations) {
+ return new Authorizations(authorizations);
+ }
+
+ public static byte[] convertAuthorizations(Authorizations authorizations) {
+ return authorizations.getAuthorizationsArray();
+ }
+
+ public static byte[] convertSystemPermissions(Set<SystemPermission> systempermissions) {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream(systempermissions.size());
+ DataOutputStream out = new DataOutputStream(bytes);
+ try {
+ for (SystemPermission sp : systempermissions)
+ out.writeByte(sp.getId());
+ } catch (IOException e) {
+ log.error(e, e);
+ throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
+ }
+ return bytes.toByteArray();
+ }
+
+ public static Set<SystemPermission> convertSystemPermissions(byte[] systempermissions) {
+ ByteArrayInputStream bytes = new ByteArrayInputStream(systempermissions);
+ DataInputStream in = new DataInputStream(bytes);
+ Set<SystemPermission> toReturn = new HashSet<SystemPermission>();
+ try {
+ while (in.available() > 0)
+ toReturn.add(SystemPermission.getPermissionById(in.readByte()));
+ } catch (IOException e) {
+ log.error("User database is corrupt; error converting system permissions", e);
+ toReturn.clear();
+ }
+ return toReturn;
+ }
+
+ public static byte[] convertTablePermissions(Set<TablePermission> tablepermissions) {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream(tablepermissions.size());
+ DataOutputStream out = new DataOutputStream(bytes);
+ try {
+ for (TablePermission tp : tablepermissions)
+ out.writeByte(tp.getId());
+ } catch (IOException e) {
+ log.error(e, e);
+ throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
+ }
+ return bytes.toByteArray();
+ }
+
+ public static Set<TablePermission> convertTablePermissions(byte[] tablepermissions) {
+ Set<TablePermission> toReturn = new HashSet<TablePermission>();
+ for (byte b : tablepermissions)
+ toReturn.add(TablePermission.getPermissionById(b));
+ return toReturn;
+ }
+
+ /**
+ * @param instanceId
+ * @return
+ */
+ public static String getInstancePath(String instanceId) {
+ return Constants.ZROOT + "/" + instanceId;
+ }
+}
\ No newline at end of file
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri Jul 13 20:34:44 2012
@@ -53,6 +53,7 @@ import java.util.concurrent.Cancellation
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,6 +116,8 @@ import org.apache.accumulo.core.tabletse
import org.apache.accumulo.core.tabletserver.thrift.ScanState;
import org.apache.accumulo.core.tabletserver.thrift.ScanType;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.ByteBufferUtil;
@@ -124,6 +127,7 @@ import org.apache.accumulo.core.util.Dae
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -145,6 +149,7 @@ import org.apache.accumulo.server.master
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReports;
@@ -184,6 +189,7 @@ import org.apache.accumulo.server.util.T
import org.apache.accumulo.server.util.TServerUtils.ServerPort;
import org.apache.accumulo.server.util.time.RelativeTime;
import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -2539,6 +2545,8 @@ public class TabletServer extends Abstra
private TServer server;
+ private DistributedWorkQueue bulkFailedCopyQ;
+
private static final String METRICS_PREFIX = "tserver";
private static ObjectName OBJECT_NAME = null;
@@ -2587,12 +2595,12 @@ public class TabletServer extends Abstra
}
// Connect to the master for posting asynchronous results
- private MasterClientService.Iface masterConnection(String address) {
+ private MasterClientService.Client masterConnection(String address) {
try {
if (address == null) {
return null;
}
- MasterClientService.Iface client = ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.MASTER_CLIENTPORT,
+ MasterClientService.Client client = ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.MASTER_CLIENTPORT,
Property.GENERAL_RPC_TIMEOUT, getSystemConfiguration());
// log.info("Listener API to master has been opened");
return client;
@@ -2602,14 +2610,14 @@ public class TabletServer extends Abstra
return null;
}
- private void returnMasterConnection(MasterClientService.Iface client) {
+ private void returnMasterConnection(MasterClientService.Client client) {
ThriftUtil.returnClient(client);
}
private int startTabletClientService() throws UnknownHostException {
// start listening for client connection last
- TabletClientService.Iface tch = TraceWrap.service(new ThriftClientHandler());
- TabletClientService.Processor processor = new TabletClientService.Processor(tch);
+ Iface tch = TraceWrap.service(new ThriftClientHandler());
+ Processor<Iface> processor = new Processor<Iface>(tch);
int port = startServer(getSystemConfiguration(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
log.info("port = " + port);
return port;
@@ -2678,13 +2686,23 @@ public class TabletServer extends Abstra
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
announceExistence();
+
+ ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
+
+ bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
+ try {
+ bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
+ } catch (Exception e1) {
+ throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
+ }
+
try {
- logSorter.startWatchingForRecoveryLogs(getClientAddressString());
+ logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
} catch (Exception ex) {
log.error("Error setting watches for recoveries");
throw new RuntimeException(ex);
}
-
+
try {
OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
// Do this because interface not in same package.
@@ -2700,7 +2718,7 @@ public class TabletServer extends Abstra
// send all of the pending messages
try {
MasterMessage mm = null;
- MasterClientService.Iface iface = null;
+ MasterClientService.Client iface = null;
try {
// wait until a message is ready to send, or a sever stop
@@ -2797,11 +2815,21 @@ public class TabletServer extends Abstra
private long totalMinorCompactions;
public static SortedMap<KeyExtent,Text> verifyTabletInformation(KeyExtent extent, TServerInstance instance, SortedMap<Key,Value> tabletsKeyValues,
- String clientAddress, ZooLock lock) throws AccumuloSecurityException {
+ String clientAddress, ZooLock lock) throws AccumuloSecurityException, DistributedStoreException {
for (int tries = 0; tries < 3; tries++) {
try {
log.debug("verifying extent " + extent);
if (extent.isRootTablet()) {
+ ZooTabletStateStore store = new ZooTabletStateStore();
+ if (!store.iterator().hasNext()) {
+ log.warn("Illegal state: location is not set in zookeeper");
+ return null;
+ }
+ TabletLocationState next = store.iterator().next();
+ if (!instance.equals(next.future)) {
+ log.warn("Future location is not to this server for the root tablet");
+ return null;
+ }
TreeMap<KeyExtent,Text> set = new TreeMap<KeyExtent,Text>();
set.put(extent, new Text(Constants.ZROOT_TABLET));
return set;
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Fri Jul 13 20:34:44 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
/**
*
*/
public class LogSorter {
+
private static final Logger log = Logger.getLogger(LogSorter.class);
FileSystem fs;
AccumuloConfiguration conf;
- private Map<String,Work> currentWork = new HashMap<String,Work>();
+ private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
- class Work implements Runnable {
- final String name;
- FSDataInputStream input;
- final String destPath;
- long bytesCopied = -1;
- long sortStart = 0;
- long sortStop = -1;
- private final LogSortNotifier cback;
+ class LogProcessor implements Processor {
- synchronized long getBytesCopied() throws IOException {
- return input == null ? bytesCopied : input.getPos();
+ private FSDataInputStream input;
+ private long bytesCopied = -1;
+ private long sortStart = 0;
+ private long sortStop = -1;
+
+ @Override
+ public Processor newProcessor() {
+ return new LogProcessor();
+ }
+
+ @Override
+ public void process(String child, byte[] data) {
+ String dest = Constants.getRecoveryDir(conf) + "/" + child;
+ String src = new String(data);
+ String name = new Path(src).getName();
+
+ synchronized (currentWork) {
+ if (currentWork.containsKey(name))
+ return;
+ currentWork.put(name, this);
+ }
+
+ try {
+ log.info("Copying " + src + " to " + dest);
+ sort(name, new Path(src), dest);
+ } finally {
+ currentWork.remove(name);
+ }
+
}
- Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
- this.name = name;
- this.input = input;
- this.destPath = destPath;
- this.cback = cback;
- }
- synchronized boolean finished() {
- return input == null;
- }
- public void run() {
- sortStart = System.currentTimeMillis();
+ public void sort(String name, Path srcPath, String destPath) {
+
+ synchronized (this) {
+ sortStart = System.currentTimeMillis();
+ }
+
String formerThreadName = Thread.currentThread().getName();
int part = 0;
try {
+
+ // the following call does not throw an exception if the file/dir does not exist
+ fs.delete(new Path(destPath), true);
+
+ FSDataInputStream tmpInput = fs.open(srcPath);
+ synchronized (this) {
+ this.input = tmpInput;
+ }
+
final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
Thread.currentThread().setName("Sorting " + name + " for recovery");
while (true) {
- final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+ final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
try {
long start = input.getPos();
while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
LogFileValue value = new LogFileValue();
key.readFields(input);
value.readFields(input);
- buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+ buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
}
- writeBuffer(buffer, part++);
+ writeBuffer(destPath, buffer, part++);
buffer.clear();
} catch (EOFException ex) {
- writeBuffer(buffer, part++);
+ writeBuffer(destPath, buffer, part++);
break;
}
}
fs.create(new Path(destPath, "finished")).close();
- log.debug("Log copy/sort of " + name + " complete");
+ log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
} catch (Throwable t) {
try {
+ // parent dir may not exist
+ fs.mkdirs(new Path(destPath));
fs.create(new Path(destPath, "failed")).close();
} catch (IOException e) {
log.error("Error creating failed flag file " + name, e);
}
log.error(t, t);
- try {
- cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
- } catch (Exception ex) {
- log.error("Strange error notifying the master of a logSort problem for file " + name);
- }
} finally {
Thread.currentThread().setName(formerThreadName);
try {
@@ -133,19 +147,13 @@ public class LogSorter {
} catch (IOException e) {
log.error("Error during cleanup sort/copy " + name, e);
}
- sortStop = System.currentTimeMillis();
- synchronized (currentWork) {
- currentWork.remove(name);
- }
- try {
- cback.notice(name, getBytesCopied(), part, getSortTime(), "");
- } catch (Exception ex) {
- log.error("Strange error reporting successful log sort " + name, ex);
+ synchronized (this) {
+ sortStop = System.currentTimeMillis();
}
}
}
- private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+ private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
String path = destPath + String.format("/part-r-%05d", part++);
MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
try {
@@ -162,7 +170,7 @@ public class LogSorter {
output.close();
}
}
-
+
synchronized void close() throws IOException {
bytesCopied = input.getPos();
input.close();
@@ -177,9 +185,13 @@ public class LogSorter {
}
return 0;
}
- };
+
+ synchronized long getBytesCopied() throws IOException {
+ return input == null ? bytesCopied : input.getPos();
+ }
+ }
- final ThreadPoolExecutor threadPool;
+ ThreadPoolExecutor threadPool;
private Instance instance;
public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@ public class LogSorter {
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
}
-
- public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
- final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
- final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
- zoo.mkdirs(path);
- List<String> children = zoo.getChildren(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case NodeChildrenChanged:
- if (event.getPath().equals(path))
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- else
- log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
- break;
- case NodeCreated:
- case NodeDataChanged:
- case NodeDeleted:
- case None:
- log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
- break;
-
- }
- }
- });
- attemptRecoveries(zoo, serverName, path, children);
- Random r = new Random();
- // Add a little jitter to avoid all the tservers slamming zookeeper at once
- SimpleTimer.getInstance().schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- }
- }, r.nextInt(1000), 60 * 1000);
- }
-
- private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
- if (children.size() == 0)
- return;
-
- if (threadPool.getQueue().size() > 1)
- return;
- log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
- Random random = new Random();
- Collections.shuffle(children, random);
- try {
- for (String child : children) {
- final String childPath = path + "/" + child;
- log.debug("Attempting to lock " + child);
- ZooLock lock = new ZooLock(childPath);
- if (lock.tryLock(new LockWatcher() {
- @Override
- public void lostLock(LockLossReason reason) {
- log.info("Ignoring lost lock event, reason " + reason);
- }
- }, serverName.getBytes())) {
- // Great... we got the lock, but maybe we're too busy
- if (threadPool.getQueue().size() > 1) {
- lock.unlock();
- log.debug("got the lock, but thread pool is busy; released the lock on " + child);
- break;
- }
- log.debug("got lock for " + child);
- byte[] contents = zoo.getData(childPath, null);
- String destination = Constants.getRecoveryDir(conf) + "/" + child;
- startSort(new String(contents), destination, new LogSortNotifier() {
- @Override
- public void notice(String name, long bytes, int parts, long milliseconds, String error) {
- log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
- try {
- zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
- } catch (Exception e) {
- log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
- }
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- }
- });
- } else {
- log.debug("failed to get the lock " + child);
- }
- }
- } catch (Throwable t) {
- log.error("Unexpected error", t);
- }
- }
-
- public interface LogSortNotifier {
- public void notice(String name, long bytes, int parts, long milliseconds, String error);
- }
-
- private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
- log.info("Copying " + src + " to " + dest);
- fs.delete(new Path(dest), true);
- Path srcPath = new Path(src);
- synchronized (currentWork) {
- Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
- if (!currentWork.containsKey(srcPath.getName())) {
- threadPool.execute(work);
- currentWork.put(srcPath.getName(), work);
- }
- }
+ public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+ this.threadPool = distWorkQThreadPool;
+ new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
}
public List<RecoveryStatus> getLogSorts() {
List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
synchronized (currentWork) {
- for (Entry<String,Work> entries : currentWork.entrySet()) {
+ for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
RecoveryStatus status = new RecoveryStatus();
status.name = entries.getKey();
try {
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/SplitReportMessage.java Fri Jul 13 20:34:44 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.table
import java.util.Map;
import java.util.TreeMap;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.client.impl.Translator;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -48,7 +49,7 @@ public class SplitReportMessage implemen
TabletSplit split = new TabletSplit();
split.oldTablet = old_extent.toThrift();
split.newTablets = Translator.translate(extents.keySet(), Translator.KET);
- client.reportSplitExtent(null, credentials, serverName, split);
+ client.reportSplitExtent(Tracer.traceInfo(), credentials, serverName, split);
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/mastermessage/TabletStatusMessage.java Fri Jul 13 20:34:44 2012
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.tabletserver.mastermessage;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
@@ -34,6 +35,6 @@ public class TabletStatusMessage impleme
}
public void send(AuthInfo auth, String serverName, Iface client) throws TException, ThriftSecurityException {
- client.reportTabletStatus(null, auth, serverName, status, extent.toThrift());
+ client.reportTabletStatus(Tracer.traceInfo(), auth, serverName, status, extent.toThrift());
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java Fri Jul 13 20:34:44 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.test;
import java.io.IOException;
import java.util.Map.Entry;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.master.MasterNotRunningException;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -43,7 +44,7 @@ public class GetMasterStats {
MasterMonitorInfo stats = null;
try {
client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
- stats = client.getMasterStats(null, SecurityConstants.getSystemCredentials());
+ stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
} finally {
if (client != null)
MasterClient.close(client);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/WrongTabletTest.java Fri Jul 13 20:34:44 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.test;
import java.nio.ByteBuffer;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.thrift.AuthInfo;
@@ -39,7 +40,7 @@ public class WrongTabletTest {
Mutation mutation = new Mutation(new Text("row_0003750001"));
// mutation.set(new Text("colf:colq"), new Value("val".getBytes()));
mutation.putDelete(new Text("colf"), new Text("colq"));
- client.update(null, rootCredentials, new KeyExtent(new Text("test_ingest"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift());
+ client.update(Tracer.traceInfo(), rootCredentials, new KeyExtent(new Text("test_ingest"), null, new Text("row_0003750000")).toThrift(), mutation.toThrift());
} catch (Exception e) {
throw new RuntimeException(e);
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousStatsCollector.java Fri Jul 13 20:34:44 2012
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -126,7 +127,7 @@ public class ContinuousStatsCollector {
MasterClientService.Iface client = null;
try {
client = MasterClient.getConnectionWithRetry(HdfsZooInstance.getInstance());
- MasterMonitorInfo stats = client.getMasterStats(null, SecurityConstants.getSystemCredentials());
+ MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), SecurityConstants.getSystemCredentials());
TableInfo all = new TableInfo();
Map<String,TableInfo> tableSummaries = new HashMap<String,TableInfo>();
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/continuous/UndefinedAnalyzer.java Fri Jul 13 20:34:44 2012
@@ -80,46 +80,46 @@ public class UndefinedAnalyzer {
private void parseLog(File log) throws Exception {
BufferedReader reader = new BufferedReader(new FileReader(log));
-
String line;
TreeMap<Long,Long> tm = null;
-
- while ((line = reader.readLine()) != null) {
- if (!line.startsWith("UUID"))
- continue;
- String[] tokens = line.split("\\s");
- String time = tokens[1];
- String uuid = tokens[2];
-
- if (flushes.containsKey(uuid)) {
- System.err.println("WARN Duplicate uuid " + log);
+ try {
+ while ((line = reader.readLine()) != null) {
+ if (!line.startsWith("UUID"))
+ continue;
+ String[] tokens = line.split("\\s");
+ String time = tokens[1];
+ String uuid = tokens[2];
+
+ if (flushes.containsKey(uuid)) {
+ System.err.println("WARN Duplicate uuid " + log);
+ return;
+ }
+
+ tm = new TreeMap<Long,Long>(Collections.reverseOrder());
+ tm.put(0l, Long.parseLong(time));
+ flushes.put(uuid, tm);
+ break;
+
+ }
+ if (tm == null) {
+ System.err.println("WARN Bad ingest log " + log);
return;
}
- tm = new TreeMap<Long,Long>(Collections.reverseOrder());
- tm.put(0l, Long.parseLong(time));
- flushes.put(uuid, tm);
- break;
-
- }
-
- if (tm == null) {
- System.err.println("WARN Bad ingest log " + log);
- return;
- }
-
- while ((line = reader.readLine()) != null) {
- String[] tokens = line.split("\\s");
-
- if (!tokens[0].equals("FLUSH"))
- continue;
-
- String time = tokens[1];
- String count = tokens[4];
-
- tm.put(Long.parseLong(count), Long.parseLong(time));
+ while ((line = reader.readLine()) != null) {
+ String[] tokens = line.split("\\s");
+
+ if (!tokens[0].equals("FLUSH"))
+ continue;
+
+ String time = tokens[1];
+ String count = tokens[4];
+
+ tm.put(Long.parseLong(count), Long.parseLong(time));
+ }
+ } finally {
+ reader.close();
}
-
}
Iterator<Long> getTimes(String uuid, long count) {
@@ -172,45 +172,49 @@ public class UndefinedAnalyzer {
BufferedReader reader = new BufferedReader(new FileReader(masterLog));
String line;
- while ((line = reader.readLine()) != null) {
- if (line.contains("TABLET_LOADED")) {
- String[] tokens = line.split("\\s+");
- String tablet = tokens[8];
- String server = tokens[10];
-
- int pos1 = -1;
- int pos2 = -1;
- int pos3 = -1;
-
- for (int i = 0; i < tablet.length(); i++) {
- if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
- if (pos1 == -1) {
- pos1 = i;
- } else if (pos2 == -1) {
- pos2 = i;
- } else {
- pos3 = i;
+ try {
+ while ((line = reader.readLine()) != null) {
+ if (line.contains("TABLET_LOADED")) {
+ String[] tokens = line.split("\\s+");
+ String tablet = tokens[8];
+ String server = tokens[10];
+
+ int pos1 = -1;
+ int pos2 = -1;
+ int pos3 = -1;
+
+ for (int i = 0; i < tablet.length(); i++) {
+ if (tablet.charAt(i) == '<' || tablet.charAt(i) == ';') {
+ if (pos1 == -1) {
+ pos1 = i;
+ } else if (pos2 == -1) {
+ pos2 = i;
+ } else {
+ pos3 = i;
+ }
}
}
- }
-
- if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
- String tid = tablet.substring(0, pos1);
- String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
- String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
- if (tid.equals(tableId)) {
- // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
- Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
- // System.out.println(" "+date);
-
- assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
-
+
+ if (pos1 > 0 && pos2 > 0 && pos3 == -1) {
+ String tid = tablet.substring(0, pos1);
+ String endRow = tablet.charAt(pos1) == '<' ? "8000000000000000" : tablet.substring(pos1 + 1, pos2);
+ String prevEndRow = tablet.charAt(pos2) == '<' ? "" : tablet.substring(pos2 + 1);
+ if (tid.equals(tableId)) {
+ // System.out.println(" "+server+" "+tid+" "+endRow+" "+prevEndRow);
+ Date date = sdf.parse(tokens[0] + " " + tokens[1] + " " + currentYear + " " + currentMonth);
+ // System.out.println(" "+date);
+
+ assignments.add(new TabletAssignment(tablet, endRow, prevEndRow, server, date.getTime()));
+
+ }
+ } else if (!tablet.startsWith("!0")) {
+ System.err.println("Cannot parse tablet " + tablet);
}
- } else if (!tablet.startsWith("!0")) {
- System.err.println("Cannot parse tablet " + tablet);
+
}
-
}
+ } finally {
+ reader.close();
}
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestReader.java Fri Jul 13 20:34:44 2012
@@ -70,6 +70,7 @@ public class CacheTestReader {
oos.writeObject(readData);
+ fos.close();
oos.close();
UtilWaitThread.sleep(20);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/CacheTestWriter.java Fri Jul 13 20:34:44 2012
@@ -132,6 +132,7 @@ public class CacheTestWriter {
@SuppressWarnings("unchecked")
Map<String,String> readerMap = (Map<String,String>) ois.readObject();
+ fis.close();
ois.close();
System.out.println("read " + readerMap);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java Fri Jul 13 20:34:44 2012
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Random;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.cloudtrace.thrift.TInfo;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
@@ -29,6 +30,8 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
@@ -96,7 +99,7 @@ public class ZombieTServer {
TransactionWatcher watcher = new TransactionWatcher();
final ThriftClientHandler tch = new ThriftClientHandler(instance, watcher);
- TabletClientService.Processor processor = new TabletClientService.Processor(tch);
+ Processor<Iface> processor = new Processor<Iface>(tch);
ServerPort serverPort = TServerUtils.startTServer(port, processor, "ZombieTServer", "walking dead", 2, 1000);
InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), serverPort.port);
@@ -111,7 +114,7 @@ public class ZombieTServer {
@Override
public void lostLock(final LockLossReason reason) {
try {
- tch.halt(null, null, null);
+ tch.halt(Tracer.traceInfo(), null, null);
} catch (Exception ex) {
log.error(ex, ex);
System.exit(1);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Fri Jul 13 20:34:44 2012
@@ -47,6 +47,8 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.client.ClientServiceHandler;
@@ -205,7 +207,7 @@ public class NullTserver {
TransactionWatcher watcher = new TransactionWatcher();
ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
- TabletClientService.Processor processor = new TabletClientService.Processor(tch);
+ Processor<Iface> processor = new Processor<Iface>(tch);
TServerUtils.startTServer(port, processor, "NullTServer", "null tserver", 2, 1000);
InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), port);
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Framework.java Fri Jul 13 20:34:44 2012
@@ -103,7 +103,9 @@ public class Framework {
String module = args[3];
Properties props = new Properties();
- props.load(new FileInputStream(configDir + "/randomwalk.conf"));
+ FileInputStream fis = new FileInputStream(configDir + "/randomwalk.conf");
+ props.load(fis);
+ fis.close();
System.setProperty("localLog", localLogPath + "/" + logId);
System.setProperty("nfsLog", props.getProperty("NFS_LOGPATH") + "/" + logId);
Copied: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java (from r1359316, accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java?p2=accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java&p1=accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java&r1=1359316&r2=1361382&rev=1361382&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StartAll.java Fri Jul 13 20:34:44 2012
@@ -10,7 +10,8 @@ public class StartAll extends Test {
@Override
public void visit(State state, Properties props) throws Exception {
log.info("Starting all servers");
- Runtime.getRuntime().exec(new String[]{System.getenv().get("ACCUMULO_HOME") + "/bin/start-all.sh"});
+ Process exec = Runtime.getRuntime().exec(new String[]{System.getenv().get("ACCUMULO_HOME") + "/bin/start-all.sh"});
+ exec.waitFor();
}
}
Copied: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java (from r1359316, accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java)
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java?p2=accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java&p1=accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java&r1=1359316&r2=1361382&rev=1361382&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/StopTabletServer.java Fri Jul 13 20:34:44 2012
@@ -2,36 +2,63 @@ package org.apache.accumulo.server.test.
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.server.master.LiveTServerSet;
-import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.test.randomwalk.State;
import org.apache.accumulo.server.test.randomwalk.Test;
+import org.apache.accumulo.server.util.AddressUtil;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
public class StopTabletServer extends Test {
+ Set<TServerInstance> getTServers(Instance instance) throws KeeperException, InterruptedException {
+ Set<TServerInstance> result = new HashSet<TServerInstance>();
+ ZooReader rdr = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+ String base = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+ for (String child : rdr.getChildren(base)) {
+ try {
+ List<String> children = rdr.getChildren(base + "/" + child);
+ if (children.size() > 0) {
+ Collections.sort(children);
+ Stat stat = new Stat();
+ byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat);
+ if (!"master".equals(new String(data))) {
+ result.add(new TServerInstance(AddressUtil.parseAddress(child, Property.TSERV_CLIENTPORT), stat.getEphemeralOwner()));
+ }
+ }
+ } catch (KeeperException.NoNodeException ex) {
+ // someone beat us to it
+ }
+ }
+ return result;
+ }
+
@Override
public void visit(State state, Properties props) throws Exception {
- LiveTServerSet set = new LiveTServerSet(state.getInstance(), DefaultConfiguration.getDefaultConfiguration(), new Listener() {
- @Override
- public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
- log.info("Tablet server set changed: " + deleted + " deleted and " + added + " added");
- }
- });
- List<TServerInstance> currentServers = new ArrayList<TServerInstance>(set.getCurrentServers());
+ Instance instance = state.getInstance();
+
+ List<TServerInstance> currentServers = new ArrayList<TServerInstance>(getTServers(instance));
Collections.shuffle(currentServers);
Runtime runtime = Runtime.getRuntime();
if (currentServers.size() > 1) {
TServerInstance victim = currentServers.get(0);
log.info("Stopping " + victim.hostPort());
- runtime.exec(new String[] {System.getenv("ACCUMULO_HOME") + "/bin/accumulo", "admin", "stop", victim.hostPort()});
- if (set.getCurrentServers().contains(victim))
+ Process exec = runtime.exec(new String[] {System.getenv("ACCUMULO_HOME") + "/bin/accumulo", "admin", "stop", victim.hostPort()});
+ if (exec.waitFor() != 0)
+ throw new RuntimeException("admin stop returned a non-zero response: " + exec.exitValue());
+ Set<TServerInstance> set = getTServers(instance);
+ if (set.contains(victim))
throw new RuntimeException("Failed to stop " + victim);
}
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/SecurityHelper.java Fri Jul 13 20:34:44 2012
@@ -31,10 +31,6 @@ import org.apache.accumulo.server.test.r
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
-/**
- * @author jwvines
- *
- */
public class SecurityHelper {
protected final static Logger log = Logger.getLogger(SecurityHelper.class);
@@ -127,6 +123,9 @@ public class SecurityHelper {
public static void setTabPerm(State state, String userName, TablePermission tp, boolean value) {
log.debug((value ? "Gave" : "Took") + " the table permission " + tp.name() + (value ? " to" : " from") + " user " + userName);
state.set("Tab" + userName + tp.name(), Boolean.toString(value));
+ if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE))
+ state.set("Tab" + userName + tp.name() + "time", System.currentTimeMillis());
+
}
public static boolean getSysPerm(State state, String userName, SystemPermission tp) {
@@ -194,4 +193,19 @@ public class SecurityHelper {
return fs;
}
+ /**
+ * @param state
+ * @param userName
+ * @param tp
+ * @return
+ */
+ public static boolean inAmbiguousZone(State state, String userName, TablePermission tp) {
+ if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE)) {
+ Long setTime = (Long) state.get("Tab" + userName + tp.name() + "time");
+ if (System.currentTimeMillis() < (setTime + 1000))
+ return true;
+ }
+ return false;
+ }
+
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java Fri Jul 13 20:34:44 2012
@@ -78,11 +78,13 @@ public class TableOp extends Test {
boolean hasPerm = SecurityHelper.getTabPerm(state, SecurityHelper.getTabUserName(state), tp);
String tableName = state.getString("secTableName");
+ boolean ambiguousZone;
switch (tp) {
case READ:
Authorizations auths = SecurityHelper.getUserAuths(state, SecurityHelper.getTabUserName(state));
boolean canRead = SecurityHelper.getTabPerm(state, SecurityHelper.getTabUserName(state), TablePermission.READ);
+ ambiguousZone = SecurityHelper.inAmbiguousZone(state, SecurityHelper.getTabUserName(state), tp);
try {
Scanner scan = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(SecurityHelper.getTabUserName(state)));
int seen = 0;
@@ -94,7 +96,7 @@ public class TableOp extends Test {
if (!auths.contains(k.getColumnVisibilityData()))
throw new AccumuloException("Got data I should not be capable of seeing: " + k + " table " + tableName);
}
- if (!canRead)
+ if (!canRead && !ambiguousZone)
throw new AccumuloException("Was able to read when I shouldn't have had the perm with connection user " + conn.whoami() + " table " + tableName);
for (Entry<String,Integer> entry : SecurityHelper.getAuthsMap(state).entrySet()) {
if (auths.contains(entry.getKey().getBytes()))
@@ -108,7 +110,7 @@ public class TableOp extends Test {
return;
} catch (AccumuloSecurityException ae) {
if (ae.getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
- if (canRead)
+ if (canRead && !ambiguousZone)
throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, ae);
else
return;
@@ -117,7 +119,7 @@ public class TableOp extends Test {
} catch (RuntimeException re) {
if (re.getCause() instanceof AccumuloSecurityException
&& ((AccumuloSecurityException) re.getCause()).getErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
- if (canRead)
+ if (canRead && !ambiguousZone)
throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, re.getCause());
else
return;
@@ -127,6 +129,9 @@ public class TableOp extends Test {
break;
case WRITE:
+ // boolean canWrite = SecurityHelper.getTabPerm(state, SecurityHelper.getTabUserName(state), TablePermission.WRITE);
+ ambiguousZone = SecurityHelper.inAmbiguousZone(state, SecurityHelper.getTabUserName(state), tp);
+
String key = SecurityHelper.getLastKey(state) + "1";
Mutation m = new Mutation(new Text(key));
for (String s : SecurityHelper.getAuthsArray()) {
@@ -143,8 +148,20 @@ public class TableOp extends Test {
boolean works = true;
try {
writer.addMutation(m);
+ writer.close();
} catch (MutationsRejectedException mre) {
- throw new AccumuloException("Mutation exception!", mre);
+ // There is currently no way to detect why the mutation was rejected; waiting on ACCUMULO-670.
+ // For now, just wait a second and try again!
+
+ if (ambiguousZone) {
+ Thread.sleep(1000);
+ try {
+ writer.addMutation(m);
+ writer.close();
+ } catch (MutationsRejectedException mre2) {
+ throw new AccumuloException("Mutation exception!", mre2);
+ }
+ }
}
if (works)
for (String s : SecurityHelper.getAuthsArray())
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/test/scalability/Run.java Fri Jul 13 20:34:44 2012
@@ -52,8 +52,11 @@ public class Run {
Properties scaleProps = new Properties();
Properties testProps = new Properties();
try {
- scaleProps.load(new FileInputStream(sitePath));
- testProps.load(new FileInputStream(testPath));
+ FileInputStream fis = new FileInputStream(sitePath);
+ scaleProps.load(fis);
+ fis.close();
+ fis = new FileInputStream(testPath);
+ testProps.load(fis);
} catch (Exception e) {
System.out.println("Problem loading config file");
e.printStackTrace();
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Fri Jul 13 20:34:44 2012
@@ -23,7 +23,8 @@ import java.util.TimerTask;
import org.apache.accumulo.cloudtrace.instrument.Span;
import org.apache.accumulo.cloudtrace.thrift.RemoteSpan;
-import org.apache.accumulo.cloudtrace.thrift.SpanReceiver;
+import org.apache.accumulo.cloudtrace.thrift.SpanReceiver.Processor;
+import org.apache.accumulo.cloudtrace.thrift.SpanReceiver.Iface;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
@@ -110,7 +111,7 @@ public class TraceServer implements Watc
}
}
- class Receiver implements SpanReceiver.Iface {
+ class Receiver implements Iface {
@Override
public void span(RemoteSpan s) throws TException {
String idString = Long.toHexString(s.traceId);
@@ -172,7 +173,7 @@ public class TraceServer implements Watc
sock.bind(new InetSocketAddress(port));
final TServerTransport transport = new TServerSocket(sock);
TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.processor(new SpanReceiver.Processor(new Receiver()));
+ options.processor(new Processor<Iface>(new Receiver()));
server = new TThreadPoolServer(options);
final InetSocketAddress address = new InetSocketAddress(hostname, sock.getLocalPort());
registerInZooKeeper(AddressUtil.toString(address));
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/Admin.java Fri Jul 13 20:34:44 2012
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import jline.ConsoleReader;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.impl.ClientExec;
@@ -119,10 +120,10 @@ public class Admin {
}
private static void stopServer(final AuthInfo credentials, final boolean tabletServersToo) throws AccumuloException, AccumuloSecurityException {
- MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Iface>() {
+ MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Client>() {
@Override
- public void execute(MasterClientService.Iface client) throws Exception {
- client.shutdown(null, credentials, tabletServersToo);
+ public void execute(MasterClientService.Client client) throws Exception {
+ client.shutdown(Tracer.traceInfo(), credentials, tabletServersToo);
}
});
}
@@ -130,10 +131,10 @@ public class Admin {
private static void stopTabletServer(String server, final boolean force) throws AccumuloException, AccumuloSecurityException {
InetSocketAddress address = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
final String finalServer = org.apache.accumulo.core.util.AddressUtil.toString(address);
- MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Iface>() {
+ MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Client>() {
@Override
- public void execute(MasterClientService.Iface client) throws Exception {
- client.shutdownTabletServer(null, SecurityConstants.getSystemCredentials(), finalServer, force);
+ public void execute(MasterClientService.Client client) throws Exception {
+ client.shutdownTabletServer(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), finalServer, force);
}
});
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java Fri Jul 13 20:34:44 2012
@@ -60,11 +60,12 @@ public class FileSystemMonitor {
BufferedReader br = new BufferedReader(fr);
String line;
-
+ try {
while ((line = br.readLine()) != null)
mounts.add(new Mount(line));
-
- br.close();
+ } finally {
+ br.close();
+ }
return mounts;
}
Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java?rev=1361382&r1=1361381&r2=1361382&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java Fri Jul 13 20:34:44 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
@@ -43,10 +42,10 @@ import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -148,7 +147,12 @@ public class TServerUtils {
metrics.add(ThriftMetrics.idle, (now - idleStart));
}
try {
- return other.process(in, out);
+ try {
+ return other.process(in, out);
+ } catch (NullPointerException ex) {
+ // THRIFT-1447 - remove with thrift 0.9
+ return true;
+ }
} finally {
if (metrics.isEnabled()) {
idleStart = System.currentTimeMillis();
@@ -173,34 +177,6 @@ public class TServerUtils {
}
}
- public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
- public THsHaServer(Args args) {
- super(args);
- }
-
- protected Runnable getRunnable(FrameBuffer frameBuffer) {
- return new Invocation(frameBuffer);
- }
-
- private class Invocation implements Runnable {
-
- private final FrameBuffer frameBuffer;
-
- public Invocation(final FrameBuffer frameBuffer) {
- this.frameBuffer = frameBuffer;
- }
-
- public void run() {
- if (frameBuffer.trans_ instanceof TNonblockingSocket) {
- TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
- Socket sock = tsock.getSocketChannel().socket();
- clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
- }
- frameBuffer.invoke();
- }
- }
- }
-
public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
long timeBetweenThreadChecks) throws TTransportException {
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);