You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:24 UTC
[45/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecureAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecureAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecureAuthenticator.java
new file mode 100644
index 0000000..38574fa
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecureAuthenticator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.NullToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+
+/**
+ * An {@link Authenticator} implementation that doesn't actually do any security. Any principal will authenticate if a {@link NullToken} is provided. Its
+ * existence is primarily for testing, but it can also be used for any system where user space management is not a concern.
+ */
+public class InsecureAuthenticator implements Authenticator {
+
+  /**
+   * No-op: this authenticator has nothing to set up.
+   */
+  @Override
+  public void initialize(String instanceId, boolean initialize) {}
+
+  /**
+   * Accepts any combination of the other security handlers.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) {
+    return true;
+  }
+
+  /**
+   * No-op: nothing is stored for the initial principal.
+   */
+  @Override
+  public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException {}
+
+  /**
+   * Authenticates every principal, provided the supplied token is a {@link NullToken}.
+   *
+   * @return {@code true} if {@code token} is a {@link NullToken}; {@code false} otherwise, including when {@code token} is {@code null}
+   */
+  @Override
+  public boolean authenticateUser(String principal, AuthenticationToken token) {
+    return token instanceof NullToken;
+  }
+
+  /**
+   * No users are tracked, so there is never anything to list.
+   *
+   * @return an empty, immutable set
+   */
+  @Override
+  public Set<String> listUsers() throws AccumuloSecurityException {
+    return Collections.emptySet();
+  }
+
+  /**
+   * No-op: users are not tracked.
+   */
+  @Override
+  public void createUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {}
+
+  /**
+   * No-op: users are not tracked.
+   */
+  @Override
+  public void dropUser(String user) throws AccumuloSecurityException {}
+
+  /**
+   * No-op: no password is stored.
+   */
+  @Override
+  public void changePassword(String user, AuthenticationToken token) throws AccumuloSecurityException {}
+
+  /**
+   * Every user is considered to exist.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean userExists(String user) {
+    return true;
+  }
+
+  /**
+   * Checks whether the given class name is the one token type this authenticator accepts.
+   *
+   * @param tokenClass
+   *          fully qualified class name of the token; may be {@code null}
+   * @return {@code true} only if {@code tokenClass} is the class name of {@link NullToken}
+   */
+  @Override
+  public boolean validTokenClass(String tokenClass) {
+    // Compare from the constant side so a null class name yields false instead of a NullPointerException.
+    return NullToken.class.getName().equals(tokenClass);
+  }
+
+  /**
+   * @return a new mutable set containing only {@link NullToken}
+   */
+  @Override
+  public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
+    Set<Class<? extends AuthenticationToken>> supported = new HashSet<Class<? extends AuthenticationToken>>();
+    supported.add(NullToken.class);
+    return supported;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
new file mode 100644
index 0000000..b57abfe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/InsecurePermHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+
+/**
+ * A {@link PermissionHandler} implementation that doesn't actually do any security: every permission check succeeds and every mutation is ignored. Use at your
+ * own risk.
+ */
+public class InsecurePermHandler implements PermissionHandler {
+
+  /**
+   * No-op: there is no state to set up.
+   */
+  @Override
+  public void initialize(String instanceId, boolean initialize) {}
+
+  /**
+   * Accepts any combination of the other security handlers.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+    return true;
+  }
+
+  /**
+   * No-op: nothing is recorded for the root user.
+   */
+  @Override
+  public void initializeSecurity(TCredentials token, String rootuser) throws AccumuloSecurityException {}
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    return true;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    return true;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    return true;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    return true;
+  }
+
+  /**
+   * No-op: permissions are not stored.
+   */
+  @Override
+  public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {}
+
+  /**
+   * No-op: permissions are not stored.
+   */
+  @Override
+  public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {}
+
+  /**
+   * No-op: permissions are not stored.
+   */
+  @Override
+  public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {}
+
+  /**
+   * No-op: permissions are not stored.
+   */
+  @Override
+  public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {}
+
+  /**
+   * No-op: there are no per-table permissions to clean up.
+   */
+  @Override
+  public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException {}
+
+  /**
+   * No-op: nothing is kept per user.
+   */
+  @Override
+  public void initUser(String user) throws AccumuloSecurityException {}
+
+  /**
+   * No-op: nothing is kept per user.
+   */
+  @Override
+  public void cleanUser(String user) throws AccumuloSecurityException {}
+
+  /**
+   * No-op: nothing is kept per table.
+   */
+  @Override
+  public void initTable(String table) throws AccumuloSecurityException {}
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/PermissionHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/PermissionHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/PermissionHandler.java
new file mode 100644
index 0000000..72c64b5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/PermissionHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+
+/**
+ * This interface is used for the system which will be used for getting a user's permissions. If the implementation does not support configuration through
+ * Accumulo, it should throw an AccumuloSecurityException with the error code UNSUPPORTED_OPERATION.
+ */
+public interface PermissionHandler {
+
+ /**
+ * Sets up the permission handler for a new instance of Accumulo
+ */
+ public void initialize(String instanceId, boolean initialize);
+
+ /**
+ * Used to validate that the Authorizor, Authenticator, and permission handler can coexist
+ */
+ public boolean validSecurityHandlers(Authenticator authent, Authorizor author);
+
+ /**
+ * Used to initialize security for the root user
+ */
+ public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException;
+
+ /**
+ * Used to check whether the user has the given system permission
+ */
+ public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException;
+
+ /**
+ * Used to check whether the user has the given system permission, with caching due to high frequency operation. NOTE: At this time, this method is unused
+ * but is included just in case we need it in the future.
+ */
+ public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException;
+
+ /**
+ * Used to check whether the user has the given table permission on a table
+ */
+ public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Used to check whether the user has the given table permission on a table, with caching. This method is for high frequency operations
+ */
+ public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Gives the user the given system permission
+ */
+ public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException;
+
+ /**
+ * Revokes the given system permission from the user
+ */
+ public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException;
+
+ /**
+ * Gives the user the given table permission
+ */
+ public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Revokes the given table permission from the user.
+ */
+ public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Cleans up the permissions for a table. Used when a table gets deleted.
+ */
+ public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException;
+
+ /**
+ * Initializes a new user
+ */
+ public void initUser(String user) throws AccumuloSecurityException;
+
+ /**
+ * Initializes a new table
+ */
+ public void initTable(String table) throws AccumuloSecurityException;
+
+ /**
+ * Deletes a user
+ */
+ public void cleanUser(String user) throws AccumuloSecurityException;
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
new file mode 100644
index 0000000..4e327ec
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * {@link Authenticator} implementation that keeps all authentication info in ZooKeeper. Each user is a node under {@code Constants.ZROOT/<instanceId>/users}
+ * whose data is whatever {@code ZKSecurityTool.createPass} produces for the user's password. Listing users, existence checks and authentication read through
+ * a {@link ZooCache}; every write clears the affected cache entries while holding the cache's monitor.
+ */
+public final class ZKAuthenticator implements Authenticator {
+  static final Logger log = Logger.getLogger(ZKAuthenticator.class);
+  private static Authenticator zkAuthenticatorInstance = null;
+
+  // Set by initialize(); every other method assumes it has been called first.
+  private String ZKUserPath;
+  private final ZooCache zooCache;
+
+  /**
+   * @return the shared instance, created on first use
+   */
+  public static synchronized Authenticator getInstance() {
+    if (zkAuthenticatorInstance == null)
+      zkAuthenticatorInstance = new ZKAuthenticator();
+    return zkAuthenticatorInstance;
+  }
+
+  public ZKAuthenticator() {
+    zooCache = new ZooCache();
+  }
+
+  /**
+   * Records the ZooKeeper path under which this instance's users live. The {@code initialize} flag is not used here.
+   */
+  @Override
+  public void initialize(String instanceId, boolean initialize) {
+    ZKUserPath = Constants.ZROOT + "/" + instanceId + "/users";
+  }
+
+  /**
+   * Wipes any existing users node, recreates it with the root principal's name as its data, and creates the root principal with the given password.
+   * {@code credentials} is not used by this implementation.
+   *
+   * @throws RuntimeException
+   *           wrapping any ZooKeeper, interruption or password-creation failure
+   */
+  @Override
+  public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException {
+    try {
+      // remove old settings from zookeeper first, if any
+      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+      synchronized (zooCache) {
+        zooCache.clear();
+        if (zoo.exists(ZKUserPath)) {
+          zoo.recursiveDelete(ZKUserPath, NodeMissingPolicy.SKIP);
+          log.info("Removed " + ZKUserPath + "/" + " from zookeeper");
+        }
+
+        // prep parent node of users with root username
+        // NOTE(review): getBytes() uses the platform default charset; left as-is because the code that reads this node back is not visible here.
+        zoo.putPersistentData(ZKUserPath, principal.getBytes(), NodeExistsPolicy.FAIL);
+
+        constructUser(principal, ZKSecurityTool.createPass(token));
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (AccumuloException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sets up the user in ZK for the provided user. No checking for existence is done here, it should be done before calling; the write itself fails with a
+   * {@link KeeperException} if the node already exists ({@link NodeExistsPolicy#FAIL}).
+   */
+  private void constructUser(String user, byte[] pass) throws KeeperException, InterruptedException {
+    synchronized (zooCache) {
+      zooCache.clear();
+      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+      zoo.putPrivatePersistentData(ZKUserPath + "/" + user, pass, NodeExistsPolicy.FAIL);
+    }
+  }
+
+  /**
+   * @return a new sorted set of the (cached) child node names of the users path
+   */
+  @Override
+  public Set<String> listUsers() {
+    return new TreeSet<String>(zooCache.getChildren(ZKUserPath));
+  }
+
+  /**
+   * Creates a user with no permissions whatsoever
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code INVALID_TOKEN} if {@code token} is not a {@link PasswordToken}, {@code USER_EXISTS} if the user's node already exists,
+   *           {@code CONNECTION_ERROR} for any other ZooKeeper failure, or {@code DEFAULT_SECURITY_ERROR} if creating the stored password fails
+   */
+  @Override
+  public void createUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    try {
+      if (!(token instanceof PasswordToken))
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.INVALID_TOKEN);
+      PasswordToken pt = (PasswordToken) token;
+      constructUser(principal, ZKSecurityTool.createPass(pt.getPassword()));
+    } catch (KeeperException e) {
+      if (e.code().equals(KeeperException.Code.NODEEXISTS))
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_EXISTS, e);
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (AccumuloException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.DEFAULT_SECURITY_ERROR, e);
+    }
+  }
+
+  /**
+   * Deletes the user's node (and anything beneath it) from ZooKeeper.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code USER_DOESNT_EXIST} if the node is missing, or {@code CONNECTION_ERROR} for any other ZooKeeper failure
+   */
+  @Override
+  public void dropUser(String user) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        zooCache.clear();
+        ZooReaderWriter.getRetryingInstance().recursiveDelete(ZKUserPath + "/" + user, NodeMissingPolicy.FAIL);
+      }
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      if (e.code().equals(KeeperException.Code.NONODE))
+        throw new AccumuloSecurityException(user, SecurityErrorCode.USER_DOESNT_EXIST, e);
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    }
+  }
+
+  /**
+   * Overwrites the stored password of an existing user.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code INVALID_TOKEN} if {@code token} is not a {@link PasswordToken}, {@code USER_DOESNT_EXIST} if the user is unknown,
+   *           {@code CONNECTION_ERROR} on a ZooKeeper failure, or {@code DEFAULT_SECURITY_ERROR} if creating the stored password fails
+   */
+  @Override
+  public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    if (!(token instanceof PasswordToken))
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.INVALID_TOKEN);
+    PasswordToken pt = (PasswordToken) token;
+    if (userExists(principal)) {
+      try {
+        synchronized (zooCache) {
+          zooCache.clear(ZKUserPath + "/" + principal);
+          ZooReaderWriter.getRetryingInstance().putPrivatePersistentData(ZKUserPath + "/" + principal, ZKSecurityTool.createPass(pt.getPassword()),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      } catch (KeeperException e) {
+        log.error(e, e);
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e);
+      } catch (InterruptedException e) {
+        log.error(e, e);
+        // Restore the interrupt status so code further up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (AccumuloException e) {
+        log.error(e, e);
+        throw new AccumuloSecurityException(principal, SecurityErrorCode.DEFAULT_SECURITY_ERROR, e);
+      }
+    } else
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_DOESNT_EXIST); // user doesn't exist
+  }
+
+  /**
+   * Checks if a user exists, i.e. whether the (cached) node for the user has data.
+   */
+  @Override
+  public boolean userExists(String user) {
+    return zooCache.get(ZKUserPath + "/" + user) != null;
+  }
+
+  /**
+   * Accepts any combination of the other security handlers.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) {
+    return true;
+  }
+
+  /**
+   * Checks the supplied password against the stored one via {@code ZKSecurityTool.checkPass}. If the check against the cached value fails, the cache entry is
+   * cleared and the check is repeated once against freshly read data (presumably so a recently changed password is not rejected because of a stale cache).
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code INVALID_TOKEN} if {@code token} is not a {@link PasswordToken}
+   */
+  @Override
+  public boolean authenticateUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+    if (!(token instanceof PasswordToken))
+      throw new AccumuloSecurityException(principal, SecurityErrorCode.INVALID_TOKEN);
+    PasswordToken pt = (PasswordToken) token;
+    byte[] pass;
+    String zpath = ZKUserPath + "/" + principal;
+    pass = zooCache.get(zpath);
+    boolean result = ZKSecurityTool.checkPass(pt.getPassword(), pass);
+    if (!result) {
+      zooCache.clear(zpath);
+      pass = zooCache.get(zpath);
+      result = ZKSecurityTool.checkPass(pt.getPassword(), pass);
+    }
+    return result;
+  }
+
+  /**
+   * @return a new mutable set containing only {@link PasswordToken}
+   */
+  @Override
+  public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
+    Set<Class<? extends AuthenticationToken>> cs = new HashSet<Class<? extends AuthenticationToken>>();
+    cs.add(PasswordToken.class);
+    return cs;
+  }
+
+  /**
+   * Checks whether the given class name is the one token type this authenticator accepts.
+   *
+   * @param tokenClass
+   *          fully qualified class name of the token; may be {@code null}
+   * @return {@code true} only if {@code tokenClass} is the class name of {@link PasswordToken}
+   */
+  @Override
+  public boolean validTokenClass(String tokenClass) {
+    // Compare from the constant side so a null class name yields false instead of a NullPointerException.
+    return PasswordToken.class.getName().equals(tokenClass);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
new file mode 100644
index 0000000..71274cc
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * {@link Authorizor} implementation that keeps each user's authorizations in ZooKeeper, in an {@code Authorizations} child node of the user's node under the
+ * instance's {@code users} path. Reads go through a {@link ZooCache}.
+ */
+public class ZKAuthorizor implements Authorizor {
+  private static final Logger log = Logger.getLogger(ZKAuthorizor.class);
+  private static Authorizor zkAuthorizorInstance = null;
+
+  // Name of the per-user child node that holds the serialized authorizations.
+  private final String ZKUserAuths = "/Authorizations";
+
+  // Set by initialize(); every other method assumes it has been called first.
+  private String ZKUserPath;
+  private final ZooCache zooCache;
+
+  /**
+   * @return the shared instance, created on first use
+   */
+  public static synchronized Authorizor getInstance() {
+    if (zkAuthorizorInstance == null)
+      zkAuthorizorInstance = new ZKAuthorizor();
+    return zkAuthorizorInstance;
+  }
+
+  public ZKAuthorizor() {
+    zooCache = new ZooCache();
+  }
+
+  /**
+   * Records the ZooKeeper path under which this instance's users live. The {@code initialize} flag is not used here.
+   */
+  public void initialize(String instanceId, boolean initialize) {
+    ZKUserPath = ZKSecurityTool.getInstancePath(instanceId) + "/users";
+  }
+
+  /**
+   * @return the user's authorizations as read through the cache, or {@link Authorizations#EMPTY} if nothing is stored for the user
+   */
+  public Authorizations getCachedUserAuthorizations(String user) {
+    byte[] authsBytes = zooCache.get(ZKUserPath + "/" + user + ZKUserAuths);
+    if (authsBytes != null)
+      return ZKSecurityTool.convertAuthorizations(authsBytes);
+    return Authorizations.EMPTY;
+  }
+
+  /**
+   * Accepts any combination of the other security handlers.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean validSecurityHandlers(Authenticator auth, PermissionHandler pm) {
+    return true;
+  }
+
+  /**
+   * Creates the users parent node if it is missing (with the root user's name as its data), makes sure the root user's node exists, and stores an empty set
+   * of authorizations for the root user. No permissions are granted here. {@code itw} is not used by this implementation.
+   *
+   * @throws RuntimeException
+   *           wrapping any ZooKeeper or interruption failure
+   */
+  @Override
+  public void initializeSecurity(TCredentials itw, String rootuser) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+    try {
+      // prep parent node of users with root username
+      if (!zoo.exists(ZKUserPath))
+        zoo.putPersistentData(ZKUserPath, rootuser.getBytes(), NodeExistsPolicy.FAIL);
+
+      // the root user starts with no record-level authorizations
+      initUser(rootuser);
+      zoo.putPersistentData(ZKUserPath + "/" + rootuser + ZKUserAuths, ZKSecurityTool.convertAuthorizations(Authorizations.EMPTY), NodeExistsPolicy.FAIL);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Makes sure the user's node exists, creating it with empty data if needed; an existing node is left untouched ({@link NodeExistsPolicy#SKIP}).
+   *
+   * @param user
+   *          name of the user whose node should exist
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} on a ZooKeeper failure
+   */
+  public void initUser(String user) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    try {
+      zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes the user's authorizations node (a missing node is ignored) and clears the cache entry for the user's node.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code USER_DOESNT_EXIST} if ZooKeeper reports a missing node, or {@code CONNECTION_ERROR} for any other ZooKeeper failure
+   */
+  @Override
+  public void dropUser(String user) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP);
+        zooCache.clear(ZKUserPath + "/" + user);
+      }
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      if (e.code().equals(KeeperException.Code.NONODE))
+        throw new AccumuloSecurityException(user, SecurityErrorCode.USER_DOESNT_EXIST, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+
+    }
+  }
+
+  /**
+   * Replaces the user's stored authorizations, clearing the whole cache first.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} on a ZooKeeper failure
+   */
+  @Override
+  public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        zooCache.clear();
+        ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserAuths, ZKSecurityTool.convertAuthorizations(authorizations),
+            NodeExistsPolicy.OVERWRITE);
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * @return {@code true} if every entry of {@code auths} is among the user's cached authorizations (trivially {@code true} for an empty list)
+   */
+  @Override
+  public boolean isValidAuthorizations(String user, List<ByteBuffer> auths) throws AccumuloSecurityException {
+    Collection<ByteBuffer> userauths = getCachedUserAuthorizations(user).getAuthorizationsBB();
+    for (ByteBuffer auth : auths)
+      if (!userauths.contains(auth))
+        return false;
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
new file mode 100644
index 0000000..f219603
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * {@link PermissionHandler} implementation that keeps system and table permissions in ZooKeeper.
+ * <p>
+ * Permissions are stored as serialized byte arrays (see {@link ZKSecurityTool}) in the nodes {@code <instance path>/users/<user>/System} and
+ * {@code <instance path>/users/<user>/Tables/<table>}. The "cached" checks read through a {@link ZooCache}; the non-cached checks sync with ZooKeeper and
+ * read the node directly. Grant, revoke and clean operations clear the affected cache entries while holding the cache's monitor.
+ */
+public class ZKPermHandler implements PermissionHandler {
+  private static final Logger log = Logger.getLogger(ZKPermHandler.class);
+  private static PermissionHandler zkPermHandlerInstance = null;
+
+  // Both paths are assigned by initialize(String, boolean) and are null until then.
+  private String ZKUserPath;
+  private String ZKTablePath;
+  private final ZooCache zooCache;
+  private final String ZKUserSysPerms = "/System";
+  private final String ZKUserTablePerms = "/Tables";
+
+  /**
+   * @return the lazily created, process-wide handler instance
+   */
+  public static synchronized PermissionHandler getInstance() {
+    if (zkPermHandlerInstance == null)
+      zkPermHandlerInstance = new ZKPermHandler();
+    return zkPermHandlerInstance;
+  }
+
+  /**
+   * Derives the ZooKeeper paths for users and tables from the instance id.
+   *
+   * @param instanceId
+   *          id of the Accumulo instance whose ZooKeeper subtree is used
+   * @param initialize
+   *          not used by this implementation
+   */
+  @Override
+  public void initialize(String instanceId, boolean initialize) {
+    ZKUserPath = ZKSecurityTool.getInstancePath(instanceId) + "/users";
+    ZKTablePath = ZKSecurityTool.getInstancePath(instanceId) + "/tables";
+  }
+
+  public ZKPermHandler() {
+    zooCache = new ZooCache();
+  }
+
+  /**
+   * Checks a table permission against ZooKeeper directly (sync followed by read), bypassing the cache.
+   * <p>
+   * Fails closed: any unexpected ZooKeeper error or interruption is logged and reported as "no permission".
+   *
+   * @return {@code true} only if the user's permission node for the table exists and contains {@code permission}
+   * @throws TableNotFoundException
+   *           if neither the user's permission node nor the table's own node exists
+   */
+  @Override
+  public boolean hasTablePermission(String user, String table, TablePermission permission) throws TableNotFoundException {
+    byte[] serializedPerms;
+    try {
+      String path = ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table;
+      ZooReaderWriter.getRetryingInstance().sync(path);
+      serializedPerms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+    } catch (KeeperException e) {
+      if (e.code() == Code.NONODE) {
+        // maybe the table was just deleted?
+        try {
+          // check for existence:
+          ZooReaderWriter.getRetryingInstance().getData(ZKTablePath + "/" + table, null);
+          // it's there, you don't have permission
+          return false;
+        } catch (InterruptedException ex) {
+          log.warn("Unhandled InterruptedException, failing closed for table permission check", ex);
+          return false;
+        } catch (KeeperException ex) {
+          // Inspect the exception from the existence check itself (ex), not the outer one (e), which is already known to be NONODE.
+          if (ex.code() == Code.NONODE) {
+            // not there, throw an informative exception
+            throw new TableNotFoundException(null, table, "while checking permissions");
+          }
+          log.warn("Unhandled KeeperException, failing closed for table permission check", ex);
+        }
+        return false;
+      }
+      log.warn("Unhandled KeeperException, failing closed for table permission check", e);
+      return false;
+    } catch (InterruptedException e) {
+      log.warn("Unhandled InterruptedException, failing closed for table permission check", e);
+      return false;
+    }
+    if (serializedPerms != null) {
+      return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission);
+    }
+    return false;
+  }
+
+  /**
+   * Checks a table permission using only the ZooKeeper cache.
+   *
+   * @return {@code true} if the cached permission node exists and contains {@code permission}; {@code false} otherwise
+   */
+  @Override
+  public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+    byte[] serializedPerms = zooCache.get(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+    if (serializedPerms != null) {
+      return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission);
+    }
+    return false;
+  }
+
+  /**
+   * Adds a system permission to the user's set; ZooKeeper is only written when the set actually changes.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} when ZooKeeper reports a {@link KeeperException}
+   */
+  @Override
+  public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    try {
+      byte[] permBytes = zooCache.get(ZKUserPath + "/" + user + ZKUserSysPerms);
+      Set<SystemPermission> perms;
+      if (permBytes == null) {
+        perms = new TreeSet<SystemPermission>();
+      } else {
+        perms = ZKSecurityTool.convertSystemPermissions(permBytes);
+      }
+
+      if (perms.add(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear();
+          ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(perms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Adds a table permission to the user's set for {@code table}; ZooKeeper is only written when the set actually changes.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} when ZooKeeper reports a {@link KeeperException}
+   */
+  @Override
+  public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException {
+    Set<TablePermission> tablePerms;
+    byte[] serializedPerms = zooCache.get(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+    if (serializedPerms != null)
+      tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms);
+    else
+      tablePerms = new TreeSet<TablePermission>();
+
+    try {
+      if (tablePerms.add(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, ZKSecurityTool.convertTablePermissions(tablePerms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Removes a system permission from the user's set; a no-op when the user has no system permission node or does not hold the permission.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} when ZooKeeper reports a {@link KeeperException}
+   */
+  @Override
+  public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] sysPermBytes = zooCache.get(ZKUserPath + "/" + user + ZKUserSysPerms);
+
+    // User had no system permission, nothing to revoke.
+    if (sysPermBytes == null)
+      return;
+
+    Set<SystemPermission> sysPerms = ZKSecurityTool.convertSystemPermissions(sysPermBytes);
+
+    try {
+      if (sysPerms.remove(permission)) {
+        synchronized (zooCache) {
+          zooCache.clear();
+          ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(sysPerms),
+              NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Removes a table permission from the user's set for {@code table}. When the last permission is removed the user's node for that table is deleted.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} when ZooKeeper reports a {@link KeeperException}
+   */
+  @Override
+  public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException {
+    byte[] serializedPerms = zooCache.get(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
+
+    // User had no table permission, nothing to revoke.
+    if (serializedPerms == null)
+      return;
+
+    Set<TablePermission> tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms);
+    try {
+      if (tablePerms.remove(permission)) {
+        // Clear and write under the cache monitor, like the other mutators in this class.
+        synchronized (zooCache) {
+          zooCache.clear();
+          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          if (tablePerms.size() == 0)
+            zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
+          else
+            zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, ZKSecurityTool.convertTablePermissions(tablePerms),
+                NodeExistsPolicy.OVERWRITE);
+        }
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes the permission node for {@code table} from every user listed under the users path.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} (reported for "unknownUser") when ZooKeeper reports a {@link KeeperException}
+   */
+  @Override
+  public void cleanTablePermissions(String table) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        zooCache.clear();
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        for (String user : zooCache.getChildren(ZKUserPath))
+          zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
+      }
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException("unknownUser", SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the permission nodes for the root user: every {@link SystemPermission}, plus {@code ALTER_TABLE} on the root and metadata tables.
+   * <p>
+   * ZooKeeper failures during initialization are rethrown as {@link RuntimeException}.
+   *
+   * @param itw
+   *          not used by this implementation
+   * @param rootuser
+   *          name of the root user
+   */
+  @Override
+  public void initializeSecurity(TCredentials itw, String rootuser) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+    // create the root user with all system privileges, no table privileges, and no record-level authorizations
+    Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
+    for (SystemPermission p : SystemPermission.values())
+      rootPerms.add(p);
+    Map<String,Set<TablePermission>> tablePerms = new HashMap<String,Set<TablePermission>>();
+    // Allow the root user to flush the system tables
+    tablePerms.put(RootTable.ID, Collections.singleton(TablePermission.ALTER_TABLE));
+    tablePerms.put(MetadataTable.ID, Collections.singleton(TablePermission.ALTER_TABLE));
+
+    try {
+      // prep parent node of users with root username
+      if (!zoo.exists(ZKUserPath))
+        zoo.putPersistentData(ZKUserPath, rootuser.getBytes(), NodeExistsPolicy.FAIL);
+
+      initUser(rootuser);
+      zoo.putPersistentData(ZKUserPath + "/" + rootuser + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(rootPerms), NodeExistsPolicy.FAIL);
+      for (Entry<String,Set<TablePermission>> entry : tablePerms.entrySet())
+        createTablePerm(rootuser, entry.getKey(), entry.getValue());
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the user's node and its table-permissions parent node with empty data; nodes that already exist are left alone.
+   *
+   * @param user
+   *          name of the user to set up
+   * @throws AccumuloSecurityException
+   *           with {@code CONNECTION_ERROR} when ZooKeeper reports a {@link KeeperException}
+   */
+  @Override
+  public void initUser(String user) throws AccumuloSecurityException {
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    try {
+      zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
+      zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms, new byte[0], NodeExistsPolicy.SKIP);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sets up a new table configuration for the provided user/table. No checking for existence is done here, it should be done before calling.
+   */
+  private void createTablePerm(String user, String table, Set<TablePermission> perms) throws KeeperException, InterruptedException {
+    synchronized (zooCache) {
+      zooCache.clear();
+      ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table,
+          ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL);
+    }
+  }
+
+  /**
+   * Deletes the user's system and table permission nodes and drops the user's entries from the cache.
+   *
+   * @throws AccumuloSecurityException
+   *           with {@code USER_DOESNT_EXIST} when ZooKeeper reports {@code NONODE}, otherwise with {@code CONNECTION_ERROR}
+   */
+  @Override
+  public void cleanUser(String user) throws AccumuloSecurityException {
+    try {
+      synchronized (zooCache) {
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP);
+        zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP);
+        zooCache.clear(ZKUserPath + "/" + user);
+      }
+    } catch (InterruptedException e) {
+      log.error(e, e);
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      log.error(e, e);
+      if (e.code().equals(KeeperException.Code.NONODE))
+        throw new AccumuloSecurityException(user, SecurityErrorCode.USER_DOESNT_EXIST, e);
+      throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
+
+    }
+  }
+
+  /**
+   * Checks a system permission against ZooKeeper directly (sync followed by read), bypassing the cache.
+   * <p>
+   * Fails closed: a missing node yields {@code false} silently; any other ZooKeeper error or an interruption is logged and also yields {@code false}.
+   */
+  @Override
+  public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] perms;
+    try {
+      String path = ZKUserPath + "/" + user + ZKUserSysPerms;
+      ZooReaderWriter.getRetryingInstance().sync(path);
+      perms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+    } catch (KeeperException e) {
+      if (e.code() == Code.NONODE) {
+        return false;
+      }
+      log.warn("Unhandled KeeperException, failing closed for system permission check", e);
+      return false;
+    } catch (InterruptedException e) {
+      log.warn("Unhandled InterruptedException, failing closed for system permission check", e);
+      return false;
+    }
+
+    if (perms == null)
+      return false;
+    return ZKSecurityTool.convertSystemPermissions(perms).contains(permission);
+  }
+
+  /**
+   * Checks a system permission using only the ZooKeeper cache.
+   *
+   * @return {@code true} if the cached permission node exists and contains {@code permission}; {@code false} otherwise
+   */
+  @Override
+  public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+    byte[] perms = zooCache.get(ZKUserPath + "/" + user + ZKUserSysPerms);
+    if (perms == null)
+      return false;
+    return ZKSecurityTool.convertSystemPermissions(perms).contains(permission);
+  }
+
+  /**
+   * @return always {@code true}; this handler accepts any authenticator/authorizor combination
+   */
+  @Override
+  public boolean validSecurityHandlers(Authenticator authent, Authorizor author) {
+    return true;
+  }
+
+  @Override
+  public void initTable(String table) throws AccumuloSecurityException {
+    // All proper housekeeping is done on delete and permission granting, no work needs to be done here
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
new file mode 100644
index 0000000..3b9d8b2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKSecurityTool.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.handler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.log4j.Logger;
+
+/**
+ * All the static too methods used for this class, so that we can separate out stuff that isn't using ZooKeeper. That way, we can check the synchronization
+ * model more easily, as we only need to check to make sure zooCache is cleared when things are written to ZooKeeper in methods that might use it. These won't,
+ * and so don't need to be checked.
+ */
+class ZKSecurityTool {
+ private static final Logger log = Logger.getLogger(ZKSecurityTool.class);
+ private static final int SALT_LENGTH = 8;
+
+ // Generates a byte array salt of length SALT_LENGTH
+ private static byte[] generateSalt() {
+ final SecureRandom random = new SecureRandom();
+ byte[] salt = new byte[SALT_LENGTH];
+ random.nextBytes(salt);
+ return salt;
+ }
+
+ private static byte[] hash(byte[] raw) throws NoSuchAlgorithmException {
+ MessageDigest md = MessageDigest.getInstance(Constants.PW_HASH_ALGORITHM);
+ md.update(raw);
+ return md.digest();
+ }
+
+ public static boolean checkPass(byte[] password, byte[] zkData) {
+ if (zkData == null)
+ return false;
+
+ byte[] salt = new byte[SALT_LENGTH];
+ System.arraycopy(zkData, 0, salt, 0, SALT_LENGTH);
+ byte[] passwordToCheck;
+ try {
+ passwordToCheck = convertPass(password, salt);
+ } catch (NoSuchAlgorithmException e) {
+ log.error("Count not create hashed password", e);
+ return false;
+ }
+ return java.util.Arrays.equals(passwordToCheck, zkData);
+ }
+
+ public static byte[] createPass(byte[] password) throws AccumuloException {
+ byte[] salt = generateSalt();
+ try {
+ return convertPass(password, salt);
+ } catch (NoSuchAlgorithmException e) {
+ log.error("Count not create hashed password", e);
+ throw new AccumuloException("Count not create hashed password", e);
+ }
+ }
+
+ private static byte[] convertPass(byte[] password, byte[] salt) throws NoSuchAlgorithmException {
+ byte[] plainSalt = new byte[password.length + SALT_LENGTH];
+ System.arraycopy(password, 0, plainSalt, 0, password.length);
+ System.arraycopy(salt, 0, plainSalt, password.length, SALT_LENGTH);
+ byte[] hashed = hash(plainSalt);
+ byte[] saltedHash = new byte[SALT_LENGTH + hashed.length];
+ System.arraycopy(salt, 0, saltedHash, 0, SALT_LENGTH);
+ System.arraycopy(hashed, 0, saltedHash, SALT_LENGTH, hashed.length);
+ return saltedHash; // contains salt+hash(password+salt)
+ }
+
+ public static Authorizations convertAuthorizations(byte[] authorizations) {
+ return new Authorizations(authorizations);
+ }
+
+ public static byte[] convertAuthorizations(Authorizations authorizations) {
+ return authorizations.getAuthorizationsArray();
+ }
+
+ public static byte[] convertSystemPermissions(Set<SystemPermission> systempermissions) {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream(systempermissions.size());
+ DataOutputStream out = new DataOutputStream(bytes);
+ try {
+ for (SystemPermission sp : systempermissions)
+ out.writeByte(sp.getId());
+ } catch (IOException e) {
+ log.error(e, e);
+ throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
+ }
+ return bytes.toByteArray();
+ }
+
+ public static Set<SystemPermission> convertSystemPermissions(byte[] systempermissions) {
+ ByteArrayInputStream bytes = new ByteArrayInputStream(systempermissions);
+ DataInputStream in = new DataInputStream(bytes);
+ Set<SystemPermission> toReturn = new HashSet<SystemPermission>();
+ try {
+ while (in.available() > 0)
+ toReturn.add(SystemPermission.getPermissionById(in.readByte()));
+ } catch (IOException e) {
+ log.error("User database is corrupt; error converting system permissions", e);
+ toReturn.clear();
+ }
+ return toReturn;
+ }
+
+ public static byte[] convertTablePermissions(Set<TablePermission> tablepermissions) {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream(tablepermissions.size());
+ DataOutputStream out = new DataOutputStream(bytes);
+ try {
+ for (TablePermission tp : tablepermissions)
+ out.writeByte(tp.getId());
+ } catch (IOException e) {
+ log.error(e, e);
+ throw new RuntimeException(e); // this is impossible with ByteArrayOutputStream; crash hard if this happens
+ }
+ return bytes.toByteArray();
+ }
+
+ public static Set<TablePermission> convertTablePermissions(byte[] tablepermissions) {
+ Set<TablePermission> toReturn = new HashSet<TablePermission>();
+ for (byte b : tablepermissions)
+ toReturn.add(TablePermission.getPermissionById(b));
+ return toReturn;
+ }
+
+ public static String getInstancePath(String instanceId) {
+ return Constants.ZROOT + "/" + instanceId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
new file mode 100644
index 0000000..8a74d0b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tables;
+
+import java.security.SecurityPermission;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.util.TablePropUtil;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+/**
+ * Process-wide manager of table state stored in ZooKeeper.
+ * <p>
+ * Keeps an in-memory map of table id to {@link TableState} that is refreshed from a {@link ZooCache} whose watcher is {@link TableStateWatcher}, and notifies
+ * registered {@link TableObserver}s about state changes and session expiration.
+ */
+public class TableManager {
+  private static final SecurityPermission TABLE_MANAGER_PERMISSION = new SecurityPermission("tableManagerPermission");
+
+  private static final Logger log = Logger.getLogger(TableManager.class);
+  private static final Set<TableObserver> observers = Collections.synchronizedSet(new HashSet<TableObserver>());
+  private static final Map<String,TableState> tableStateCache = Collections.synchronizedMap(new HashMap<String,TableState>());
+
+  private static TableManager tableManager = null;
+
+  private final Instance instance;
+  private ZooCache zooStateCache;
+
+  /**
+   * Creates the ZooKeeper nodes of a table (table node, conf, name, state, flush id, compact id, compact-cancel id) with initial values.
+   *
+   * @param existsPolicy
+   *          applied to every node that is written
+   */
+  public static void prepareNewTableState(String instanceId, String tableId, String tableName, TableState state, NodeExistsPolicy existsPolicy)
+      throws KeeperException, InterruptedException {
+    // state gets created last
+    String zTablePath = Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId;
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    zoo.putPersistentData(zTablePath, new byte[0], existsPolicy);
+    zoo.putPersistentData(zTablePath + Constants.ZTABLE_CONF, new byte[0], existsPolicy);
+    zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAME, tableName.getBytes(), existsPolicy);
+    zoo.putPersistentData(zTablePath + Constants.ZTABLE_STATE, state.name().getBytes(), existsPolicy);
+    zoo.putPersistentData(zTablePath + Constants.ZTABLE_FLUSH_ID, "0".getBytes(), existsPolicy);
+    zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_ID, "0".getBytes(), existsPolicy);
+    zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID, "0".getBytes(), existsPolicy);
+  }
+
+  /**
+   * Returns the lazily created singleton. When a {@link SecurityManager} is installed, the caller must hold the "tableManagerPermission"
+   * {@link SecurityPermission}.
+   */
+  public static synchronized TableManager getInstance() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(TABLE_MANAGER_PERMISSION);
+    }
+    if (tableManager == null)
+      tableManager = new TableManager();
+    return tableManager;
+  }
+
+  private TableManager() {
+    instance = HdfsZooInstance.getInstance();
+    zooStateCache = new ZooCache(new TableStateWatcher());
+    updateTableStateCache();
+  }
+
+  /**
+   * @return the state currently held in the in-memory cache for {@code tableId}, or {@code null} if the cache has no entry for it
+   */
+  public TableState getTableState(String tableId) {
+    return tableStateCache.get(tableId);
+  }
+
+  /**
+   * Thrown by the state mutator when a requested table state transition is not allowed.
+   */
+  public static class IllegalTableTransitionException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    final TableState oldState;
+    final TableState newState;
+
+    public IllegalTableTransitionException(TableState oldState, TableState newState) {
+      // give the exception a message so it is meaningful when logged or wrapped
+      super("Error transitioning from " + oldState + " state to " + newState + " state");
+      this.oldState = oldState;
+      this.newState = newState;
+    }
+
+    public TableState getOldState() {
+      return oldState;
+    }
+
+    public TableState getNewState() {
+      return newState;
+    }
+
+  }
+
+  /**
+   * Atomically moves the table's state node in ZooKeeper to {@code newState}.
+   * <p>
+   * Allowed transitions: from NEW only to ONLINE or OFFLINE; from ONLINE, OFFLINE or UNKNOWN to anything but NEW; nothing from DELETING. A missing state node
+   * is treated as UNKNOWN.
+   *
+   * @throws RuntimeException
+   *           wrapping any failure, including an {@link IllegalTableTransitionException} raised by the mutator
+   */
+  public synchronized void transitionTableState(final String tableId, final TableState newState) {
+    String statePath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;
+
+    try {
+      ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(), ZooUtil.PUBLIC, new Mutator() {
+        @Override
+        public byte[] mutate(byte[] oldData) throws Exception {
+          TableState oldState = TableState.UNKNOWN;
+          if (oldData != null)
+            oldState = TableState.valueOf(new String(oldData));
+          boolean transition = true;
+          // NEW -> (ONLINE|OFFLINE) -> DELETING, and ONLINE <-> OFFLINE; nothing leaves DELETING
+          switch (oldState) {
+            case NEW:
+              transition = (newState == TableState.OFFLINE || newState == TableState.ONLINE);
+              break;
+            case ONLINE: // fall-through intended
+            case UNKNOWN:// fall through intended
+            case OFFLINE:
+              transition = (newState != TableState.NEW);
+              break;
+            case DELETING:
+              // Can't transition to any state from DELETING
+              transition = false;
+              break;
+          }
+          if (!transition)
+            throw new IllegalTableTransitionException(oldState, newState);
+          log.debug("Transitioning state for table " + tableId + " from " + oldState + " to " + newState);
+          return newState.name().getBytes();
+        }
+      });
+    } catch (Exception e) {
+      // include the cause so the reason for the failed transition reaches the log
+      log.fatal("Failed to transition table to state " + newState, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Refreshes the in-memory state of every table under the tables node that has a state node
+  private void updateTableStateCache() {
+    synchronized (tableStateCache) {
+      for (String tableId : zooStateCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES))
+        if (zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE) != null)
+          updateTableStateCache(tableId);
+    }
+  }
+
+  /**
+   * Re-reads the state of one table from the ZooKeeper cache into the in-memory map.
+   *
+   * @return the state that was read; {@link TableState#UNKNOWN} if the state node has no data (the map is then left untouched) or holds an unrecognized value
+   */
+  public TableState updateTableStateCache(String tableId) {
+    synchronized (tableStateCache) {
+      TableState tState = TableState.UNKNOWN;
+      byte[] data = zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE);
+      if (data != null) {
+        String sState = new String(data);
+        try {
+          tState = TableState.valueOf(sState);
+        } catch (IllegalArgumentException e) {
+          log.error("Unrecognized state for table with tableId=" + tableId + ": " + sState);
+        }
+        tableStateCache.put(tableId, tState);
+      }
+      return tState;
+    }
+  }
+
+  /**
+   * Creates the ZooKeeper nodes for a new table in state NEW and refreshes its cached state.
+   */
+  public void addTable(String tableId, String tableName, NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException {
+    prepareNewTableState(instance.getInstanceID(), tableId, tableName, TableState.NEW, existsPolicy);
+    updateTableStateCache(tableId);
+  }
+
+  /**
+   * Creates the ZooKeeper nodes for a clone of {@code srcTable}: copies the source table's configuration, then applies {@code propertiesToSet} and removes
+   * {@code propertiesToExclude} on the new table.
+   */
+  public void cloneTable(String srcTable, String tableId, String tableName, Map<String,String> propertiesToSet, Set<String> propertiesToExclude,
+      NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException {
+    prepareNewTableState(instance.getInstanceID(), tableId, tableName, TableState.NEW, existsPolicy);
+    String srcTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + srcTable + Constants.ZTABLE_CONF;
+    String newTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF;
+    ZooReaderWriter.getRetryingInstance().recursiveCopyPersistent(srcTablePath, newTablePath, NodeExistsPolicy.OVERWRITE);
+
+    for (Entry<String,String> entry : propertiesToSet.entrySet())
+      TablePropUtil.setTableProperty(tableId, entry.getKey(), entry.getValue());
+
+    for (String prop : propertiesToExclude)
+      ZooReaderWriter.getRetryingInstance().recursiveDelete(
+          Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF + "/" + prop, NodeMissingPolicy.SKIP);
+
+    updateTableStateCache(tableId);
+  }
+
+  /**
+   * Drops the table from the in-memory cache and deletes its ZooKeeper nodes, the state node first.
+   */
+  public void removeTable(String tableId) throws KeeperException, InterruptedException {
+    synchronized (tableStateCache) {
+      tableStateCache.remove(tableId);
+      ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE,
+          NodeMissingPolicy.SKIP);
+      ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP);
+    }
+  }
+
+  /**
+   * Hands the observer an unmodifiable view of the current table states, then registers it.
+   *
+   * @return {@code true} if the observer was not already registered
+   */
+  public boolean addObserver(TableObserver to) {
+    synchronized (observers) {
+      synchronized (tableStateCache) {
+        to.initialize(Collections.unmodifiableMap(tableStateCache));
+        return observers.add(to);
+      }
+    }
+  }
+
+  /**
+   * @return {@code true} if the observer was registered and has been removed
+   */
+  public boolean removeObserver(TableObserver to) {
+    return observers.remove(to);
+  }
+
+  /**
+   * Watcher installed on the ZooKeeper cache; keeps the in-memory table states current and forwards state changes and session expiration to the observers.
+   */
+  private class TableStateWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent event) {
+      if (log.isTraceEnabled())
+        log.trace(event);
+
+      final String zPath = event.getPath();
+      final EventType zType = event.getType();
+
+      String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES;
+      String tableId = null;
+
+      // Only paths of the form <tables>/<tableId>/state are of interest; anything else below <tables>/ is ignored.
+      if (zPath != null && zPath.startsWith(tablesPrefix + "/")) {
+        String suffix = zPath.substring(tablesPrefix.length() + 1);
+        if (suffix.contains("/")) {
+          String[] sa = suffix.split("/", 2);
+          if (Constants.ZTABLE_STATE.equals("/" + sa[1]))
+            tableId = sa[0];
+        }
+        if (tableId == null) {
+          log.warn("Unknown path in " + event);
+          return;
+        }
+      }
+
+      switch (zType) {
+        case NodeChildrenChanged:
+          if (zPath != null && zPath.equals(tablesPrefix)) {
+            updateTableStateCache();
+          } else {
+            log.warn("Unexpected path " + zPath);
+          }
+          break;
+        case NodeCreated:
+        case NodeDataChanged:
+          // state transition
+          TableState tState = updateTableStateCache(tableId);
+          log.debug("State transition to " + tState + " @ " + event);
+          synchronized (observers) {
+            for (TableObserver to : observers)
+              to.stateChanged(tableId, tState);
+          }
+          break;
+        case NodeDeleted:
+          if (zPath != null
+              && tableId != null
+              && (zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_STATE) || zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_CONF) || zPath
+                  .equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_NAME)))
+            tableStateCache.remove(tableId);
+          break;
+        case None:
+          switch (event.getState()) {
+            case Expired:
+              if (log.isTraceEnabled())
+                log.trace("Session expired " + event);
+              synchronized (observers) {
+                for (TableObserver to : observers)
+                  to.sessionExpired();
+              }
+              break;
+            case SyncConnected:
+            default:
+              if (log.isTraceEnabled())
+                log.trace("Ignored " + event);
+          }
+          break;
+        default:
+          log.warn("Unhandled " + event);
+      }
+    }
+  }
+
+  /*
+   * private static boolean verifyTabletAssignments(String tableId) { log.info( "Sending message to load balancer to verify assignment of tablets with tableId="
+   * + tableId); // Return true only if transitions to other states did not interrupt // this process. (like deleting the table) return true; }
+   *
+   * private static synchronized boolean unloadTable(String tableId) { int loadedTabletCount = 0; while (loadedTabletCount > 0) { // wait for tables to be
+   * unloaded } log.info("Table unloaded. tableId=" + tableId); return true; }
+   *
+   * private static void cleanupDeletedTable(String tableId) { log.info("Sending message to cleanup the deleted table with tableId=" + tableId); }
+   *
+   * switch (tState) { case NEW: // this should really only happen before the watcher // knows about the table log.error("Unexpected transition to " + tState +
+   * " @ " + event); break;
+   *
+   * case LOADING: // a table has started coming online or has pending // migrations (maybe?) if (verifyTabletAssignments(tableId))
+   * TableState.transition(instance, tableId, TableState.ONLINE); break; case ONLINE: log.trace("Table online with tableId=" + tableId); break;
+   *
+   * case DISABLING: if (unloadTable(tableId)) TableState.transition(instance, tableId, TableState.DISABLED); break; case DISABLED:
+   * log.trace("Table disabled with tableId=" + tableId); break;
+   *
+   * case UNLOADING: unloadTable(tableId); TableState.transition(instance, tableId, TableState.OFFLINE); case OFFLINE: break;
+   *
+   * case DELETING: unloadTable(tableId); cleanupDeletedTable(tableId); break;
+   *
+   * default: log.error("Unrecognized transition to " + tState + " @ " + event); }
+   */
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
new file mode 100644
index 0000000..80aec3a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableObserver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tables;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.master.state.tables.TableState;
+
+/**
+ * Callback interface for components that want to be told about table states, keyed by table id.
+ *
+ * NOTE(review): the code that invokes these callbacks is not visible here; the descriptions below are inferred from the signatures only - confirm against the
+ * caller.
+ */
+public interface TableObserver {
+ /**
+ * Hands the observer a map from table id to that table's {@link TableState}.
+ *
+ * @param tableIdToStateMap
+ * table states keyed by table id; presumably a snapshot of every known table taken when the observer is registered - TODO confirm
+ */
+ void initialize(Map<String,TableState> tableIdToStateMap);
+
+ /**
+ * Reports a state for a single table.
+ *
+ * @param tableId
+ * id of the table the notification is about
+ * @param tState
+ * the state being reported; presumably the table's new state - TODO confirm
+ */
+ void stateChanged(String tableId, TableState tState);
+
+ /**
+ * Session-expiry notification. NOTE(review): presumably refers to the ZooKeeper session backing the table state watcher - confirm.
+ */
+ void sessionExpired();
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
new file mode 100644
index 0000000..bd4ceae
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tablets;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.util.time.RelativeTime;
+
+/**
+ * Hands out the timestamps that are stamped onto mutations and remembers the most recent one.
+ *
+ * Two implementations exist. The persisted ("metadata") form of a time is a one-character type id - {@link #LOGICAL_TIME_ID} or {@link #MILLIS_TIME_ID} -
+ * immediately followed by the decimal time value, e.g. {@code L42} or {@code M1383267384000}.
+ */
+public abstract class TabletTime {
+  public static final char LOGICAL_TIME_ID = 'L';
+  public static final char MILLIS_TIME_ID = 'M';
+
+  /**
+   * Translates a {@link TimeType} into the one-character id used as the prefix of metadata values.
+   *
+   * @param timeType
+   *          the time type to translate
+   * @return {@link #LOGICAL_TIME_ID} or {@link #MILLIS_TIME_ID}
+   * @throws IllegalArgumentException
+   *           if the type is neither LOGICAL nor MILLIS
+   */
+  public static char getTimeID(TimeType timeType) {
+    switch (timeType) {
+      case LOGICAL:
+        return LOGICAL_TIME_ID;
+      case MILLIS:
+        return MILLIS_TIME_ID;
+    }
+
+    throw new IllegalArgumentException("Unknown time type " + timeType);
+  }
+
+  /**
+   * Moves the tracked time forward so that it is not behind {@code time}; a smaller value is ignored.
+   *
+   * @param time
+   *          a time value, by the method's name one recovered from a write-ahead log
+   */
+  public abstract void useMaxTimeFromWALog(long time);
+
+  /**
+   * @param time
+   *          the time value to encode
+   * @return this implementation's type id followed by {@code time}
+   */
+  public abstract String getMetadataValue(long time);
+
+  /**
+   * @return the metadata encoding of {@link #getTime()}
+   */
+  public abstract String getMetadataValue();
+
+  /**
+   * Stamps every mutation in the list with a system timestamp.
+   *
+   * @param mutations
+   *          mutations to stamp; each must be a {@link ServerMutation}
+   * @return the last timestamp assigned, or the current time if the list is empty
+   */
+  public abstract long setUpdateTimes(List<Mutation> mutations);
+
+  /**
+   * @return the most recent time issued, without advancing it
+   */
+  public abstract long getTime();
+
+  /**
+   * Advances the tracked time and returns a timestamp.
+   *
+   * @return the timestamp produced by this call
+   */
+  public abstract long getAndUpdateTime();
+
+  /**
+   * Writes {@code lastCommitTime} into the mutation as its system timestamp.
+   *
+   * @param mutation
+   *          must actually be a {@link ServerMutation}; anything else fails with a ClassCastException
+   * @param lastCommitTime
+   *          the timestamp to store
+   */
+  protected void setSystemTimes(Mutation mutation, long lastCommitTime) {
+    ServerMutation m = (ServerMutation) mutation;
+    m.setSystemTimestamp(lastCommitTime);
+  }
+
+  /**
+   * Builds the implementation selected by the first character of a metadata value, seeded with the numeric remainder.
+   *
+   * @param metadataValue
+   *          type id followed by a decimal long, e.g. {@code M1234}
+   * @return a logical or millisecond based instance
+   * @throws IllegalArgumentException
+   *           if the first character is not a known type id
+   * @throws NumberFormatException
+   *           if the text after the type id is not a valid long
+   */
+  public static TabletTime getInstance(String metadataValue) {
+    char typeId = metadataValue.charAt(0);
+
+    if (typeId == LOGICAL_TIME_ID) {
+      return new LogicalTime(Long.parseLong(metadataValue.substring(1)));
+    } else if (typeId == MILLIS_TIME_ID) {
+      return new MillisTime(Long.parseLong(metadataValue.substring(1)));
+    }
+
+    throw new IllegalArgumentException("Time type unknown : " + metadataValue);
+  }
+
+  /**
+   * Picks the later of two metadata values of the same time type.
+   *
+   * @param mv1
+   *          first metadata value, may be null
+   * @param mv2
+   *          second metadata value, may be null
+   * @return the value carrying the larger time ({@code mv1} on a tie); the non-null argument if only one is given; null if both are null
+   * @throws IllegalArgumentException
+   *           if the type ids differ or are not recognised
+   */
+  public static String maxMetadataTime(String mv1, String mv2) {
+    // Nothing to compare; previously this fell into checkType(null) and died with a NullPointerException.
+    if (mv1 == null && mv2 == null) {
+      return null;
+    }
+
+    if (mv1 == null) {
+      checkType(mv2);
+      return mv2;
+    }
+
+    if (mv2 == null) {
+      checkType(mv1);
+      return mv1;
+    }
+
+    if (mv1.charAt(0) != mv2.charAt(0))
+      throw new IllegalArgumentException("Time types differ " + mv1 + " " + mv2);
+    // both ids are equal at this point, so validating one validates both
+    checkType(mv1);
+
+    long t1 = Long.parseLong(mv1.substring(1));
+    long t2 = Long.parseLong(mv2.substring(1));
+
+    return t1 < t2 ? mv2 : mv1;
+  }
+
+  /**
+   * Rejects a metadata value whose first character is not a known type id.
+   */
+  private static void checkType(String mv1) {
+    char typeId = mv1.charAt(0);
+    if (typeId != LOGICAL_TIME_ID && typeId != MILLIS_TIME_ID)
+      throw new IllegalArgumentException("Invalid time type " + mv1);
+  }
+
+  /**
+   * Wall-clock based time that never goes backwards: when the clock reads earlier than the last time issued, the last time is reused or nudged forward
+   * instead.
+   *
+   * NOTE(review): {@link #useMaxTimeFromWALog(long)} and {@link #getTime()} access {@code lastTime} without the lock that {@link #setUpdateTimes(List)} and
+   * {@link #getAndUpdateTime()} hold - confirm callers do not rely on cross-thread visibility there.
+   */
+  static class MillisTime extends TabletTime {
+
+    // most recent time issued
+    private long lastTime;
+    // clock reading seen the last time the clock was found to be behind lastTime
+    private long lastUpdateTime = 0;
+
+    public MillisTime(long time) {
+      this.lastTime = time;
+    }
+
+    @Override
+    public String getMetadataValue(long time) {
+      // the empty string forces string concatenation; char + long alone would be arithmetic
+      return MILLIS_TIME_ID + "" + time;
+    }
+
+    @Override
+    public String getMetadataValue() {
+      return getMetadataValue(lastTime);
+    }
+
+    @Override
+    public void useMaxTimeFromWALog(long time) {
+      if (time > lastTime)
+        lastTime = time;
+    }
+
+    @Override
+    public long setUpdateTimes(List<Mutation> mutations) {
+
+      long currTime = RelativeTime.currentTimeMillis();
+
+      synchronized (this) {
+        if (mutations.isEmpty())
+          return lastTime;
+
+        currTime = updateTime(currTime);
+      }
+
+      // every mutation in the batch receives the same timestamp
+      for (Mutation mutation : mutations)
+        setSystemTimes(mutation, currTime);
+
+      return currTime;
+    }
+
+    /**
+     * Folds a clock reading into {@code lastTime} and returns the timestamp to hand out. Callers hold the instance lock.
+     */
+    private long updateTime(long currTime) {
+      if (currTime < lastTime) {
+        if (currTime - lastUpdateTime > 0) {
+          // not in same millisecond as last call
+          // to this method so move ahead slowly
+          lastTime++;
+        }
+
+        lastUpdateTime = currTime;
+
+        currTime = lastTime;
+      } else {
+        lastTime = currTime;
+      }
+      return currTime;
+    }
+
+    @Override
+    public long getTime() {
+      return lastTime;
+    }
+
+    @Override
+    public long getAndUpdateTime() {
+      long currTime = RelativeTime.currentTimeMillis();
+
+      synchronized (this) {
+        currTime = updateTime(currTime);
+      }
+
+      return currTime;
+    }
+
+  }
+
+  /**
+   * Counter based time. {@code nextTime} always holds the next value to be issued, so the last value issued is {@code nextTime - 1}.
+   */
+  static class LogicalTime extends TabletTime {
+    AtomicLong nextTime;
+
+    private LogicalTime(long time) {
+      this.nextTime = new AtomicLong(time + 1);
+    }
+
+    @Override
+    public void useMaxTimeFromWALog(long time) {
+      // the counter stores the next value to issue, hence one past the recovered time
+      time++;
+
+      if (this.nextTime.get() < time) {
+        this.nextTime.set(time);
+      }
+    }
+
+    @Override
+    public String getMetadataValue() {
+      return getMetadataValue(getTime());
+    }
+
+    @Override
+    public String getMetadataValue(long time) {
+      // the empty string forces string concatenation; char + long alone would be arithmetic
+      return LOGICAL_TIME_ID + "" + time;
+    }
+
+    @Override
+    public long setUpdateTimes(List<Mutation> mutations) {
+      if (mutations.isEmpty())
+        return getTime();
+
+      // reserve one consecutive value per mutation in a single atomic step
+      long time = nextTime.getAndAdd(mutations.size());
+      for (Mutation mutation : mutations)
+        setSystemTimes(mutation, time++);
+
+      // time is now one past the last value assigned
+      return time - 1;
+    }
+
+    @Override
+    public long getTime() {
+      return nextTime.get() - 1;
+    }
+
+    @Override
+    public long getAndUpdateTime() {
+      return nextTime.getAndIncrement();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
new file mode 100644
index 0000000..57b9b32
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tablets;
+
+import java.util.Random;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+/**
+ * Allocates unique names for an accumulo instance. The names are unique for the lifetime of the instance.
+ *
+ * This is useful for filenames because it makes caching easy.
+ *
+ */
+
+public class UniqueNameAllocator {
+ private long next = 0;
+ private long maxAllocated = 0;
+ private String nextNamePath;
+ private Random rand;
+
+ private UniqueNameAllocator() {
+ nextNamePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZNEXT_FILE;
+ rand = new Random();
+ }
+
+ public synchronized String getNextName() {
+
+ while (next >= maxAllocated) {
+ final int allocate = 100 + rand.nextInt(100);
+
+ try {
+ byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
+ public byte[] mutate(byte[] currentValue) throws Exception {
+ long l = Long.parseLong(new String(currentValue), Character.MAX_RADIX);
+ l += allocate;
+ return Long.toString(l, Character.MAX_RADIX).getBytes();
+ }
+ });
+
+ maxAllocated = Long.parseLong(new String(max), Character.MAX_RADIX);
+ next = maxAllocated - allocate;
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return new String(FastFormat.toZeroPaddedString(next++, 7, Character.MAX_RADIX, new byte[0]));
+ }
+
+ private static UniqueNameAllocator instance = null;
+
+ public static synchronized UniqueNameAllocator getInstance() {
+ if (instance == null)
+ instance = new UniqueNameAllocator();
+
+ return instance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
new file mode 100644
index 0000000..dd1a6ef
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/LargestFirstMemoryManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Memory manager that, when a minor compaction is called for, picks the single tablet with the largest "load": its in-memory size weighted by how long it
+ * has been idle (see {@link #timeMemoryLoad(long, long)}).
+ *
+ * A compaction is requested either because ingest memory is projected to cross {@code compactionThreshold * maxMemory}, or because some tablet has been
+ * idle longer than its table's configured idle time. The threshold itself is tuned after each drop in ingest memory, based on the peak memory observed.
+ */
+public class LargestFirstMemoryManager implements MemoryManager {
+
+  private static final Logger log = Logger.getLogger(LargestFirstMemoryManager.class);
+  private static final int TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER = 2;
+
+  // negative until configured through init() or the three-argument constructor
+  private long maxMemory = -1;
+  private int maxConcurrentMincs;
+  private int numWaitingMultiplier;
+  // ingest memory seen on the previous call, used to estimate the next increase
+  private long prevIngestMemory;
+  // fraction of maxMemory at which a minor compaction is started
+  private double compactionThreshold;
+  // peak of ingest + compacting memory since the threshold was last adjusted
+  private long maxObserved;
+  // per-call cache of each table's idle-compaction threshold in milliseconds
+  private HashMap<Text,Long> mincIdleThresholds;
+  // stand-in "last commit time" for tablets that have never committed
+  private static final long zerotime = System.currentTimeMillis();
+  private ServerConfiguration config = null;
+
+  /**
+   * Creates a manager with explicit limits instead of reading them from configuration. No {@link ServerConfiguration} is set by this constructor.
+   */
+  LargestFirstMemoryManager(long maxMemory, int maxConcurrentMincs, int numWaitingMultiplier) {
+    this();
+    this.maxMemory = maxMemory;
+    this.maxConcurrentMincs = maxConcurrentMincs;
+    this.numWaitingMultiplier = numWaitingMultiplier;
+  }
+
+  /**
+   * Reads the maximum memory and the maximum number of concurrent minor compactions from the given configuration.
+   */
+  @Override
+  public void init(ServerConfiguration conf) {
+    this.config = conf;
+    maxMemory = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
+    maxConcurrentMincs = conf.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT);
+    numWaitingMultiplier = TSERV_MINC_MAXCONCURRENT_NUMWAITING_MULTIPLIER;
+  }
+
+  /**
+   * Creates an unconfigured manager; {@link #init(ServerConfiguration)} must be called before use.
+   */
+  public LargestFirstMemoryManager() {
+    prevIngestMemory = 0;
+    compactionThreshold = 0.5;
+    maxObserved = 0;
+    mincIdleThresholds = new HashMap<Text,Long>();
+  }
+
+  /**
+   * Decides whether a minor compaction should start now and, if so, for which tablet.
+   *
+   * @param tablets
+   *          current memory state of the tablets
+   * @return actions whose {@code tabletsToMinorCompact} list is empty or holds exactly one extent
+   * @throws IllegalStateException
+   *           if the maximum memory has not been configured
+   */
+  @Override
+  public MemoryManagementActions getMemoryManagementActions(List<TabletState> tablets) {
+    if (maxMemory < 0)
+      throw new IllegalStateException("need to initialize LargestFirstMemoryManager");
+    mincIdleThresholds.clear();
+    long ingestMemory = 0;
+    long compactionMemory = 0;
+    KeyExtent largestMemTablet = null;
+    long largestMemTableLoad = 0;
+    KeyExtent largestIdleMemTablet = null;
+    long largestIdleMemTableLoad = 0;
+    int numWaitingMincs = 0;
+    long ct = System.currentTimeMillis();
+
+    long largestMemTableIdleTime = -1, largestMemTableSize = -1;
+    long largestIdleMemTableIdleTime = -1, largestIdleMemTableSize = -1;
+
+    for (TabletState ts : tablets) {
+      long mts = ts.getMemTableSize();
+      long mcmts = ts.getMinorCompactingMemTableSize();
+      long idleTime;
+      if (ts.getLastCommitTime() > 0)
+        idleTime = ct - ts.getLastCommitTime();
+      else
+        idleTime = ct - zerotime;
+      ingestMemory += mts;
+      long tml = timeMemoryLoad(mts, idleTime);
+      // only tablets that hold data and are not already minor compacting are candidates
+      if (mcmts == 0 && mts > 0) {
+        if (tml > largestMemTableLoad) {
+          largestMemTableLoad = tml;
+          largestMemTablet = ts.getExtent();
+          largestMemTableSize = mts;
+          largestMemTableIdleTime = idleTime;
+        }
+        Text tableId = ts.getExtent().getTableId();
+        if (!mincIdleThresholds.containsKey(tableId))
+          mincIdleThresholds.put(tableId, config.getTableConfiguration(tableId.toString()).getTimeInMillis(Property.TABLE_MINC_COMPACT_IDLETIME));
+        if (idleTime > mincIdleThresholds.get(tableId) && tml > largestIdleMemTableLoad) {
+          largestIdleMemTableLoad = tml;
+          largestIdleMemTablet = ts.getExtent();
+          largestIdleMemTableSize = mts;
+          largestIdleMemTableIdleTime = idleTime;
+        }
+      }
+
+      compactionMemory += mcmts;
+      if (mcmts > 0)
+        numWaitingMincs++;
+    }
+
+    if (ingestMemory + compactionMemory > maxObserved) {
+      maxObserved = ingestMemory + compactionMemory;
+    }
+
+    long memoryChange = ingestMemory - prevIngestMemory;
+    prevIngestMemory = ingestMemory;
+
+    MemoryManagementActions mma = new MemoryManagementActions();
+    mma.tabletsToMinorCompact = new ArrayList<KeyExtent>();
+
+    boolean startMinC = false;
+
+    if (numWaitingMincs < maxConcurrentMincs * numWaitingMultiplier) {
+      // based on previous ingest memory increase, if we think that the next increase will
+      // take us over the threshold for non-compacting memory, then start a minor compaction
+      // or if the idle time of the chosen tablet is greater than the threshold, start a minor compaction
+      if (memoryChange >= 0 && ingestMemory + memoryChange > compactionThreshold * maxMemory) {
+        startMinC = true;
+      } else if (largestIdleMemTablet != null) {
+        startMinC = true;
+        // switch largestMemTablet to largestIdleMemTablet
+        largestMemTablet = largestIdleMemTablet;
+        largestMemTableLoad = largestIdleMemTableLoad;
+        largestMemTableSize = largestIdleMemTableSize;
+        largestMemTableIdleTime = largestIdleMemTableIdleTime;
+        log.debug("IDLE minor compaction chosen");
+      }
+    }
+
+    if (startMinC && largestMemTablet != null) {
+      mma.tabletsToMinorCompact.add(largestMemTablet);
+      log.debug(String.format("COMPACTING %s total = %,d ingestMemory = %,d", largestMemTablet.toString(), (ingestMemory + compactionMemory), ingestMemory));
+      log.debug(String.format("chosenMem = %,d chosenIT = %.2f load %,d", largestMemTableSize, largestMemTableIdleTime / 1000.0, largestMemTableLoad));
+    } else if (memoryChange < 0) {
+      // before idle mincs, starting a minor compaction meant that memoryChange >= 0.
+      // we thought we might want to remove the "else" if that changed,
+      // however it seems performing idle compactions shouldn't make the threshold
+      // change more often, so it is staying for now.
+      // also, now we have the case where memoryChange < 0 due to an idle compaction, yet
+      // we are still adjusting the threshold. should this be tracked and prevented?
+
+      // memory change < 0 means a minor compaction occurred
+      // we want to see how full the memory got during the compaction
+      // (the goal is for it to have between 80% and 90% memory utilization)
+      // and adjust the compactionThreshold accordingly
+
+      log.debug(String.format("BEFORE compactionThreshold = %.3f maxObserved = %,d", compactionThreshold, maxObserved));
+
+      if (compactionThreshold < 0.82 && maxObserved < 0.8 * maxMemory) {
+        // 0.82 * 1.1 is about 0.9, which is our desired max threshold
+        compactionThreshold *= 1.1;
+      } else if (compactionThreshold > 0.056 && maxObserved > 0.9 * maxMemory) {
+        // 0.056 * 0.9 is about 0.05, which is our desired min threshold
+        compactionThreshold *= 0.9;
+      }
+      maxObserved = 0;
+
+      log.debug(String.format("AFTER compactionThreshold = %.3f", compactionThreshold));
+    }
+
+    return mma;
+  }
+
+  /**
+   * No per-tablet state is kept, so there is nothing to release.
+   */
+  @Override
+  public void tabletClosed(KeyExtent extent) {}
+
+  /**
+   * Weights a memory size by idle time: the result is {@code mem * 2^(minutesIdle / 15)}, i.e. the weight doubles for every 15 minutes of idleness.
+   *
+   * @param mem
+   *          in-memory size
+   * @param time
+   *          idle time in milliseconds
+   * @return the weighted load, truncated to a long
+   */
+  static long timeMemoryLoad(long mem, long time) {
+    double minutesIdle = time / 60000.0;
+
+    return (long) (mem * Math.pow(2, minutesIdle / 15.0));
+  }
+
+  /**
+   * Prints the load of a size-1 memtable for idle times of 0 through 61 minutes.
+   */
+  public static void main(String[] args) {
+    for (int i = 0; i < 62; i++) {
+      System.out.printf("%d\t%d%n", i, timeMemoryLoad(1, i * 60000L));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java
new file mode 100644
index 0000000..3cbe25d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/MemoryManagementActions.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Plain holder for the decisions a memory manager returns from its getMemoryManagementActions call.
+ */
+public class MemoryManagementActions {
+ // Extents of the tablets that should be minor compacted. Not initialized here: the producer assigns the list (LargestFirstMemoryManager sets a fresh
+ // ArrayList and adds at most one extent per call).
+ public List<KeyExtent> tabletsToMinorCompact;
+}