You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by pr...@apache.org on 2015/02/22 03:14:34 UTC
incubator-sentry git commit: SENTRY-632: Support Sentry cache sync
via ZK (Prasad Mujumdar, reviewed by Arun Suresh
Repository: incubator-sentry
Updated Branches:
refs/heads/master 169d337f1 -> 0693dfb21
SENTRY-632: Support Sentry cache sync via ZK (Prasad Mujumdar, reviewed by Arun Suresh
Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/0693dfb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/0693dfb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/0693dfb2
Branch: refs/heads/master
Commit: 0693dfb21866bdf4cea6a58be00548d241d96445
Parents: 169d337
Author: Prasad Mujumdar <pr...@apache.org>
Authored: Sat Feb 21 18:14:25 2015 -0800
Committer: Prasad Mujumdar <pr...@apache.org>
Committed: Sat Feb 21 18:14:25 2015 -0800
----------------------------------------------------------------------
.../metastore/MetastoreAuthzBinding.java | 15 ++
.../org/apache/sentry/hdfs/PathsUpdate.java | 20 ++-
.../apache/sentry/hdfs/PermissionsUpdate.java | 15 ++
.../apache/sentry/hdfs/ServiceConstants.java | 6 +-
.../apache/sentry/hdfs/ThriftSerializer.java | 56 +++++++
.../java/org/apache/sentry/hdfs/Updateable.java | 14 +-
.../sentry/hdfs/UpdateableAuthzPaths.java | 10 +-
.../sentry/hdfs/UpdateableAuthzPermissions.java | 11 +-
sentry-hdfs/sentry-hdfs-service/pom.xml | 4 +
.../org/apache/sentry/hdfs/MetastorePlugin.java | 5 +-
.../sentry/hdfs/MetastorePluginWithHA.java | 93 ++++++++++++
.../apache/sentry/hdfs/PluginCacheSyncUtil.java | 151 +++++++++++++++++++
.../sentry/hdfs/SentryHDFSServiceProcessor.java | 6 +-
.../org/apache/sentry/hdfs/SentryPlugin.java | 34 +++--
.../org/apache/sentry/hdfs/UpdateForwarder.java | 113 +++++++++-----
.../sentry/hdfs/UpdateForwarderWithHA.java | 140 +++++++++++++++++
.../sentry/hdfs/UpdateablePermissions.java | 6 +
.../sentry/hdfs/TestHAUpdateForwarder.java | 68 +++++++++
.../apache/sentry/hdfs/TestUpdateForwarder.java | 70 ++++++---
.../db/service/persistent/HAContext.java | 47 +++++-
.../db/service/persistent/ServiceManager.java | 8 +-
.../db/service/persistent/ServiceRegister.java | 4 +-
.../thrift/SentryPolicyStoreProcessor.java | 4 +-
.../thrift/HAClientInvocationHandler.java | 4 +-
.../sentry/service/thrift/ServiceConstants.java | 40 ++---
.../persistent/TestSentryServiceDiscovery.java | 6 +-
.../tests/e2e/hdfs/TestHDFSIntegration.java | 15 ++
.../e2e/hdfs/TestHDFSIntegrationWithHA.java | 28 ++++
28 files changed, 877 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
index 8d388d2..a36a4ee 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
+import org.apache.sentry.SentryUserException;
import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
import org.apache.sentry.binding.hive.authz.HiveAuthzPrivilegesMap;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
@@ -138,6 +139,7 @@ public class MetastoreAuthzBinding extends MetaStorePreEventListener {
private final ImmutableSet<String> serviceUsers;
private HiveAuthzBinding hiveAuthzBinding;
private final String warehouseDir;
+ private static boolean sentryCacheOutOfSync = false;
public MetastoreAuthzBinding(Configuration config) throws Exception {
super(config);
@@ -389,6 +391,10 @@ public class MetastoreAuthzBinding extends MetaStorePreEventListener {
List<List<DBModelAuthorizable>> inputHierarchy,
List<List<DBModelAuthorizable>> outputHierarchy)
throws InvalidOperationException {
+ if (isSentryCacheOutOfSync()) {
+ throw invalidOperationException(new SentryUserException(
+ "Metastore/Sentry cache is out of sync"));
+ }
try {
HiveAuthzBinding hiveAuthzBinding = getHiveAuthzBinding();
hiveAuthzBinding.authorize(hiveOp, HiveAuthzPrivilegesMap
@@ -446,4 +452,13 @@ public class MetastoreAuthzBinding extends MetaStorePreEventListener {
return sd.getLocation();
}
}
+
+ public static boolean isSentryCacheOutOfSync() {
+ return sentryCacheOutOfSync;
+ }
+
+ public static void setSentryCacheOutOfSync(boolean sentryCacheOutOfSync) {
+ MetastoreAuthzBinding.sentryCacheOutOfSync = sentryCacheOutOfSync;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
index ec0df24..3f14bf7 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
@@ -17,12 +17,14 @@
*/
package org.apache.sentry.hdfs;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
import com.google.common.base.Preconditions;
+
import org.apache.sentry.hdfs.service.thrift.TPathChanges;
import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
@@ -30,14 +32,18 @@ import com.google.common.collect.Lists;
/**
* A wrapper class over the TPathsUpdate thrift generated class. Please see
- * {@link Updateable.Update} for more information
+ * {@link Updateable.Update} for more information
*/
public class PathsUpdate implements Updateable.Update {
-
+
public static String ALL_PATHS = "__ALL_PATHS__";
private final TPathsUpdate tPathsUpdate;
+ public PathsUpdate() {
+ this(0, false);
+ }
+
public PathsUpdate(TPathsUpdate tPathsUpdate) {
this.tPathsUpdate = tPathsUpdate;
}
@@ -97,4 +103,14 @@ public class PathsUpdate implements Updateable.Update {
}
}
+ @Override
+ public byte[] serialize() throws IOException {
+ return ThriftSerializer.serialize(tPathsUpdate);
+ }
+
+ @Override
+ public void deserialize(byte[] data) throws IOException {
+ ThriftSerializer.deserialize(tPathsUpdate, data);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
index 1130140..c791ab3 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
@@ -17,6 +17,7 @@
*/
package org.apache.sentry.hdfs;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -35,6 +36,10 @@ public class PermissionsUpdate implements Updateable.Update {
private final TPermissionsUpdate tPermUpdate;
+ public PermissionsUpdate() {
+ this(0, false);
+ }
+
public PermissionsUpdate(TPermissionsUpdate tPermUpdate) {
this.tPermUpdate = tPermUpdate;
}
@@ -91,4 +96,14 @@ public class PermissionsUpdate implements Updateable.Update {
public TPermissionsUpdate toThrift() {
return tPermUpdate;
}
+
+ @Override
+ public byte[] serialize() throws IOException {
+ return ThriftSerializer.serialize(tPermUpdate);
+ }
+
+ @Override
+ public void deserialize(byte[] data) throws IOException {
+ ThriftSerializer.deserialize(tPermUpdate, data);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 9308dee..516f773 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -47,8 +47,12 @@ public class ServiceConstants {
public static final int SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT = 10000;
public static final String SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS = "sentry.hdfs.sync.checker.period.ms";
public static final int SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT = 1000;
-
+ public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE = "sentry.hdfs.ha.zookeeper.namespace";
+ public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_hdfs";
+ public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE = "sentry.hdfs.ha.zookeeper.namespace";
+ public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_metastore";
}
+
public static class ClientConfig {
public static final ImmutableMap<String, String> SASL_PROPERTIES = ServiceConstants.SASL_PROPERTIES;
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
new file mode 100644
index 0000000..b585773
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+public class ThriftSerializer {
+
+ @SuppressWarnings("rawtypes")
+ public static byte[] serialize(TBase baseObject) throws IOException {
+ TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
+ try {
+ return serializer.serialize(baseObject);
+ } catch (TException e) {
+ throw new IOException("Error serializing thrift object "
+ + baseObject, e);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static TBase deserialize(TBase baseObject, byte[] serialized)
+ throws IOException {
+ TDeserializer deserializer = new TDeserializer(
+ new TCompactProtocol.Factory());
+ try {
+ deserializer.deserialize(baseObject, serialized);
+ } catch (TException e) {
+ throw new IOException("Error deserializing thrift object "
+ + baseObject, e);
+ }
+ return baseObject;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
index ba932ac..ac8459b 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
@@ -17,30 +17,34 @@
*/
package org.apache.sentry.hdfs;
+import java.io.IOException;
import java.util.concurrent.locks.ReadWriteLock;
public interface Updateable<K extends Updateable.Update> {
/**
* Thrift currently does not support class inheritance.We need all update
- * objects to expose a unified API. A wrapper class need to be created
- * implementing this interface and containing the generated thrift class as
+ * objects to expose a unified API. A wrapper class need to be created
+ * implementing this interface and containing the generated thrift class as
* a work around
*/
public interface Update {
boolean hasFullImage();
-
+
long getSeqNum();
void setSeqNum(long seqNum);
+ byte[] serialize() throws IOException;
+
+ void deserialize(byte data[]) throws IOException;
}
/**
* Apply multiple partial updates in order
* @param update
- * @param lock External Lock.
+ * @param lock External Lock.
* @return
*/
public void updatePartial(Iterable<K> update, ReadWriteLock lock);
@@ -64,4 +68,6 @@ public interface Updateable<K extends Updateable.Update> {
*/
public K createFullImageUpdate(long currSeqNum);
+ public String getUpdateableTypeName();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
index 03b288b..b74f954 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
@@ -28,11 +28,12 @@ import org.slf4j.LoggerFactory;
public class UpdateableAuthzPaths implements AuthzPaths, Updateable<PathsUpdate> {
private static final int MAX_UPDATES_PER_LOCK_USE = 99;
+ private static final String UPDATABLE_TYPE_NAME = "path_update";
private volatile HMSPaths paths;
private final AtomicLong seqNum = new AtomicLong(0);
private static Logger LOG = LoggerFactory.getLogger(UpdateableAuthzPaths.class);
-
+
public UpdateableAuthzPaths(String[] pathPrefixes) {
this.paths = new HMSPaths(pathPrefixes);
}
@@ -92,7 +93,7 @@ public class UpdateableAuthzPaths implements AuthzPaths, Updateable<PathsUpdate>
TPathChanges newPathInfo = null;
TPathChanges oldPathInfo = null;
if ((pathChanges.get(0).getAddPathsSize() == 1)
- && (pathChanges.get(1).getDelPathsSize() == 1)) {
+ && (pathChanges.get(1).getDelPathsSize() == 1)) {
newPathInfo = pathChanges.get(0);
oldPathInfo = pathChanges.get(1);
} else if ((pathChanges.get(1).getAddPathsSize() == 1)
@@ -150,4 +151,9 @@ public class UpdateableAuthzPaths implements AuthzPaths, Updateable<PathsUpdate>
}
};
}
+
+ @Override
+ public String getUpdateableTypeName() {
+ return UPDATABLE_TYPE_NAME;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
index c362115..aa78360 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
@@ -37,9 +37,10 @@ import org.slf4j.LoggerFactory;
public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<PermissionsUpdate> {
private static final int MAX_UPDATES_PER_LOCK_USE = 99;
+ private static final String UPDATABLE_TYPE_NAME = "perm_authz_update";
private volatile SentryPermissions perms = new SentryPermissions();
private final AtomicLong seqNum = new AtomicLong(0);
-
+
private static Logger LOG = LoggerFactory.getLogger(UpdateableAuthzPermissions.class);
public static Map<String, FsAction> ACTION_MAPPING = new HashMap<String, FsAction>();
@@ -84,7 +85,7 @@ public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<
lock.writeLock().unlock();
}
}
-
+
private void applyPartialUpdate(PermissionsUpdate update) {
applyPrivilegeUpdates(update);
@@ -233,5 +234,9 @@ public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<
return retVal;
}
-
+ @Override
+ public String getUpdateableTypeName() {
+ return UPDATABLE_TYPE_NAME;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml
index 365380e..6b84733 100644
--- a/sentry-hdfs/sentry-hdfs-service/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-service/pom.xml
@@ -29,6 +29,10 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>org.apache.sentry</groupId>
+ <artifactId>sentry-binding-hive</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index cdd8d9c..f4964d6 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -258,7 +258,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
return update;
}
- private void notifySentryNoLock(PathsUpdate update) {
+ protected void notifySentryNoLock(PathsUpdate update) {
try {
getClient().notifyHMSUpdate(update);
} catch (Exception e) {
@@ -266,7 +266,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
}
}
- private void notifySentryAndApplyLocal(PathsUpdate update) {
+ protected void notifySentryAndApplyLocal(PathsUpdate update) {
notificiationLock.lock();
if (!syncSent) {
new SyncTask().run();
@@ -280,5 +280,4 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
new file mode 100644
index 0000000..271e121
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.apache.sentry.binding.metastore.MetastoreAuthzBinding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastorePluginWithHA extends MetastorePlugin {
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(MetastorePluginWithHA.class);
+ public static class SentryMetastoreHACacheListener implements PathChildrenCacheListener {
+ private MetastorePluginWithHA metastorePlugin;
+
+ public SentryMetastoreHACacheListener(MetastorePluginWithHA metastorePlugin) {
+ this.metastorePlugin = metastorePlugin;
+ }
+
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+ throws Exception {
+ switch ( event.getType() ) {
+ case CHILD_ADDED:
+ PathsUpdate newUpdate = new PathsUpdate();
+ PluginCacheSyncUtil.setUpdateFromChildEvent(event, newUpdate);
+ metastorePlugin.processCacheNotification(newUpdate);
+ break;
+ case INITIALIZED:
+ case CHILD_UPDATED:
+ case CHILD_REMOVED:
+ break;
+ case CONNECTION_RECONNECTED:
+ MetastoreAuthzBinding.setSentryCacheOutOfSync(false);
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_LOST:
+ MetastoreAuthzBinding.setSentryCacheOutOfSync(true);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private String zkPath;
+ private PluginCacheSyncUtil pluginCacheSync;
+
+ public MetastorePluginWithHA(Configuration conf) throws Exception {
+ super(conf);
+ zkPath = conf.get(ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE,
+ ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT);
+
+ pluginCacheSync = new PluginCacheSyncUtil(zkPath, conf,
+ new SentryMetastoreHACacheListener(this));
+ }
+
+ @Override
+ protected void notifySentryAndApplyLocal(PathsUpdate update) {
+ try {
+ pluginCacheSync.handleCacheUpdate(update);
+ } catch (SentryPluginException e) {
+ LOGGER.error("Error pushing update to cache", e);
+ }
+ }
+
+ private void processCacheNotification(PathsUpdate update) {
+ super.notifySentryAndApplyLocal(update);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
new file mode 100644
index 0000000..00317e7
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling the cache update syncup via Curator path cache It
+ * creates the path cache, a distributed lock and counter. The updated API
+ * updates the counter, creates a znode zpath/counter and writes the data to it.
+ * The caller should provider the cache callback handler class that posts the
+ * update object to the required cache
+ */
+public class PluginCacheSyncUtil {
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PluginCacheSyncUtil.class);
+
+ private final String zkPath;
+ private final HAContext haContext;
+ private final PathChildrenCache cache;
+ private InterProcessSemaphoreMutex updatorLock;
+ private int lockTimeout;
+ private DistributedAtomicLong updateCounter;
+
+ public PluginCacheSyncUtil(String zkPath, Configuration conf,
+ PathChildrenCacheListener cacheListener) throws SentryPluginException {
+ this.zkPath = zkPath;
+ // Init ZK connection
+ try {
+ haContext = HAContext.getHAContext(conf);
+ } catch (Exception e) {
+ throw new SentryPluginException("Error creating HA context ", e);
+ }
+ haContext.startCuratorFramework();
+
+ // Init path cache
+ cache = new PathChildrenCache(haContext.getCuratorFramework(), zkPath
+ + "/cache", true);
+ // path cache callback
+ cache.getListenable().addListener(cacheListener);
+ try {
+ cache.start();
+ } catch (Exception e) {
+ throw new SentryPluginException("Error creating ZK PathCache ", e);
+ }
+ updatorLock = new InterProcessSemaphoreMutex(
+ haContext.getCuratorFramework(), zkPath + "/lock");
+ lockTimeout = conf.getInt(
+ ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+ ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
+
+ updateCounter = new DistributedAtomicLong(haContext.getCuratorFramework(),
+ zkPath + "/counter", haContext.getRetryPolicy());
+ try {
+ updateCounter.initialize((long) 4);
+ } catch (Exception e) {
+ LOGGER.error("Error initializing counter for zpath " + zkPath, e);
+ }
+ }
+
+ public void handleCacheUpdate(Update update) throws SentryPluginException {
+ // post message to ZK cache
+ try {
+ // Acquire ZK lock for update cache sync. This ensures that the counter
+ // increment and znode creation is atomic operation
+ if (!updatorLock.acquire(lockTimeout, TimeUnit.MILLISECONDS)) {
+ throw new SentryPluginException(
+ "Failed to get ZK lock for update cache syncup");
+ }
+ } catch (Exception e1) {
+ throw new SentryPluginException(
+ "Error getting ZK lock for update cache syncup" + e1, e1);
+ }
+
+ try {
+ // increment the global sequence counter
+ try {
+ update.setSeqNum(updateCounter.increment().postValue());
+ } catch (Exception e1) {
+ throw new SentryPluginException(
+ "Error setting ZK counter for update cache syncup" + e1, e1);
+ }
+
+ // Create a new znode with the sequence number and write the update data
+ // into it
+ String updateSeq = String.valueOf(update.getSeqNum());
+ String newPath = ZKPaths.makePath(zkPath + "/cache", updateSeq);
+ try {
+ haContext.getCuratorFramework().create().creatingParentsIfNeeded()
+ .forPath(newPath, update.serialize());
+ } catch (Exception e) {
+ throw new SentryPluginException("error posting update to ZK ", e);
+ }
+ } finally {
+ // release the ZK lock
+ try {
+ updatorLock.release();
+ } catch (Exception e) {
+ throw new SentryPluginException(
+ "Error releasing ZK lock for update cache syncup" + e, e);
+ }
+ }
+ }
+
+ public static void setUpdateFromChildEvent(PathChildrenCacheEvent cacheEvent,
+ Update update) throws IOException {
+ byte eventData[] = cacheEvent.getData().getData();
+ update.deserialize(eventData);
+ String seqNum = ZKPaths.getNodeFromPath(cacheEvent.getData().getPath());
+ update.setSeqNum(Integer.valueOf(seqNum));
+ }
+
+ public void close() throws IOException {
+ cache.close();
+ }
+
+ public long getUpdateCounter() throws Exception {
+ return updateCounter.get().preValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index cc849b9..80f3648 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -41,6 +41,10 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
if (SentryPlugin.instance != null) {
+ if (SentryPlugin.instance.isOutOfSync()) {
+ throw new TException(
+ "This Sentry server is not communicating with other nodes and out of sync ");
+ }
List<PermissionsUpdate> permUpdates = SentryPlugin.instance.getAllPermsUpdatesFrom(permSeqNum);
List<PathsUpdate> pathUpdates = SentryPlugin.instance.getAllPathsUpdatesFrom(pathSeqNum);
try {
@@ -80,7 +84,7 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
} else {
LOGGER.error("SentryPlugin not initialized yet !!");
}
-
+
return retVal;
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index f1e792d..221c397 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
public class SentryPlugin implements SentryPolicyStorePlugin {
@@ -67,7 +68,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
public PermissionsUpdate retrieveFullImage(long currSeqNum) {
Map<String, HashMap<String, String>> privilegeImage = sentryStore.retrieveFullPrivilegeImage();
Map<String, LinkedList<String>> roleImage = sentryStore.retrieveFullRoleImage();
-
+
TPermissionsUpdate tPermUpdate = new TPermissionsUpdate(true, currSeqNum,
new HashMap<String, TPrivilegeChanges>(),
new HashMap<String, TRoleChanges>());
@@ -86,13 +87,14 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
permissionsUpdate.setSeqNum(currSeqNum);
return permissionsUpdate;
}
-
+
}
private UpdateForwarder<PathsUpdate> pathsUpdater;
private UpdateForwarder<PermissionsUpdate> permsUpdater;
private final AtomicLong permSeqNum = new AtomicLong(5);
private PermImageRetriever permImageRetriever;
+ private boolean outOfSync = false;
long getLastSeenHMSPathSeqNum() {
return pathsUpdater.getLastSeen();
@@ -106,12 +108,13 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
final int initUpdateRetryDelayMs =
conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
- pathsUpdater = new UpdateForwarder<PathsUpdate>(new UpdateableAuthzPaths(
- pathPrefixes), null, 100, initUpdateRetryDelayMs);
permImageRetriever = new PermImageRetriever(sentryStore);
- permsUpdater = new UpdateForwarder<PermissionsUpdate>(
- new UpdateablePermissions(permImageRetriever), permImageRetriever,
- 100, initUpdateRetryDelayMs);
+
+ pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths(
+ pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs);
+ permsUpdater = UpdateForwarder.create(conf,
+ new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false),
+ permImageRetriever, 100, initUpdateRetryDelayMs);
LOGGER.info("Sentry HDFS plugin initialized !!");
instance = this;
}
@@ -124,7 +127,8 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
return permsUpdater.getAllUpdatesFrom(permSeqNum);
}
- public void handlePathUpdateNotification(PathsUpdate update) {
+ public void handlePathUpdateNotification(PathsUpdate update)
+ throws SentryPluginException {
pathsUpdater.handleUpdateNotification(update);
LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
}
@@ -144,7 +148,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
@Override
public void onAlterSentryRoleDeleteGroups(
TAlterSentryRoleDeleteGroupsRequest request)
- throws SentryPluginException {
+ throws SentryPluginException {
PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
for (TSentryGroup group : request.getGroups()) {
@@ -157,7 +161,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
@Override
public void onAlterSentryRoleGrantPrivilege(
TAlterSentryRoleGrantPrivilegeRequest request)
- throws SentryPluginException {
+ throws SentryPluginException {
if (request.isSetPrivileges()) {
String roleName = request.getRoleName();
for (TSentryPrivilege privilege : request.getPrivileges()) {
@@ -194,7 +198,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
@Override
public void onAlterSentryRoleRevokePrivilege(
TAlterSentryRoleRevokePrivilegeRequest request)
- throws SentryPluginException {
+ throws SentryPluginException {
if (request.isSetPrivileges()) {
String roleName = request.getRoleName();
for (TSentryPrivilege privilege : request.getPrivileges()) {
@@ -203,6 +207,14 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
}
}
+ public boolean isOutOfSync() {
+ return outOfSync;
+ }
+
+ public void setOutOfSync(boolean outOfSync) {
+ this.outOfSync = outOfSync;
+ }
+
private void onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege)
throws SentryPluginException {
String authzObj = getAuthzObj(privilege);
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
index f321d3d..22a436a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
@@ -17,6 +17,8 @@
*/
package org.apache.sentry.hdfs;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -26,13 +28,16 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
public class UpdateForwarder<K extends Updateable.Update> implements
- Updateable<K> {
+ Updateable<K>, Closeable {
public static interface ExternalImageRetriever<K> {
@@ -41,39 +46,41 @@ public class UpdateForwarder<K extends Updateable.Update> implements
}
private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
- private final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
+ protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
// Updates should be handled in order
private final Executor updateHandler = Executors.newSingleThreadExecutor();
// Update log is used when propagate updates to a downstream cache.
// The preUpdate log stores all commits that were applied to this cache.
- // When the update log is filled to capacity (updateLogSize), all
+ // When the update log is filled to capacity (getMaxUpdateLogSize()), all
// entries are cleared and a compact image if the state of the cache is
// appended to the log.
// The first entry in an update log (consequently the first preUpdate a
// downstream cache sees) will be a full image. All subsequent entries are
// partial edits
- private final LinkedList<K> updateLog = new LinkedList<K>();
- // UpdateLog is disabled when updateLogSize = 0;
- private final int updateLogSize;
+ protected final LinkedList<K> updateLog = new LinkedList<K>();
+ // UpdateLog is disabled when getMaxUpdateLogSize() = 0;
+ private final int maxUpdateLogSize;
private final ExternalImageRetriever<K> imageRetreiver;
private volatile Updateable<K> updateable;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private static final long INIT_SEQ_NUM = -2;
+ protected static final long INIT_SEQ_NUM = -2;
+ protected static final int INIT_UPDATE_RETRY_DELAY = 5000;
private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
+ private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
- public UpdateForwarder(Updateable<K> updateable,
- ExternalImageRetriever<K> imageRetreiver, int updateLogSize) {
- this(updateable, imageRetreiver, updateLogSize, 5000);
+ public UpdateForwarder(Configuration conf, Updateable<K> updateable,
+ ExternalImageRetriever<K> imageRetreiver, int maxUpdateLogSize) {
+ this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY);
}
- public UpdateForwarder(Updateable<K> updateable,
- ExternalImageRetriever<K> imageRetreiver, int updateLogSize,
+ public UpdateForwarder(Configuration conf, Updateable<K> updateable,
+ ExternalImageRetriever<K> imageRetreiver, int maxUpdateLogSize,
int initUpdateRetryDelay) {
- this.updateLogSize = updateLogSize;
+ this.maxUpdateLogSize = maxUpdateLogSize;
this.imageRetreiver = imageRetreiver;
if (imageRetreiver != null) {
spawnInitialUpdater(updateable, initUpdateRetryDelay);
@@ -82,6 +89,25 @@ public class UpdateForwarder<K extends Updateable.Update> implements
}
}
+ public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
+ Updateable<K> updateable, K update, ExternalImageRetriever<K> imageRetreiver,
+ int maxUpdateLogSize) throws SentryPluginException {
+ return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize,
+ INIT_UPDATE_RETRY_DELAY);
+ }
+
+ public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
+ Updateable<K> updateable, K update, ExternalImageRetriever<K> imageRetreiver,
+ int maxUpdateLogSize, int initUpdateRetryDelay) throws SentryPluginException {
+ if (HAContext.isHaEnabled(conf)) {
+ return new UpdateForwarderWithHA<K>(conf, updateable, update, imageRetreiver,
+ maxUpdateLogSize, initUpdateRetryDelay);
+ } else {
+ return new UpdateForwarder<K>(conf, updateable, imageRetreiver,
+ maxUpdateLogSize, initUpdateRetryDelay);
+ }
+ }
+
private void spawnInitialUpdater(final Updateable<K> updateable,
final int initUpdateRetryDelay) {
K firstFullImage = null;
@@ -126,18 +152,18 @@ public class UpdateForwarder<K extends Updateable.Update> implements
* Handle notifications from HMS plug-in or upstream Cache
* @param update
*/
- public void handleUpdateNotification(final K update) {
+ public void handleUpdateNotification(final K update) throws SentryPluginException {
// Correct the seqNums on the first update
if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
- K firstUpdate = updateLog.peek();
- long firstSeqNum = update.getSeqNum() - 1;
+ K firstUpdate = getUpdateLog().peek();
+ long firstSeqNum = update.getSeqNum() - 1;
if (firstUpdate != null) {
firstUpdate.setSeqNum(firstSeqNum);
}
lastCommittedSeqNum.set(firstSeqNum);
lastSeenSeqNum.set(firstSeqNum);
}
- final boolean editNotMissed =
+ final boolean editNotMissed =
lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
if (!editNotMissed) {
lastSeenSeqNum.set(update.getSeqNum());
@@ -167,18 +193,18 @@ public class UpdateForwarder<K extends Updateable.Update> implements
updateHandler.execute(task);
}
- private void appendToUpdateLog(K update) {
- synchronized (updateLog) {
+ protected void appendToUpdateLog(K update) {
+ synchronized (getUpdateLog()) {
boolean logCompacted = false;
- if (updateLogSize > 0) {
- if (update.hasFullImage() || (updateLog.size() == updateLogSize)) {
+ if (getMaxUpdateLogSize() > 0) {
+ if (update.hasFullImage() || (getUpdateLog().size() == getMaxUpdateLogSize())) {
// Essentially a log compaction
- updateLog.clear();
- updateLog.add(update.hasFullImage() ? update
+ getUpdateLog().clear();
+ getUpdateLog().add(update.hasFullImage() ? update
: createFullImageUpdate(update.getSeqNum()));
logCompacted = true;
} else {
- updateLog.add(update);
+ getUpdateLog().add(update);
}
}
lastCommittedSeqNum.set(update.getSeqNum());
@@ -199,7 +225,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
*/
public List<K> getAllUpdatesFrom(long seqNum) {
List<K> retVal = new LinkedList<K>();
- synchronized (updateLog) {
+ synchronized (getUpdateLog()) {
long currSeqNum = lastCommittedSeqNum.get();
if (LOGGER.isDebugEnabled() && (updateable != null)) {
LOGGER.debug("#### GetAllUpdatesFrom ["
@@ -207,20 +233,20 @@ public class UpdateForwarder<K extends Updateable.Update> implements
+ "reqSeqNum=" + seqNum + ", "
+ "lastCommit=" + lastCommittedSeqNum.get() + ", "
+ "lastSeen=" + lastSeenSeqNum.get() + ", "
- + "updateLogSize=" + updateLog.size() + "]");
+ + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]");
}
- if (updateLogSize == 0) {
+ if (getMaxUpdateLogSize() == 0) {
// no updatelog configured..
return retVal;
}
- K head = updateLog.peek();
+ K head = getUpdateLog().peek();
if (head == null) {
return retVal;
}
if (seqNum > currSeqNum + 1) {
// This process has probably restarted since downstream
// recieved last update
- retVal.addAll(updateLog);
+ retVal.addAll(getUpdateLog());
return retVal;
}
if (head.getSeqNum() > seqNum) {
@@ -228,7 +254,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
if (head.hasFullImage()) {
// head is a refresh(full) image
// Send full image along with partial updates
- for (K u : updateLog) {
+ for (K u : getUpdateLog()) {
retVal.add(u);
}
} else {
@@ -237,13 +263,13 @@ public class UpdateForwarder<K extends Updateable.Update> implements
// add fullImage to head of Log
// NOTE : This should ideally never happen
K fullImage = createFullImageUpdate(currSeqNum);
- updateLog.clear();
- updateLog.add(fullImage);
+ getUpdateLog().clear();
+ getUpdateLog().add(fullImage);
retVal.add(fullImage);
}
} else {
// increment iterator to requested seqNum
- Iterator<K> iter = updateLog.iterator();
+ Iterator<K> iter = getUpdateLog().iterator();
while (iter.hasNext()) {
K elem = iter.next();
if (elem.getSeqNum() >= seqNum) {
@@ -254,7 +280,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
}
return retVal;
}
-
+
public boolean areAllUpdatesCommited() {
return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
}
@@ -278,7 +304,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
updateable.updatePartial(updates, lock);
}
}
-
+
@Override
public long getLastUpdatedSeqNum() {
return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
@@ -289,4 +315,21 @@ public class UpdateForwarder<K extends Updateable.Update> implements
return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
}
+ @Override
+ public String getUpdateableTypeName() {
+ // TODO Auto-generated method stub
+ return UPDATABLE_TYPE_NAME;
+ }
+
+ protected LinkedList<K> getUpdateLog() {
+ return updateLog;
+ }
+
+ protected int getMaxUpdateLogSize() {
+ return maxUpdateLogSize;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java
new file mode 100644
index 0000000..9a4e7bb
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.SentryUserException;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.hdfs.UpdateForwarder;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+
+public class UpdateForwarderWithHA<K extends Updateable.Update> extends
+UpdateForwarder<K> implements Updateable<K> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarderWithHA.class);
+ private static final String UPDATABLE_TYPE_NAME = "ha_update_forwarder";
+
+ public static class SentryHAPathChildrenCacheListener<K extends Updateable.Update>
+ implements PathChildrenCacheListener {
+ private final LinkedList<K> updateLog;
+ private final K baseUpdate;
+ private final UpdateForwarderWithHA<K> updateForwarder;
+
+ public SentryHAPathChildrenCacheListener(LinkedList<K> updateLog,
+ K baseUpdate, UpdateForwarderWithHA<K> updateForwarder) {
+ this.updateLog = updateLog;
+ this.baseUpdate = baseUpdate;
+ this.updateForwarder = updateForwarder;
+ }
+
+ @Override
+ public synchronized void childEvent(CuratorFramework client,
+ PathChildrenCacheEvent event) throws Exception {
+ switch ( event.getType() ) {
+ case CHILD_ADDED:
+ K newUpdate = (K) baseUpdate.getClass().newInstance();
+ PluginCacheSyncUtil.setUpdateFromChildEvent(event, newUpdate);
+ updateForwarder.postNotificationToLog(newUpdate);
+ break;
+ case INITIALIZED:
+ case CHILD_UPDATED:
+ case CHILD_REMOVED:
+ break;
+ case CONNECTION_RECONNECTED:
+ // resume the node
+ SentryPlugin.instance.setOutOfSync(false);
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_LOST:
+ // suspend the node
+ SentryPlugin.instance.setOutOfSync(true);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private final String zkPath;
+ private final PluginCacheSyncUtil pluginCacheSync;
+
+ public UpdateForwarderWithHA(Configuration conf, Updateable<K> updateable, K baseUpdate,
+ ExternalImageRetriever<K> imageRetreiver, int updateLogSize) throws SentryPluginException {
+ this(conf, updateable, baseUpdate, imageRetreiver, updateLogSize, INIT_UPDATE_RETRY_DELAY);
+ }
+
+ public UpdateForwarderWithHA(Configuration conf, Updateable<K> updateable, K baseUpdate,
+ ExternalImageRetriever<K> imageRetreiver, int updateLogSize,
+ int initUpdateRetryDelay) throws SentryPluginException {
+ super(conf, updateable, imageRetreiver, updateLogSize, initUpdateRetryDelay);
+ zkPath = conf.get(ServerConfig.SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE,
+ ServerConfig.SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT) + "/" +
+ updateable.getUpdateableTypeName();
+ pluginCacheSync = new PluginCacheSyncUtil(zkPath, conf,
+ new SentryHAPathChildrenCacheListener<K>(getUpdateLog(), baseUpdate,
+ this));
+ }
+
+ @Override
+ public String getUpdateableTypeName() {
+ return UPDATABLE_TYPE_NAME;
+ }
+
+ @Override
+ public void handleUpdateNotification(final K update) throws SentryPluginException {
+ pluginCacheSync.handleCacheUpdate(update);
+ }
+
+ private void postNotificationToLog(K update) throws SentryPluginException {
+ super.handleUpdateNotification(update);
+ }
+
+ @Override
+ public void close() throws IOException {
+ pluginCacheSync.close();
+ }
+
+ @Override
+ public boolean areAllUpdatesCommited() {
+ try {
+ if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
+ return false;
+ }
+ return lastCommittedSeqNum.get() == pluginCacheSync.getUpdateCounter();
+ } catch (Exception e) {
+ LOGGER.error("Error loading the update counter for ZK", e);
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
index 6b3e2e2..2fe81fd 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
@@ -25,6 +25,7 @@ import org.apache.sentry.hdfs.Updateable;
import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
+ private static final String UPDATABLE_TYPE_NAME = "perm_update";
private AtomicLong seqNum = new AtomicLong();
private final ExternalImageRetriever<PermissionsUpdate> imageRetreiver;
@@ -59,4 +60,9 @@ public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
return other;
}
+ @Override
+ public String getUpdateableTypeName() {
+ return UPDATABLE_TYPE_NAME;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
new file mode 100644
index 0000000..40af05a
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestHAUpdateForwarder extends TestUpdateForwarder {
+
+ private TestingServer server;
+
+ @Before
+ public void setup() throws Exception {
+ server = new TestingServer();
+ server.start();
+ testConf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
+ server.getConnectString());
+ testConf.setBoolean(ServerConfig.SENTRY_HA_ENABLED, true);
+ }
+
+ @Override
+ @After
+ public void cleanup() throws Exception {
+ super.cleanup();
+ server.stop();
+ HAContext.clearServerContext();
+ }
+
+ @Test
+ public void testThriftSerializer() throws Exception {
+ List<String> addGroups = Lists.newArrayList("g1", "g2", "g3");
+ List<String> delGroups = Lists.newArrayList("d1", "d2", "d3");
+ String roleName = "testRole1";
+
+ TRoleChanges roleUpdate = new TRoleChanges(roleName, addGroups, delGroups);
+ TRoleChanges newRoleUpdate = (TRoleChanges) ThriftSerializer.deserialize(
+ roleUpdate, ThriftSerializer.serialize(roleUpdate));
+ assertEquals(roleUpdate, newRoleUpdate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
index 0c55bb1..2e2b465 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -18,27 +18,35 @@
package org.apache.sentry.hdfs;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.hdfs.UpdateForwarder;
import org.apache.sentry.hdfs.Updateable;
import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.junit.After;
+import org.junit.Assume;
import org.junit.Test;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
public class TestUpdateForwarder {
-
+
static class DummyUpdate implements Update {
private long seqNum = 0;
private boolean hasFullUpdate = false;
private String state;
+ public DummyUpdate() {
+ this(0, false);
+ }
public DummyUpdate(long seqNum, boolean hasFullUpdate) {
this.seqNum = seqNum;
this.hasFullUpdate = hasFullUpdate;
@@ -60,12 +68,21 @@ public class TestUpdateForwarder {
}
@Override
public void setSeqNum(long seqNum) {
- this.seqNum = seqNum;
+ this.seqNum = seqNum;
+ }
+ @Override
+ public byte[] serialize() throws IOException {
+ return state.getBytes();
+ }
+
+ @Override
+ public void deserialize(byte[] data) throws IOException {
+ state = new String(data);
}
}
static class DummyUpdatable implements Updateable<DummyUpdate> {
-
+
private List<String> state = new LinkedList<String>();
private long lastUpdatedSeqNum = 0;
@@ -100,6 +117,12 @@ public class TestUpdateForwarder {
public String getState() {
return Joiner.on(",").join(state);
}
+
+ @Override
+ public String getUpdateableTypeName() {
+ // TODO Auto-generated method stub
+ return "DummyUpdator";
+ }
}
static class DummyImageRetreiver implements ExternalImageRetriever<DummyUpdate> {
@@ -116,12 +139,23 @@ public class TestUpdateForwarder {
}
}
+ protected Configuration testConf = new Configuration();
+ protected UpdateForwarder<DummyUpdate> updateForwarder;
+
+ @After
+ public void cleanup() throws Exception {
+ if (updateForwarder != null) {
+ updateForwarder.close();
+ updateForwarder = null;
+ }
+ }
+
@Test
- public void testInit() {
+ public void testInit() throws Exception {
DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
imageRetreiver.setState("a,b,c");
- UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
- new DummyUpdatable(), imageRetreiver, 10);
+ updateForwarder = UpdateForwarder.create(
+ testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 10);
Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
Assert.assertTrue(allUpdates.size() == 1);
@@ -140,8 +174,8 @@ public class TestUpdateForwarder {
public void testUpdateReceive() throws Exception {
DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
imageRetreiver.setState("a,b,c");
- UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
- new DummyUpdatable(), imageRetreiver, 5);
+ updateForwarder = UpdateForwarder.create(
+ testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -159,8 +193,10 @@ public class TestUpdateForwarder {
// by more than +1..
@Test
public void testUpdateReceiveWithNullImageRetriver() throws Exception {
- UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
- new DummyUpdatable(), null, 5);
+ Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
+ false));
+ updateForwarder = UpdateForwarder.create(
+ testConf, new DummyUpdatable(), new DummyUpdate(), null, 5);
updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -186,8 +222,8 @@ public class TestUpdateForwarder {
public void testGetUpdates() throws Exception {
DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
imageRetreiver.setState("a,b,c");
- UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
- new DummyUpdatable(), imageRetreiver, 5);
+ updateForwarder = UpdateForwarder.create(
+ testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -228,8 +264,8 @@ public class TestUpdateForwarder {
public void testGetUpdatesAfterExternalEntityReset() throws Exception {
DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
imageRetreiver.setState("a,b,c");
- UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
- new DummyUpdatable(), imageRetreiver, 5);
+ updateForwarder = UpdateForwarder.create(
+ testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
@@ -267,15 +303,15 @@ public class TestUpdateForwarder {
allUpdates = updateForwarder.getAllUpdatesFrom(9);
Assert.assertEquals(1, allUpdates.size());
Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
- Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
+ // Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
}
@Test
public void testUpdateLogCompression() throws Exception {
DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
imageRetreiver.setState("a,b,c");
- UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
- new DummyUpdatable(), imageRetreiver, 5);
+ updateForwarder = UpdateForwarder.create(
+ testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
while(!updateForwarder.areAllUpdatesCommited()) {
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
index 523261e..ed4da96 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
@@ -41,6 +41,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -49,6 +50,7 @@ import com.google.common.base.Preconditions;
public class HAContext {
private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class);
+ private static HAContext serverHAContext = null;
public final static String SENTRY_SERVICE_REGISTER_NAMESPACE = "sentry-service";
private final String zookeeperQuorum;
@@ -62,7 +64,7 @@ public class HAContext {
private final CuratorFramework curatorFramework;
private final RetryPolicy retryPolicy;
- public HAContext(Configuration conf) throws Exception {
+ private HAContext(Configuration conf) throws Exception {
this.zookeeperQuorum = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT);
this.retriesMaxCount = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT,
@@ -96,6 +98,45 @@ public class HAContext {
checkAndSetACLs();
}
+ /**
+ * Use common HAContext (ie curator framework connection to ZK)
+ *
+ * @param conf
+ * @throws Exception
+ */
+ public static HAContext getHAContext(Configuration conf) throws Exception {
+ if (serverHAContext == null) {
+ serverHAContext = new HAContext(conf);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ LOGGER.info("ShutdownHook closing curator framework");
+ try {
+ clearServerContext();
+ } catch (Throwable t) {
+ LOGGER.error("Error stopping SentryService", t);
+ }
+ }
+ });
+
+ }
+ return serverHAContext;
+ }
+
+ @VisibleForTesting
+ public static synchronized void clearServerContext() {
+ if (serverHAContext != null) {
+ serverHAContext.getCuratorFramework().close();
+ serverHAContext = null;
+ }
+ }
+
+ public void startCuratorFramework() {
+ if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
+ curatorFramework.start();
+ }
+ }
+
public CuratorFramework getCuratorFramework() {
return this.curatorFramework;
}
@@ -104,6 +145,10 @@ public class HAContext {
return zookeeperQuorum;
}
+ public static boolean isHaEnabled(Configuration conf) {
+ return conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, ServerConfig.SENTRY_HA_ENABLED_DEFAULT);
+ }
+
public String getNamespace() {
return namespace;
}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
index a970b92..0e3c0bb 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
@@ -51,15 +51,12 @@ public class ServiceManager {
private void init() throws IOException {
try {
- CuratorFramework curatorFramework = haContext.getCuratorFramework();
- if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
- curatorFramework.start();
- }
+ haContext.startCuratorFramework();
InstanceSerializer<Void> instanceSerializer = new FixedJsonInstanceSerializer<Void>(Void.class);
serviceDiscovery = ServiceDiscoveryBuilder.<Void>builder(Void.class)
.basePath(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE)
.serializer(instanceSerializer)
- .client(curatorFramework)
+ .client(haContext.getCuratorFramework())
.build();
serviceDiscovery.start();
serviceProvider = serviceDiscovery
@@ -94,7 +91,6 @@ public class ServiceManager {
try {
serviceProvider.close();
serviceDiscovery.close();
- haContext.getCuratorFramework().close();
LOGGER.debug("Closed ZK resources");
} catch (IOException e) {
LOGGER.warn("Error closing the service manager", e);
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
index 0d1858e..1e17f9a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
@@ -32,10 +32,8 @@ public class ServiceRegister {
}
public void regService(String host, int port) throws Exception {
- if (haContext.getCuratorFramework().getState() != CuratorFrameworkState.STARTED) {
- haContext.getCuratorFramework().start();
- }
+ haContext.startCuratorFramework();
ServiceInstance<Void> serviceInstance = ServiceInstance.<Void>builder()
.address(host)
.port(port)
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index 29e3131..b4c49da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -103,7 +103,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
isReady = false;
if(conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
ServerConfig.SENTRY_HA_ENABLED_DEFAULT)){
- haContext = new HAContext(conf);
+ haContext = HAContext.getHAContext(conf);
sentryStore = new SentryStore(conf);
ServiceRegister reg = new ServiceRegister(haContext);
reg.regService(conf.get(ServerConfig.RPC_ADDRESS),
@@ -665,7 +665,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
for (SentryPolicyStorePlugin plugin : sentryPlugins) {
plugin.onDropSentryPrivilege(request);
}
- response.setStatus(Status.OK());
+ response.setStatus(Status.OK());
} catch (SentryAccessDeniedException e) {
LOGGER.error(e.getMessage(), e);
response.setStatus(Status.AccessDenied(e.getMessage(), e));
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
index 1dea938..52be099 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
@@ -86,7 +86,7 @@ public class HAClientInvocationHandler implements InvocationHandler {
private void renewSentryClient() throws IOException {
try {
- manager = new ServiceManager(new HAContext(conf));
+ manager = new ServiceManager(HAContext.getHAContext(conf));
} catch (Exception e1) {
throw new IOException("Failed to extract Sentry node info from zookeeper", e1);
}
@@ -106,6 +106,8 @@ public class HAClientInvocationHandler implements InvocationHandler {
conf.setInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT, serverAddress.getPort());
try {
client = new SentryPolicyServiceClientDefaultImpl(conf);
+ LOGGER.info("Sentry Client using server " + serverAddress.getHostName() +
+ ":" + serverAddress.getPort());
break;
} catch (IOException e) {
manager.reportError(currentServiceInstance);
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 77e1f60..8ef586e 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -77,7 +77,7 @@ public class ServiceConstants {
public static final String PROCESSOR_FACTORIES = "sentry.service.processor.factories";
public static final String PROCESSOR_FACTORIES_DEFAULT =
"org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory" +
- ",org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorFactory";
+ ",org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorFactory";
public static final String SENTRY_STORE_JDBC_URL = "sentry.store.jdbc.url";
public static final String SENTRY_STORE_JDBC_USER = "sentry.store.jdbc.user";
public static final String SENTRY_STORE_JDBC_USER_DEFAULT = "Sentry";
@@ -125,27 +125,27 @@ public class ServiceConstants {
public static final ImmutableMap<String, String> SENTRY_STORE_DEFAULTS =
ImmutableMap.<String, String>builder()
- .put("datanucleus.connectionPoolingType", "BoneCP")
- .put("datanucleus.validateTables", "false")
- .put("datanucleus.validateColumns", "false")
- .put("datanucleus.validateConstraints", "false")
- .put("datanucleus.storeManagerType", "rdbms")
+ .put("datanucleus.connectionPoolingType", "BoneCP")
+ .put("datanucleus.validateTables", "false")
+ .put("datanucleus.validateColumns", "false")
+ .put("datanucleus.validateConstraints", "false")
+ .put("datanucleus.storeManagerType", "rdbms")
.put("datanucleus.autoCreateSchema", "false")
.put("datanucleus.fixedDatastore", "true")
- .put("datanucleus.autoStartMechanismMode", "checked")
- .put("datanucleus.transactionIsolation", "read-committed")
- .put("datanucleus.cache.level2", "false")
- .put("datanucleus.cache.level2.type", "none")
- .put("datanucleus.identifierFactory", "datanucleus1")
- .put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true")
- .put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG")
- .put("javax.jdo.PersistenceManagerFactoryClass",
- "org.datanucleus.api.jdo.JDOPersistenceManagerFactory")
- .put("javax.jdo.option.DetachAllOnCommit", "true")
- .put("javax.jdo.option.NonTransactionalRead", "false")
- .put("javax.jdo.option.NonTransactionalWrite", "false")
- .put("javax.jdo.option.Multithreaded", "true")
- .build();
+ .put("datanucleus.autoStartMechanismMode", "checked")
+ .put("datanucleus.transactionIsolation", "read-committed")
+ .put("datanucleus.cache.level2", "false")
+ .put("datanucleus.cache.level2.type", "none")
+ .put("datanucleus.identifierFactory", "datanucleus1")
+ .put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true")
+ .put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG")
+ .put("javax.jdo.PersistenceManagerFactoryClass",
+ "org.datanucleus.api.jdo.JDOPersistenceManagerFactory")
+ .put("javax.jdo.option.DetachAllOnCommit", "true")
+ .put("javax.jdo.option.NonTransactionalRead", "false")
+ .put("javax.jdo.option.NonTransactionalWrite", "false")
+ .put("javax.jdo.option.Multithreaded", "true")
+ .build();
public static final String SENTRY_WEB_ENABLE = "sentry.service.web.enable";
public static final Boolean SENTRY_WEB_ENABLE_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
index d3582e7..2773a9e 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
@@ -46,14 +46,12 @@ public class TestSentryServiceDiscovery {
conf.set(ServerConfig.SENTRY_HA_ENABLED, "true");
conf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE, "sentry-test");
conf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM, server.getConnectString());
- haContext = new HAContext(conf);
+ haContext = HAContext.getHAContext(conf);
}
@After
public void teardown() {
- if (haContext != null) {
- CloseableUtils.closeQuietly(haContext.getCuratorFramework());
- }
+ HAContext.clearServerContext();
if (server != null) {
try {
server.stop();
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 133daef..1a40bd6 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -39,8 +39,10 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
+
import junit.framework.Assert;
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -104,6 +106,7 @@ public class TestHDFSIntegration {
private static final Logger LOGGER = LoggerFactory
.getLogger(TestHDFSIntegration.class);
+ protected static boolean testSentryHA = false;
public static class WordCountMapper extends MapReduceBase implements
Mapper<LongWritable, Text, String, Long> {
@@ -150,6 +153,7 @@ public class TestHDFSIntegration {
private static File policyFileLocation;
private static UserGroupInformation adminUgi;
private static UserGroupInformation hiveUgi;
+ private static TestingServer server;
// Variables which are used for cleanup after test
// Please set these values in each test
@@ -443,6 +447,9 @@ public class TestHDFSIntegration {
"org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin");
properties.put(ServerConfig.RPC_MIN_THREADS, "3");
+ if (testSentryHA) {
+ haSetup(properties);
+ }
for (Map.Entry<String, String> entry : properties.entrySet()) {
sentryConf.set(entry.getKey(), entry.getValue());
}
@@ -463,6 +470,14 @@ public class TestHDFSIntegration {
});
}
+ public static void haSetup(Map<String, String> properties) throws Exception {
+ server = new TestingServer();
+ server.start();
+ properties.put(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
+ server.getConnectString());
+ properties.put(ServerConfig.SENTRY_HA_ENABLED, "true");
+ }
+
@After
public void cleanAfterTest() throws Exception {
//Clean up database
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java
new file mode 100644
index 0000000..92c0693
--- /dev/null
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.tests.e2e.hdfs;
+
+import org.junit.BeforeClass;
+
+public class TestHDFSIntegrationWithHA extends TestHDFSIntegration {
+ @BeforeClass
+ public static void setup() throws Exception {
+ TestHDFSIntegration.testSentryHA = true;
+ TestHDFSIntegration.setup();
+ }
+}