You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sr...@apache.org on 2016/07/12 20:10:52 UTC
[2/2] sentry git commit: SENTRY-1317: Implement fencing required for
active/standby (Colin P. McCabe,
Reviewed by: Hao Hao and Sravya Tirukkovalur)
SENTRY-1317: Implement fencing required for active/standby (Colin P. McCabe, Reviewed by: Hao Hao and Sravya Tirukkovalur)
New fencing and active/passive code
- Activator to store the state about whether the daemon is active or not, as well as manage fencing
- Create Fencer to implement SQL fencing.
- Add SqlAccessor to talk directly to SQL databases
LeaderStatus: generate shorter incarnation IDs by using base64.
LeaderStatusAdaptor: implement close()
Remove old code which is no longer used
- HAContext
- PluginCacheSyncUtil
- TestHAUpdateForwarder
- ServiceRegister
SentryStore
- move DataNucleus properties setup into a utility function
- Remove unused DEFAULT_DATA_DIR variable (it's not used anywhere in the code)
- SentryStore should maintain a reference to the Activator
Add SentryStandbyException to indicate that the daemon is currently standby
Move SENTRY_ZK_JAAS_NAME from HAContext to SentryConstants
DelegateSentryStore: make some fields final
Change-Id: Ic41711ecbc218bb21e3ca3120998866d65e16493
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a70cff99
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a70cff99
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a70cff99
Branch: refs/heads/sentry-ha-redesign
Commit: a70cff999180486183e68fd9229818970f6b8fc1
Parents: de7c26a
Author: Sravya Tirukkovalur <sr...@apache.org>
Authored: Tue Jul 12 13:03:23 2016 -0700
Committer: Sravya Tirukkovalur <sr...@apache.org>
Committed: Tue Jul 12 13:10:38 2016 -0700
----------------------------------------------------------------------
.../exception/SentryStandbyException.java | 33 ++
.../core/common/utils/SentryConstants.java | 3 +
.../apache/sentry/hdfs/PluginCacheSyncUtil.java | 251 ---------------
.../sentry/hdfs/SentryHdfsMetricsUtil.java | 8 -
.../sentry/hdfs/TestHAUpdateForwarder.java | 66 ----
.../provider/db/service/persistent/Fencer.java | 242 +++++++++++++++
.../db/service/persistent/HAContext.java | 262 ----------------
.../db/service/persistent/SentryStore.java | 32 +-
.../db/service/persistent/ServiceRegister.java | 52 ----
.../db/service/persistent/SqlAccessor.java | 309 +++++++++++++++++++
.../thrift/SentryPolicyStoreProcessor.java | 31 +-
.../apache/sentry/service/thrift/Activator.java | 112 +++++++
.../sentry/service/thrift/Activators.java | 69 +++++
.../sentry/service/thrift/LeaderStatus.java | 31 +-
.../service/thrift/LeaderStatusAdaptor.java | 41 ++-
.../sentry/service/thrift/SentryService.java | 33 +-
.../sentry/service/thrift/ServiceConstants.java | 4 +
.../persistent/SentryStoreIntegrationBase.java | 15 +
.../TestPrivilegeOperatePersistence.java | 22 +-
.../db/service/persistent/TestSentryStore.java | 14 +
.../persistent/TestSentryStoreImportExport.java | 16 +-
.../service/persistent/TestSentryVersion.java | 18 ++
.../thrift/SentryServiceIntegrationBase.java | 3 +-
.../sentry/service/thrift/TestLeaderStatus.java | 26 ++
24 files changed, 982 insertions(+), 711 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
new file mode 100644
index 0000000..73c7e4e
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryStandbyException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.exception;
+
+/**
+ * An exception which indicates that the current server is standby.
+ */
+public class SentryStandbyException extends SentryUserException {
+ private static final long serialVersionUID = 2162010615815L;
+
+ public SentryStandbyException(String msg) {
+ super(msg);
+ }
+
+ public SentryStandbyException(String msg, String reason) {
+ super(msg, reason);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
index 3da4906..c094058 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/SentryConstants.java
@@ -40,4 +40,7 @@ public class SentryConstants {
public static final String RESOURCE_WILDCARD_VALUE_ALL = "ALL";
public static final String RESOURCE_WILDCARD_VALUE_SOME = "+";
public static final String ACCESS_ALLOW_URI_PER_DB_POLICYFILE = "sentry.allow.uri.db.policyfile";
+
+ public static final String SENTRY_ZK_JAAS_NAME = "Sentry";
+ public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key";
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
deleted file mode 100644
index 4ce16c7..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.codahale.metrics.Timer;
-import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Utility class for handling the cache update syncup via Curator path cache It
- * creates the path cache, a distributed lock and counter. The updated API
- * updates the counter, creates a znode zpath/counter and writes the data to it.
- * The caller should provider the cache callback handler class that posts the
- * update object to the required cache
- */
-public class PluginCacheSyncUtil {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(PluginCacheSyncUtil.class);
- public static final long CACHE_GC_SIZE_THRESHOLD_HWM = 100;
- public static final long CACHE_GC_SIZE_THRESHOLD_LWM = 50;
- public static final long CACHE_GC_SIZE_MAX_CLEANUP = 1000;
- public static final long ZK_COUNTER_INIT_VALUE = 4;
- public static final long GC_COUNTER_INIT_VALUE = ZK_COUNTER_INIT_VALUE + 1;
-
- private final String zkPath;
- private final HAContext haContext;
- private final PathChildrenCache cache;
- private InterProcessSemaphoreMutex updatorLock, gcLock;
- private int lockTimeout;
- private DistributedAtomicLong updateCounter, gcCounter;
- private final ScheduledExecutorService gcSchedulerForZk = Executors
- .newScheduledThreadPool(1);
-
- public PluginCacheSyncUtil(String zkPath, final Configuration conf,
- PathChildrenCacheListener cacheListener) throws SentryPluginException {
- this.zkPath = zkPath;
- // Init ZK connection
- try {
- haContext = HAContext.getHAContext(conf);
- } catch (Exception e) {
- throw new SentryPluginException("Error creating HA context ", e);
- }
- haContext.startCuratorFramework();
-
- // Init path cache
- cache = new PathChildrenCache(haContext.getCuratorFramework(), zkPath
- + "/cache", true);
- // path cache callback
- cache.getListenable().addListener(cacheListener);
- try {
- cache.start();
- } catch (Exception e) {
- throw new SentryPluginException("Error creating ZK PathCache ", e);
- }
- updatorLock = new InterProcessSemaphoreMutex(
- haContext.getCuratorFramework(), zkPath + "/lock");
- lockTimeout = conf.getInt(
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
- ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
- gcLock = new InterProcessSemaphoreMutex(
- haContext.getCuratorFramework(), zkPath + "/gclock");
-
- updateCounter = new DistributedAtomicLong(haContext.getCuratorFramework(),
- zkPath + "/counter", haContext.getRetryPolicy());
- try {
- updateCounter.initialize(ZK_COUNTER_INIT_VALUE);
- } catch (Exception e) {
- LOGGER.error("Error initializing counter for zpath " + zkPath, e);
- }
-
- // GC setup
- gcCounter = new DistributedAtomicLong(haContext.getCuratorFramework(),
- zkPath + "/gccounter", haContext.getRetryPolicy());
- try {
- gcCounter.initialize(GC_COUNTER_INIT_VALUE);
- } catch (Exception e) {
- LOGGER.error("Error initializing counter for zpath " + zkPath, e);
- }
- final Runnable gcRunner = new Runnable() {
- public void run() {
- gcPluginCache(conf);
- }
- };
- gcSchedulerForZk.scheduleAtFixedRate(gcRunner, 10, 10, TimeUnit.MINUTES);
- }
-
- public void handleCacheUpdate(Update update) throws SentryPluginException {
- final Timer.Context timerContext = SentryHdfsMetricsUtil.getCacheSyncToZKTimer.time();
- // post message to ZK cache
- try {
- // Acquire ZK lock for update cache sync. This ensures that the counter
- // increment and znode creation is atomic operation
- if (!updatorLock.acquire(lockTimeout, TimeUnit.MILLISECONDS)) {
- throw new SentryPluginException(
- "Failed to get ZK lock for update cache syncup");
- }
- } catch (Exception e1) {
- // Stop timer in advance
- timerContext.stop();
- SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
- throw new SentryPluginException(
- "Error getting ZK lock for update cache syncup" + e1, e1);
- }
- boolean failed = false;
- try {
- try {
- // increment the global sequence counter if this is not a full update
- if (!update.hasFullImage()) {
- update.setSeqNum(updateCounter.increment().postValue());
- } else {
- if (updateCounter.get().preValue() < update.getSeqNum()) {
- updateCounter.add(update.getSeqNum() - updateCounter.get().preValue());
- }
- }
- } catch (Exception e1) {
- failed = true;
- throw new SentryPluginException(
- "Error setting ZK counter for update cache syncup" + e1, e1);
- }
-
- // Create a new znode with the sequence number and write the update data
- // into it
- String updateSeq = String.valueOf(update.getSeqNum());
- String newPath = ZKPaths.makePath(zkPath + "/cache", updateSeq);
- try {
- haContext.getCuratorFramework().create().creatingParentsIfNeeded()
- .forPath(newPath, update.serialize());
- } catch (Exception e) {
- failed = true;
- throw new SentryPluginException("error posting update to ZK ", e);
- }
- } finally {
- // release the ZK lock
- try {
- updatorLock.release();
- } catch (Exception e) {
- // Stop timer in advance
- timerContext.stop();
- SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
- throw new SentryPluginException(
- "Error releasing ZK lock for update cache syncup" + e, e);
- }
- timerContext.stop();
- if (failed) {
- SentryHdfsMetricsUtil.getFailedCacheSyncToZK.inc();
- }
- }
- }
-
- public static void setUpdateFromChildEvent(PathChildrenCacheEvent cacheEvent,
- Update update) throws IOException {
- byte eventData[] = cacheEvent.getData().getData();
- update.deserialize(eventData);
- String seqNum = ZKPaths.getNodeFromPath(cacheEvent.getData().getPath());
- update.setSeqNum(Integer.valueOf(seqNum));
- }
-
- public void close() throws IOException {
- cache.close();
- }
-
- public long getUpdateCounter() throws Exception {
- return updateCounter.get().preValue();
- }
-
- /**
- * Cleanup old znode of the plugin cache. The last cleaned and last created
- * node counters are stored in ZK. If the number of available nodes are more
- * than the high water mark, then we delete the old nodes till we reach low
- * water mark. The scheduler periodically runs the cleanup routine
- * @param conf
- */
- @VisibleForTesting
- public void gcPluginCache(Configuration conf) {
- try {
- // If we can acquire gc lock, then continue with znode cleanup
- if (!gcLock.acquire(500, TimeUnit.MILLISECONDS)) {
- return;
- }
-
- // If we have passed the High watermark, then start the cleanup
- Long updCount = updateCounter.get().preValue();
- Long gcCount = gcCounter.get().preValue();
- if (updCount - gcCount > CACHE_GC_SIZE_THRESHOLD_HWM) {
- Long numNodesToClean = Math.min(updCount - gcCount
- - CACHE_GC_SIZE_THRESHOLD_LWM, CACHE_GC_SIZE_MAX_CLEANUP);
- for (Long nodeNum = gcCount; nodeNum < gcCount + numNodesToClean; nodeNum++) {
- String pathToDelete = ZKPaths.makePath(zkPath + "/cache",
- Long.toString(nodeNum));
- try {
- haContext.getCuratorFramework().delete().forPath(pathToDelete);
- gcCounter.increment();
- LOGGER.debug("Deleted znode " + pathToDelete);
- } catch (NoNodeException eN) {
- // We might have endup with holes in the node counter due to network/ZK errors
- // Ignore the delete error if the node doesn't exist and move on
- gcCounter.increment();
- } catch (Exception e) {
- LOGGER.info("Error cleaning up node " + pathToDelete, e);
- break;
- }
- }
- }
- } catch (Exception e) {
- LOGGER.warn("Error cleaning the cache", e);
- } finally {
- if (gcLock.isAcquiredInThisProcess()) {
- try {
- gcLock.release();
- } catch (Exception e) {
- LOGGER.warn("Error releasing gc lock", e);
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
index 5bf2f6e..e68c708 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
@@ -91,14 +91,6 @@ public class SentryHdfsMetricsUtil {
MetricRegistry.name(MetastorePlugin.class, "apply-local-update",
"path-change-size"));
- // Metrics for handleCacheUpdate to ZK in PluginCacheSyncUtil
- // The time used for each handleCacheUpdate
- public static final Timer getCacheSyncToZKTimer = sentryMetrics.getTimer(
- MetricRegistry.name(PluginCacheSyncUtil.class, "cache-sync-to-zk"));
- // The number of failed handleCacheUpdate
- public static final Counter getFailedCacheSyncToZK = sentryMetrics.getCounter(
- MetricRegistry.name(PluginCacheSyncUtil.class, "cache-sync-to-zk", "failed-num"));
-
private SentryHdfsMetricsUtil() {
// Make constructor private to avoid instantiation
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
deleted file mode 100644
index 5246e05..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.curator.test.TestingServer;
-import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestHAUpdateForwarder extends TestUpdateForwarder {
-
- private TestingServer server;
-
- @Before
- public void setup() throws Exception {
- server = new TestingServer();
- server.start();
- testConf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
- server.getConnectString());
- testConf.setBoolean(ServerConfig.SENTRY_HA_ENABLED, true);
- }
-
- @Override
- @After
- public void cleanup() throws Exception {
- super.cleanup();
- server.stop();
- HAContext.clearServerContext();
- }
-
- @Test
- public void testThriftSerializer() throws Exception {
- List<String> addGroups = Lists.newArrayList("g1", "g2", "g3");
- List<String> delGroups = Lists.newArrayList("d1", "d2", "d3");
- String roleName = "testRole1";
-
- TRoleChanges roleUpdate = new TRoleChanges(roleName, addGroups, delGroups);
- TRoleChanges newRoleUpdate = (TRoleChanges) ThriftSerializer.deserialize(
- roleUpdate, ThriftSerializer.serialize(roleUpdate));
- assertEquals(roleUpdate, newRoleUpdate);
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java
new file mode 100644
index 0000000..14cdde3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/Fencer.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import javax.jdo.JDOException;
+import javax.jdo.JDOFatalDataStoreException;
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Query;
+import javax.jdo.Transaction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fences the SQL database.<p/>
+ *
+ * Fencing ensures that any SQL requests that were sent by a previously active
+ * (but now standby) sentry daemon will not be honored. It also ensures that if
+ * users start up multiple non-HA sentry daemons, only one can become
+ * active.<p/>
+ *
+ * The fencer uses a special SQL table, the SENTRY_FENCE table. When a sentry
+ * process becomes active, it renames this table so that the name contains the
+ * current "incarnation ID." The incarnation ID is a randomly generated 128-bit
+ * ID, which changes each time the process is restarted. From that point
+ * forward, the sentry process includes a SELECT query for the SENTRY_FENCE
+ * table in all update transactions. This ensures that if the SENTRY_FENCE
+ * table is subsequently renamed again, those update transactions will not
+ * succeed.<p/>
+ *
+ * It is important to distinguish between fencing and leader election.
+ * ZooKeeper is responsible for leader election and ensures that there is only
+ * ever one active sentry daemon at any one time. However, sentry exists in an
+ * asynchronous network where requests from a previously active daemon may be
+ * arbitrarily delayed before reaching the SQL database. There is also a delay
+ * between a process being "de-leadered" by ZooKeeper, and the process itself
+ * becoming aware of this situation. Java's garbage collection pauses tend to
+ * expose these kinds of race conditions. The SQL database must be prepared to
+ * reject these stale updates.<p/>
+ *
+ * Given that we need this SQL fencing, why bother with ZooKeeper at all?
+ * ZooKeeper detects when nodes have stopped responding, and elects a new
+ * leader. The SQL fencing code cannot do that.<p/>
+ */
+public class Fencer {
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(Fencer.class);
+
+ /**
+ * The base name of the sentry fencer table.<p/>
+ *
+ * We will append the incarnation ID on to this base name to make the final
+ * table name.
+ */
+ private final static String SENTRY_FENCE_TABLE_BASE = "SENTRY_FENCE";
+
+ /**
+ * The fence table name, including the incarnation ID.
+ */
+ private final String tableIncarnationName;
+
+ /**
+ * The SQL accessor that we're using.
+ */
+ private final SqlAccessor sql;
+
+ /**
+ * Create a fencer for the sentry fence table.
+ *
+ * @param incarnationId The ID of the current sentry daemon incarnation.
+ * @param pmf The PersistenceManagerFactory to use.
+ */
+ public Fencer(String incarnationId, PersistenceManagerFactory pmf) {
+ this.tableIncarnationName = String.
+ format("%s_%s", SENTRY_FENCE_TABLE_BASE, incarnationId);
+ this.sql = SqlAccessor.get(pmf);
+ }
+
+ /**
+ * Finds the name of the fencing table.<p/>
+ *
+ * The name of the fencing table will always begin with SENTRY_FENCE,
+ * but it may have the ID of a previous sentry incarnation tacked on to it.
+ *
+ * @return the current name of the fencing table, or null if there is none.
+ *
+ * @throws JDOFatalDataStoreException If there is more than one sentry
+ * fencing table.
+ * @throws JDOException If there was a JDO error.
+ */
+ private String findFencingTable(PersistenceManagerFactory pmf) {
+ // Perform a SQL query to find the name of the fencing table.
+ PersistenceManager pm = pmf.getPersistenceManager();
+ Query query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+ sql.getFindTableByPrefixSql(SENTRY_FENCE_TABLE_BASE));
+ Transaction tx = pm.currentTransaction();
+ try {
+ tx.begin();
+ List<Object> results = (List<Object>) query.execute();
+ if (results.isEmpty()) {
+ return null;
+ } else if (results.size() != 1) {
+ throw new JDOFatalDataStoreException(
+ "Found more than one table whose name begins with " +
+ "SENTRY_UPDATE_LOG: " + Joiner.on(",").join(results));
+ }
+ String tableName = (String)results.get(0);
+ if (!tableName.startsWith(SENTRY_FENCE_TABLE_BASE)) {
+ throw new JDOFatalDataStoreException(
+ "The result of our attempt to locate the update log table was " +
+ "a table name which did not begin with " +
+ SENTRY_FENCE_TABLE_BASE + ", named " + tableName);
+ }
+ LOGGER.info("Found sentry update log table named " + tableName);
+ tx.commit();
+ return tableName;
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ query.closeAll();
+ }
+ }
+
+ /**
+ * Creates the fencing table.
+ *
+ * @param pmf The PersistenceManagerFactory to use.
+ *
+ * @throws JDOException If there was a JDO error.
+ */
+ private void createFenceTable(PersistenceManagerFactory pmf) {
+ PersistenceManager pm = pmf.getPersistenceManager();
+ Transaction tx = pm.currentTransaction();
+ Query query = null;
+ try {
+ tx.begin();
+ query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+ sql.getCreateTableSql(tableIncarnationName));
+ query.execute();
+ tx.commit();
+ } finally {
+ if (query != null) {
+ query.closeAll();
+ }
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ pm.close();
+ }
+ }
+
+ /**
+ * Renames one table to another.
+ *
+ * @param pmf The PersistenceManagerFactory to use.
+ * @param src The table to rename
+ * @param dst The new name of the table.
+ *
+ * @throws JDOException If there was a JDO error.
+ */
+ private void renameTable(PersistenceManagerFactory pmf, String src,
+ String dst) {
+ boolean success = false;
+ PersistenceManager pm = pmf.getPersistenceManager();
+ Transaction tx = pm.currentTransaction();
+ Query query = null;
+ try {
+ tx.begin();
+ query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+ sql.getRenameTableSql(src, dst));
+ query.execute();
+ tx.commit();
+ success = true;
+ } finally {
+ if (query != null) {
+ query.closeAll();
+ }
+ if (!success) {
+ LOGGER.info("Failed to rename table " + src + " to " + dst);
+ tx.rollback();
+ }
+ pm.close();
+ }
+ }
+
+ /**
+ * Creates or renames the fence table so that it is named for this incarnation.
+ *
+ * @param pmf The PersistenceManagerFactory to use.
+ *
+ * @throws JDOException If there was a JDO error.
+ */
+ public void fence(PersistenceManagerFactory pmf) {
+ String curTableName = findFencingTable(pmf);
+ if (curTableName == null) {
+ createFenceTable(pmf);
+ LOGGER.info("Created sentry fence table.");
+ } else if (curTableName.equals(tableIncarnationName)) {
+ LOGGER.info("Sentry fence table is already named " +
+ tableIncarnationName);
+ } else {
+ renameTable(pmf, curTableName, tableIncarnationName);
+ LOGGER.info("Renamed sentry fence table " + curTableName + " to " +
+ tableIncarnationName);
+ }
+ }
+
+ /**
+ * Verify the fence by querying the fence table named for this incarnation.
+ */
+ void verify(PersistenceManager pm) {
+ Query query = pm.newQuery(SqlAccessor.JDO_SQL_ESCAPE,
+ sql.getFetchAllRowsSql(tableIncarnationName));
+ query.execute();
+ }
+
+ String getTableIncarnationName() {
+ return tableIncarnationName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
deleted file mode 100644
index cacc29f..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.imps.DefaultACLProvider;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.sentry.service.thrift.JaasConfiguration;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-/**
- * Stores the HA related context
- */
-public class HAContext {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class);
- private static volatile HAContext serverHAContext = null;
- private static boolean aclChecked = false;
-
- public final static String SENTRY_SERVICE_REGISTER_NAMESPACE = "sentry-service";
- public static final String SENTRY_ZK_JAAS_NAME = "SentryClient";
- private final String zookeeperQuorum;
- private final int retriesMaxCount;
- private final int sleepMsBetweenRetries;
- private final String namespace;
-
- private final boolean zkSecure;
- private List<ACL> saslACL;
-
- private final CuratorFramework curatorFramework;
- private final RetryPolicy retryPolicy;
-
- protected HAContext(Configuration conf) throws Exception {
- this.zookeeperQuorum = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
- ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT);
- this.retriesMaxCount = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT,
- ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT_DEFAULT);
- this.sleepMsBetweenRetries = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS,
- ServerConfig.SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS_DEFAULT);
- this.namespace = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE,
- ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT);
- this.zkSecure = conf.getBoolean(ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY,
- ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT);
- ACLProvider aclProvider;
- validateConf();
- if (zkSecure) {
- LOGGER.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
- setJaasConfiguration(conf);
- System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
- SENTRY_ZK_JAAS_NAME);
- saslACL = Lists.newArrayList();
- saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
- ServerConfig.PRINCIPAL))));
- saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
- ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL))));
- aclProvider = new SASLOwnerACLProvider();
- String allowConnect = conf.get(ServerConfig.ALLOW_CONNECT);
-
- if (!Strings.isNullOrEmpty(allowConnect)) {
- for (String principal : Arrays.asList(allowConnect.split("\\s*,\\s*"))) {
- LOGGER.info("Adding acls for " + principal);
- saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal)));
- }
- }
- } else {
- LOGGER.info("Connecting to ZooKeeper without authentication");
- aclProvider = new DefaultACLProvider();
- }
-
- retryPolicy = new RetryNTimes(retriesMaxCount, sleepMsBetweenRetries);
- this.curatorFramework = CuratorFrameworkFactory.builder()
- .namespace(this.namespace)
- .connectString(this.zookeeperQuorum)
- .retryPolicy(retryPolicy)
- .aclProvider(aclProvider)
- .build();
- startCuratorFramework();
- }
-
- /**
- * Use common HAContext (ie curator framework connection to ZK)
- *
- * @param conf
- * @throws Exception
- */
- public static HAContext getHAContext(Configuration conf) throws Exception {
- if (serverHAContext == null) {
- serverHAContext = new HAContext(conf);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- LOGGER.info("ShutdownHook closing curator framework");
- try {
- clearServerContext();
- } catch (Throwable t) {
- LOGGER.error("Error stopping SentryService", t);
- }
- }
- });
-
- }
- return serverHAContext;
- }
-
- // HA context for server which verifies the ZK ACLs on namespace
- public static HAContext getHAServerContext(Configuration conf) throws Exception {
- HAContext serverContext = getHAContext(conf);
- serverContext.checkAndSetACLs();
- return serverContext;
- }
-
- @VisibleForTesting
- public static synchronized void clearServerContext() {
- if (serverHAContext != null) {
- serverHAContext.getCuratorFramework().close();
- serverHAContext = null;
- }
- }
-
- public void startCuratorFramework() {
- if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
- curatorFramework.start();
- }
- }
-
- public CuratorFramework getCuratorFramework() {
- return this.curatorFramework;
- }
-
- public String getZookeeperQuorum() {
- return zookeeperQuorum;
- }
-
- public static boolean isHaEnabled(Configuration conf) {
- return conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, ServerConfig.SENTRY_HA_ENABLED_DEFAULT);
- }
-
- public String getNamespace() {
- return namespace;
- }
-
- public RetryPolicy getRetryPolicy() {
- return retryPolicy;
- }
-
- private void validateConf() {
- Preconditions.checkNotNull(zookeeperQuorum, "Zookeeper Quorum should not be null.");
- Preconditions.checkNotNull(namespace, "Zookeeper namespace should not be null.");
- }
-
- protected String getServicePrincipal(Configuration conf, String confProperty)
- throws IOException {
- String principal = conf.get(confProperty);
- Preconditions.checkNotNull(principal);
- Preconditions.checkArgument(principal.length() != 0, "Server principal is not right.");
- return principal.split("[/@]")[0];
- }
-
- private void checkAndSetACLs() throws Exception {
- if (zkSecure && !aclChecked) {
- // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
- // and set the ACLs for them. This is done just once at the startup
- // We can't get the namespace znode through curator; have to go through zk client
- startCuratorFramework();
- String newNamespace = "/" + curatorFramework.getNamespace();
- if (curatorFramework.getZookeeperClient().getZooKeeper().exists(newNamespace, null) != null) {
- List<ACL> acls = curatorFramework.getZookeeperClient().getZooKeeper().getACL(newNamespace, new Stat());
- if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) {
- LOGGER.info("'sasl' ACLs not set; setting...");
- List<String> children = curatorFramework.getZookeeperClient().getZooKeeper().getChildren(newNamespace, null);
- for (String child : children) {
- checkAndSetACLs("/" + child);
- }
- curatorFramework.getZookeeperClient().getZooKeeper().setACL(newNamespace, saslACL, -1);
- }
- }
- aclChecked = true;
-
- }
- }
-
- private void checkAndSetACLs(String path) throws Exception {
- LOGGER.info("Setting acls on " + path);
- List<String> children = curatorFramework.getChildren().forPath(path);
- for (String child : children) {
- checkAndSetACLs(path + "/" + child);
- }
- curatorFramework.setACL().withACL(saslACL).forPath(path);
- }
-
- // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
- private void setJaasConfiguration(Configuration conf) throws IOException {
- if ("false".equalsIgnoreCase(conf.get(
- ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE,
- ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT))) {
- String keytabFile = conf.get(ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB);
- Preconditions.checkArgument(keytabFile.length() != 0, "Keytab File is not right.");
- String principal = conf.get(ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL);
- principal = SecurityUtil.getServerPrincipal(principal,
- conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT));
- Preconditions.checkArgument(principal.length() != 0, "Kerberos principal is not right.");
-
- // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
- // point to it (but this way we don't have to write a file, and it works better for the tests)
- JaasConfiguration.addEntryForKeytab(SENTRY_ZK_JAAS_NAME, principal, keytabFile);
- } else {
- // Create jaas conf for ticket cache
- JaasConfiguration.addEntryForTicketCache(SENTRY_ZK_JAAS_NAME);
- }
- javax.security.auth.login.Configuration.setConfiguration(JaasConfiguration.getInstance());
- }
-
- public class SASLOwnerACLProvider implements ACLProvider {
- @Override
- public List<ACL> getDefaultAcl() {
- return saslACL;
- }
-
- @Override
- public List<ACL> getAclForPath(String path) {
- return saslACL;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 7dad496..6e367e5 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -71,6 +71,8 @@ import org.apache.sentry.provider.db.service.thrift.TSentryMappingData;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap;
import org.apache.sentry.provider.db.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.datanucleus.store.rdbms.exceptions.MissingTableException;
@@ -102,7 +104,6 @@ public class SentryStore {
public static final String NULL_COL = "__NULL__";
public static int INDEX_GROUP_ROLES_MAP = 0;
public static int INDEX_USER_ROLES_MAP = 1;
- static final String DEFAULT_DATA_DIR = "sentry_policy_db";
private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL,
AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER,
@@ -116,6 +117,11 @@ public class SentryStore {
AccessConstants.ACTION_ALL.toLowerCase(), AccessConstants.SELECT, AccessConstants.INSERT);
/**
+ * The activator object which tells us whether the current daemon is active.
+ */
+ private final Activator act;
+
+ /**
* Commit order sequence id. This is used by notification handlers
* to know the order in which events where committed to the database.
* This instance variable is incremented in incrementGetSequenceId
@@ -128,10 +134,8 @@ public class SentryStore {
private PrivCleaner privCleaner = null;
private Thread privCleanerThread = null;
- public SentryStore(Configuration conf) throws SentryNoSuchObjectException,
- SentryAccessDeniedException, SentrySiteConfigurationException, IOException {
- commitSequenceId = 0;
- this.conf = conf;
+ public static Properties getDataNucleusProperties(Configuration conf)
+ throws SentrySiteConfigurationException, IOException {
Properties prop = new Properties();
prop.putAll(ServerConfig.SENTRY_STORE_DEFAULTS);
String jdbcUrl = conf.get(ServerConfig.SENTRY_STORE_JDBC_URL, "").trim();
@@ -164,8 +168,19 @@ public class SentryStore {
prop.setProperty(key, entry.getValue());
}
}
+ // Disallow operations outside of transactions
+ prop.setProperty("datanucleus.NontransactionalRead", "false");
+ prop.setProperty("datanucleus.NontransactionalWrite", "false");
+ return prop;
+ }
-
+ public SentryStore(Configuration conf)
+ throws SentryNoSuchObjectException, SentryAccessDeniedException,
+ SentrySiteConfigurationException, IOException {
+ this.act = Activators.INSTANCE.get(conf);
+ commitSequenceId = 0;
+ this.conf = conf;
+ Properties prop = getDataNucleusProperties(conf);
boolean checkSchemaVersion = conf.get(
ServerConfig.SENTRY_VERIFY_SCHEM_VERSION,
ServerConfig.SENTRY_VERIFY_SCHEM_VERSION_DEFAULT).equalsIgnoreCase(
@@ -175,11 +190,6 @@ public class SentryStore {
prop.setProperty("datanucleus.autoCreateSchema", "true");
prop.setProperty("datanucleus.fixedDatastore", "false");
}
-
- // Disallow operations outside of transactions
- prop.setProperty("datanucleus.NontransactionalRead", "false");
- prop.setProperty("datanucleus.NontransactionalWrite", "false");
-
pmf = JDOHelper.getPersistenceManagerFactory(prop);
verifySentryStoreSchema(checkSchemaVersion);
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
deleted file mode 100644
index 79dfe48..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.persistent;
-
-import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-
-public class ServiceRegister {
-
- private HAContext haContext;
-
- public ServiceRegister(HAContext haContext) {
- this.haContext = haContext;
- }
-
- public void regService(String host, int port) throws Exception {
-
- haContext.startCuratorFramework();
- ServiceInstance<Void> serviceInstance = ServiceInstance.<Void>builder()
- .address(host)
- .port(port)
- .name(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE)
- .build();
-
- InstanceSerializer<Void> instanceSerializer = new FixedJsonInstanceSerializer<Void>(Void.class);
- ServiceDiscoveryBuilder.builder(Void.class)
- .basePath(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE)
- .client(haContext.getCuratorFramework())
- .serializer(instanceSerializer)
- .thisInstance(serviceInstance)
- .build()
- .start();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java
new file mode 100644
index 0000000..9879e67
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SqlAccessor.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.sql.Connection;
+
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.datastore.JDOConnection;
+
+/**
+ * An accessor for a SQL database.
+ *
+ * SqlAccessor objects generate raw SQL statements in a variety of dialects.
+ * We use this to do stuff that the DataNucleus architects didn't anticipate,
+ * like rename tables or search for tables by a prefix name.<p/>
+ *
+ * This class exists only to implement fencing. While it's theoretically
+ * possible to do other things with it, it is almost always better to use the
+ * functionality provided by DataNucleus if it is at all possible.<p/>
+ *
+ * Note: do NOT pass any untrusted user input into these functions. You must
+ * NOT create SQL statements from unsanitized user input because they may expose
+ * you to SQL injection attacks. Use prepared statements if you need to do that
+ * (yes, it's possible via DataNucleus.)<p/>
+ */
+abstract class SqlAccessor {
+ /**
+ * The string which we can use with PersistenceManager#newQuery to perform raw
+ * SQL operations.
+ */
+ final static String JDO_SQL_ESCAPE = "javax.jdo.query.SQL";
+
+ /**
+ * Get an accessor for the SQL database that we're using.
+ *
+ * @param pmf The PersistenceManagerFactory used to look up the database
+ * product name.
+ * @return The singleton accessor instance for the SQL database we are using.
+ *
+ * @throws RuntimeException If there was an error loading the SqlAccessor.
+ * This could happen because we don't know the
+ * type of database that we're using. In theory
+ * it could also happen because JDO is being run
+ * against something that is not a SQL database at
+ * all.
+ */
+ static SqlAccessor get(PersistenceManagerFactory pmf) {
+ // Lowercase with a fixed locale so that matching does not depend on the
+ // JVM's default locale.
+ String productName =
+ getProductNameString(pmf).toLowerCase(java.util.Locale.ROOT);
+ if (productName.contains("postgresql")) {
+ return PostgresSqlAccessor.INSTANCE;
+ } else if (productName.contains("mysql")) {
+ return MySqlSqlAccessor.INSTANCE;
+ } else if (productName.contains("oracle")) {
+ return OracleSqlAccessor.INSTANCE;
+ } else if (productName.contains("derby")) {
+ return DerbySqlAccessor.INSTANCE;
+ } else if (productName.contains("db2")) {
+ return Db2SqlAccessor.INSTANCE;
+ } else {
+ // Keep this list in sync with the branches above.
+ throw new RuntimeException("Unknown database type " +
+ "'" + productName + "'. Supported database types are " +
+ "postgres, mysql, oracle, derby, and db2.");
+ }
+ }
+
+ /**
+ * Look up the product name that the database connection's metadata reports.
+ *
+ * @param pmf The PersistenceManagerFactory to obtain a connection from.
+ * @return A string describing the type of database that we're using.
+ *
+ * @throws RuntimeException If the product name could not be retrieved.
+ */
+ static private String getProductNameString(PersistenceManagerFactory pmf) {
+ PersistenceManager pm = pmf.getPersistenceManager();
+ try {
+ JDOConnection jdoConn = pm.getDataStoreConnection();
+ try {
+ return ((Connection)jdoConn.getNativeConnection()).getMetaData().
+ getDatabaseProductName();
+ } catch (Throwable t) {
+ throw new RuntimeException("Error retrieving database product " +
+ "name", t);
+ } finally {
+ // We must release the connection before we call other pm methods.
+ jdoConn.close();
+ }
+ } finally {
+ // Close the PersistenceManager so that it is not leaked on every lookup.
+ pm.close();
+ }
+ }
+
+ /**
+ * Get the name of this database.
+ *
+ * @return The name of this database.
+ */
+ abstract String getDatabaseName();
+
+ /**
+ * Get the SQL for finding a table that starts with the given prefix.
+ *
+ * @param prefix The prefix of the table to find.
+ * @return The SQL.
+ */
+ abstract String getFindTableByPrefixSql(String prefix);
+
+ /**
+ * Get the SQL for creating a table with the given name.
+ *
+ * @param name The name of the table to create.
+ * @return The SQL.
+ */
+ abstract String getCreateTableSql(String name);
+
+ /**
+ * Get the SQL for renaming a table.
+ *
+ * @param src The name of the table to rename.
+ * @param dst The new name to give to the table.
+ * @return The SQL.
+ */
+ abstract String getRenameTableSql(String src, String dst);
+
+ /**
+ * Get the SQL for fetching all rows from the given table.
+ *
+ * @param name The table name.
+ * @return The SQL.
+ */
+ abstract String getFetchAllRowsSql(String name);
+
+ /**
+ * The postgres database type.<p/>
+ *
+ * Postgres is case-senstitive, but will translate all identifiers to
+ * lowercase unless you quote them. So we quote all identifiers when using
+ * postgres.
+ */
+ private static class PostgresSqlAccessor extends SqlAccessor {
+ static final PostgresSqlAccessor INSTANCE = new PostgresSqlAccessor();
+
+ @Override
+ String getDatabaseName() {
+ return "postgres";
+ }
+
+ @Override
+ String getFindTableByPrefixSql(String prefix) {
+ return "SELECT table_name FROM information_schema.tables " +
+ "WHERE table_name LIKE '" + prefix + "%'";
+ }
+
+ @Override
+ String getCreateTableSql(String name) {
+ return "CREATE TABLE \"" + name + "\" (\"VAL\" VARCHAR(512))";
+ }
+
+ @Override
+ String getRenameTableSql(String src, String dst) {
+ return "ALTER TABLE \"" + src + "\" RENAME TO \"" + dst + "\"";
+ }
+
+ @Override
+ String getFetchAllRowsSql(String tableName) {
+ return "SELECT * FROM \"" + tableName + "\"";
+ }
+ }
+
+ /**
+ * The MySQL database type.<p/>
+ *
+ * MySQL can't handle double-quoted identifiers unless specifically
+ * configured to accept them, so identifiers are left unquoted. String
+ * literals such as the LIKE pattern are still single-quoted.
+ */
+ private static class MySqlSqlAccessor extends SqlAccessor {
+ static final MySqlSqlAccessor INSTANCE = new MySqlSqlAccessor();
+
+ @Override
+ String getDatabaseName() {
+ return "mysql";
+ }
+
+ @Override
+ String getFindTableByPrefixSql(String prefix) {
+ // The LIKE pattern must be a quoted string literal; without the quotes
+ // the statement is not valid SQL.
+ return "SELECT table_name FROM information_schema.tables " +
+ "WHERE table_name LIKE '" + prefix + "%'";
+ }
+
+ @Override
+ String getCreateTableSql(String name) {
+ return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+ }
+
+ @Override
+ String getRenameTableSql(String src, String dst) {
+ return "RENAME TABLE " + src + " TO " + dst;
+ }
+
+ @Override
+ String getFetchAllRowsSql(String tableName) {
+ return "SELECT * FROM " + tableName;
+ }
+ }
+
+ /**
+ * The Oracle database type.<p/>
+ *
+ * Identifiers are left unquoted; string literals such as the LIKE pattern
+ * are single-quoted.
+ */
+ private static class OracleSqlAccessor extends SqlAccessor {
+ static final OracleSqlAccessor INSTANCE = new OracleSqlAccessor();
+
+ @Override
+ String getDatabaseName() {
+ return "oracle";
+ }
+
+ @Override
+ String getFindTableByPrefixSql(String prefix) {
+ // The LIKE pattern must be a quoted string literal; without the quotes
+ // the statement is not valid SQL.
+ return "SELECT table_name FROM all_tables " +
+ "WHERE table_name LIKE '" + prefix + "%'";
+ }
+
+ @Override
+ String getCreateTableSql(String name) {
+ return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+ }
+
+ @Override
+ String getRenameTableSql(String src, String dst) {
+ // Oracle has no "RENAME TABLE ... TO ..." statement; a table is renamed
+ // with ALTER TABLE ... RENAME TO.
+ return "ALTER TABLE " + src + " RENAME TO " + dst;
+ }
+
+ @Override
+ String getFetchAllRowsSql(String tableName) {
+ return "SELECT * FROM " + tableName;
+ }
+ }
+
+ /**
+ * The Derby database type.</p>
+ */
+ private static class DerbySqlAccessor extends SqlAccessor {
+ static final DerbySqlAccessor INSTANCE = new DerbySqlAccessor();
+
+ @Override
+ String getFindTableByPrefixSql(String prefix) {
+ return "SELECT tablename FROM SYS.SYSTABLES " +
+ "WHERE tablename LIKE '" + prefix + "%'";
+ }
+
+ @Override
+ String getCreateTableSql(String name) {
+ return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+ }
+
+ @Override
+ String getRenameTableSql(String src, String dst) {
+ return "RENAME TABLE " + src + " TO " + dst;
+ }
+
+ @Override
+ String getDatabaseName() {
+ return "derby";
+ }
+
+ @Override
+ String getFetchAllRowsSql(String tableName) {
+ return "SELECT * FROM " + tableName;
+ }
+ }
+
+ /**
+ * The DB2 database type.<p/>
+ *
+ * Table names are looked up through the SYSCAT.TABLES catalog view.
+ * NOTE(review): SYSCAT.TABLES is the Db2 for Linux/UNIX/Windows catalog
+ * view; other Db2 platforms expose the catalog under different names --
+ * confirm which platforms must be supported.
+ */
+ private static class Db2SqlAccessor extends SqlAccessor {
+ static final Db2SqlAccessor INSTANCE = new Db2SqlAccessor();
+
+ @Override
+ String getFindTableByPrefixSql(String prefix) {
+ // SYS.SYSTABLES / tablename is the Derby catalog; Db2 lists tables in
+ // SYSCAT.TABLES with the name in the TABNAME column.
+ return "SELECT tabname FROM SYSCAT.TABLES " +
+ "WHERE tabname LIKE '" + prefix + "%'";
+ }
+
+ @Override
+ String getCreateTableSql(String name) {
+ return "CREATE TABLE " + name + " (VAL VARCHAR(512))";
+ }
+
+ @Override
+ String getRenameTableSql(String src, String dst) {
+ return "RENAME TABLE " + src + " TO " + dst;
+ }
+
+ @Override
+ String getDatabaseName() {
+ return "db2";
+ }
+
+ @Override
+ String getFetchAllRowsSql(String tableName) {
+ return "SELECT * FROM " + tableName;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index 3de1f65..19daa75 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -46,9 +46,7 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntity;
import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory;
import org.apache.sentry.provider.db.log.util.Constants;
import org.apache.sentry.provider.db.service.persistent.CommitContext;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.persistent.ServiceRegister;
import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig;
import org.apache.sentry.service.thrift.SentryServiceUtil;
import org.apache.sentry.service.thrift.ServiceConstants;
@@ -84,30 +82,18 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
private final SentryStore sentryStore;
private final NotificationHandlerInvoker notificationHandlerInvoker;
private final ImmutableSet<String> adminGroups;
- private boolean isReady;
SentryMetrics sentryMetrics;
- private HAContext haContext;
private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>();
- public SentryPolicyStoreProcessor(String name, Configuration conf) throws Exception {
+ public SentryPolicyStoreProcessor(String name,
+ Configuration conf) throws Exception {
super();
this.name = name;
this.conf = conf;
this.notificationHandlerInvoker = new NotificationHandlerInvoker(conf,
createHandlers(conf));
- isReady = false;
- if (conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
- ServerConfig.SENTRY_HA_ENABLED_DEFAULT)) {
- haContext = HAContext.getHAServerContext(conf);
- sentryStore = new SentryStore(conf);
- ServiceRegister reg = new ServiceRegister(haContext);
- reg.regService(conf.get(ServerConfig.RPC_ADDRESS),
- conf.getInt(ServerConfig.RPC_PORT,ServerConfig.RPC_PORT_DEFAULT));
- } else {
- sentryStore = new SentryStore(conf);
- }
- isReady = true;
+ sentryStore = new SentryStore(conf);
adminGroups = ImmutableSet.copyOf(toTrimedLower(Sets.newHashSet(conf.getStrings(
ServerConfig.ADMIN_GROUPS, new String[]{}))));
Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER
@@ -149,16 +135,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
}
public void stop() {
- if (isReady) {
- sentryStore.stop();
- }
- if (haContext != null) {
- try {
- haContext.getCuratorFramework().close();
- } catch (Exception e) {
- LOGGER.warn("Error in stopping processor", e);
- }
- }
+ sentryStore.stop();
}
public void registerPlugin(SentryPolicyStorePlugin plugin) throws SentryPluginException {
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java
new file mode 100644
index 0000000..0b7ddf5
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.provider.db.service.persistent.Fencer;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.JDOHelper;
+import javax.jdo.PersistenceManagerFactory;
+
+/**
+ * Tracks and updates the activation state of the sentry daemon.
+ *
+ * The Activator owns the LeaderStatus that reports active/standby
+ * transitions, and runs the Fencer before marking the daemon active.
+ */
+public class Activator implements Closeable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Activator.class);
+
+  /**
+   * The DataNucleus PersistenceManagerFactory to use.
+   */
+  private final PersistenceManagerFactory pmf;
+
+  /**
+   * The handler for LeaderStatus callbacks.
+   */
+  private final TransitionHandler handler;
+
+  /**
+   * LeaderStatus generates callbacks to let us know when we are active or
+   * standby. When HA is enabled, it manages ZK sessions.
+   */
+  private final LeaderStatus leaderStatus;
+
+  /**
+   * The fencer object.
+   */
+  private final Fencer fencer;
+
+  /**
+   * True if the Activator is active. Guarded by this object's monitor.
+   */
+  private boolean active;
+
+  /**
+   * Creates the Activator and starts its LeaderStatus.
+   *
+   * @param conf configuration used for the DataNucleus properties and the
+   *             LeaderStatus
+   * @throws Exception if a component cannot be created or started; anything
+   *                   already opened is released before the exception
+   *                   propagates
+   */
+  public Activator(Configuration conf) throws Exception {
+    Properties props = SentryStore.getDataNucleusProperties(conf);
+    this.pmf = JDOHelper.getPersistenceManagerFactory(props);
+    this.handler = new TransitionHandler();
+    this.active = false;
+    LeaderStatus status = null;
+    boolean started = false;
+    try {
+      status = new LeaderStatus(handler, conf);
+      this.leaderStatus = status;
+      this.fencer = new Fencer(status.getIncarnationId(), pmf);
+      status.start();
+      started = true;
+    } finally {
+      if (!started) {
+        // Construction failed part-way: release what was already opened
+        // so the PersistenceManagerFactory is not leaked.
+        releaseQuietly(status, pmf);
+      }
+    }
+  }
+
+  /**
+   * Best-effort cleanup used when the constructor fails. Errors are logged
+   * rather than thrown so they do not mask the original failure.
+   */
+  private static void releaseQuietly(LeaderStatus status,
+      PersistenceManagerFactory pmf) {
+    if (status != null) {
+      try {
+        status.close();
+      } catch (Exception e) {
+        LOGGER.warn("Error closing LeaderStatus after failed startup", e);
+      }
+    }
+    try {
+      pmf.close();
+    } catch (Exception e) {
+      LOGGER.warn("Error closing PersistenceManagerFactory after failed " +
+          "startup", e);
+    }
+  }
+
+  /**
+   * Shuts down the LeaderStatus and then closes the
+   * PersistenceManagerFactory.
+   *
+   * @throws IOException if closing the LeaderStatus fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      this.leaderStatus.close();
+    } finally {
+      // Release the factory even if shutting down the LeaderStatus failed.
+      this.pmf.close();
+    }
+  }
+
+  /**
+   * Receives LeaderStatus callbacks and updates the activation flag.
+   */
+  private class TransitionHandler implements LeaderStatus.Listener {
+    /**
+     * Fences and then marks the daemon active, unless it is already active.
+     */
+    @Override
+    public void becomeActive() throws Exception {
+      synchronized (Activator.this) {
+        if (!active) {
+          LOGGER.info("Activating {}", leaderStatus.getIncarnationId());
+          fencer.fence(pmf);
+          active = true;
+        }
+      }
+    }
+
+    /**
+     * Marks the daemon standby, unless it is already standby.
+     */
+    @Override
+    public void becomeStandby() {
+      synchronized (Activator.this) {
+        if (active) {
+          LOGGER.info("Deactivating {}", leaderStatus.getIncarnationId());
+          active = false;
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if this daemon is currently active
+   */
+  synchronized boolean isActive() {
+    return active;
+  }
+
+  /**
+   * @return the incarnation ID reported by the LeaderStatus
+   */
+  public synchronized String getIncarnationId() {
+    return leaderStatus.getIncarnationId();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java
new file mode 100644
index 0000000..37b0219
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/Activators.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.utils.SentryConstants;
+
+/**
+ * A global map from incarnation IDs to Activator objects.
+ *
+ * This is used to access the current global Activator. Normally there will
+ * only be one Activator used in a sentry process. There may be multiple
+ * Activator objects in the case where we are running unit tests.
+ */
+public enum Activators {
+  INSTANCE;
+
+  private final HashMap<String, Activator> acts = new HashMap<String, Activator>();
+
+  Activators() {}
+
+  /**
+   * Registers an Activator under its incarnation ID, replacing any
+   * Activator previously registered under the same ID.
+   */
+  public synchronized void put(Activator act) {
+    acts.put(act.getIncarnationId(), act);
+  }
+
+  /**
+   * Looks up the Activator whose incarnation ID is stored in the
+   * configuration under SentryConstants.CURRENT_INCARNATION_ID_KEY.
+   *
+   * @throws RuntimeException if the key is not set, or if no Activator is
+   *                          registered under that incarnation ID
+   */
+  public Activator get(Configuration conf) {
+    String incarnationId = conf.get(SentryConstants.CURRENT_INCARNATION_ID_KEY);
+    if (incarnationId == null) {
+      throw new RuntimeException("No " +
+          SentryConstants.CURRENT_INCARNATION_ID_KEY + " set.");
+    }
+    return get(incarnationId);
+  }
+
+  /**
+   * Looks up the Activator registered under the given incarnation ID.
+   *
+   * @throws RuntimeException if no Activator is registered under that ID
+   */
+  public synchronized Activator get(String incarnationId) {
+    Activator act = acts.get(incarnationId);
+    if (act == null) {
+      throw new RuntimeException("No activator found with " +
+          "incarnationId " + incarnationId);
+    }
+    return act;
+  }
+
+  /**
+   * Unregisters the given Activator.
+   *
+   * @throws RuntimeException if it was not registered
+   */
+  public synchronized void remove(Activator act) {
+    String incarnationId = act.getIncarnationId();
+    if (acts.remove(incarnationId) == null) {
+      throw new RuntimeException("No activator found with " +
+          "incarnationId " + incarnationId);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
index e846766..e32e1db 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatus.java
@@ -16,10 +16,10 @@
*/
package org.apache.sentry.service.thrift;
-import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.datanucleus.util.Base64;
import java.io.Closeable;
import java.io.IOException;
@@ -79,13 +79,36 @@ final class LeaderStatus implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
- * Generate a 128-bit random ID.
+ * Generate a very long random ID.
+ *
+ * We want a name that doesn't start with a number, and which
+ * contains only letters and numbers. This is important because
+ * the incarnation ID gets used in SQL databases to name a table.
*/
static String generateIncarnationId() {
SecureRandom srand = new SecureRandom();
- byte[] buf = new byte[32];
+ byte[] buf = new byte[33];
srand.nextBytes(buf);
- return "sentry_" + Hex.encodeHexString(buf);
+ char[] cbuf = Base64.encode(buf);
+ StringBuilder bld = new StringBuilder();
+ for (int i = 0; i < cbuf.length; i++) {
+ boolean safe;
+ if (i == 0) {
+ // Some databases can't handle identifiers that start with numbers,
+ // so always start with a letter. Also replace '+' or '/' with
+ // something safe.
+ safe = Character.isLetter(cbuf[i]);
+ } else {
+ // Replace '+' or '/' with something safe.
+ safe = Character.isLetterOrDigit(cbuf[i]);
+ }
+ if (!safe) {
+ bld.append((char)('a' + srand.nextInt(26)));
+ } else {
+ bld.append(cbuf[i]);
+ }
+ }
+ return bld.toString();
}
LeaderStatus(Listener listener, Configuration conf) throws Exception {
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
index 80a6571..33a5e7b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/LeaderStatusAdaptor.java
@@ -81,6 +81,11 @@ final class LeaderStatusAdaptor
private long becomeLeaderCount = 0;
/**
+ * True only if this LeaderStatusAdaptor is closed.
+ */
+ private boolean isClosed = false;
+
+ /**
* True only if this incarnation is currently active.
*/
private boolean isActive = false;
@@ -112,9 +117,36 @@ final class LeaderStatusAdaptor
this.leaderSelector.start();
}
+ /**
+ * Shut down the LeaderStatusAdaptor and wait for it to transition to
+ * standby.
+ */
@Override
public void close() throws IOException {
+ // If the adaptor is already closed, calling close again is a no-op.
+ // Setting isClosed also prevents activation after this point.
+ lock.lock();
+ try {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ } finally {
+ lock.unlock();
+ }
+
+ // Shut down our Curator hooks.
leaderSelector.close();
+
+ // Wait for the adaptor to transition to standby state.
+ lock.lock();
+ try {
+ while (isActive) {
+ cond.awaitUninterruptibly();
+ }
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -148,9 +180,14 @@ final class LeaderStatusAdaptor
public void takeLeadership(CuratorFramework client) throws Exception {
lock.lock();
try {
+ if (isClosed) {
+ LOG.info("LeaderStatusAdaptor: can't become active because the " +
+ "adaptor is closed.");
+ return;
+ }
isActive = true;
becomeLeaderCount++;
- LOG.info("SentryLeaderSelectorClient: becoming active. " +
+ LOG.info("LeaderStatusAdaptor: becoming active. " +
"becomeLeaderCount=" + becomeLeaderCount);
listener.becomeActive();
while (isActive) {
@@ -158,7 +195,7 @@ final class LeaderStatusAdaptor
}
} finally {
isActive = false;
- LOG.info("SentryLeaderSelectorClient: becoming standby");
+ LOG.info("LeaderStatusAdaptor: becoming standby");
try {
listener.becomeStandby();
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 809af06..531ab35 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.sentry.Command;
+import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.provider.db.service.thrift.SentryHealthCheckServletContextListener;
import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
import org.apache.sentry.provider.db.service.thrift.SentryMetricsServletContextListener;
@@ -95,11 +96,10 @@ public class SentryService implements Callable {
private SentryWebServer sentryWebServer;
private long maxMessageSize;
private final boolean isHA;
- private volatile boolean isActive = false;
+ private final Activator act;
SentryMetrics sentryMetrics;
- private final LeaderStatus leaderStatus;
- public SentryService(Configuration conf) {
+ public SentryService(Configuration conf) throws Exception {
this.conf = conf;
int port = conf
.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
@@ -153,25 +153,10 @@ public class SentryService implements Callable {
+ (count++));
}
});
- try {
- leaderStatus = new LeaderStatus(
- new LeaderStatus.Listener() {
- @Override
- public void becomeActive() throws Exception {
- LOGGER.info("Activating " + leaderStatus.getIncarnationId());
- isActive = true;
- }
-
- @Override
- public void becomeStandby() {
- LOGGER.info("Deactivating " + leaderStatus.getIncarnationId());
- isActive = false;
- }
- }, conf);
- leaderStatus.start(); // TODO: move this into call?
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ this.act = new Activator(conf);
+ conf.set(SentryConstants.CURRENT_INCARNATION_ID_KEY,
+ this.act.getIncarnationId());
+ Activators.INSTANCE.put(act);
webServerPort = conf.getInt(ServerConfig.SENTRY_WEB_PORT, ServerConfig.SENTRY_WEB_PORT_DEFAULT);
status = Status.NOT_STARTED;
}
@@ -307,7 +292,7 @@ public class SentryService implements Callable {
public synchronized void stop() throws Exception{
MultiException exception = null;
LOGGER.info("Attempting to stop...");
- leaderStatus.close();
+ act.close();
if (isRunning()) {
LOGGER.info("Attempting to stop sentry thrift service...");
try {
@@ -462,7 +447,7 @@ public class SentryService implements Callable {
return new Gauge<Boolean>() {
@Override
public Boolean getValue() {
- return isActive;
+ return act.isActive();
}
};
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 0ab8192..abc3f58 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -141,6 +141,7 @@ public class ServiceConstants {
.put("datanucleus.transactionIsolation", "read-committed")
.put("datanucleus.cache.level2", "false")
.put("datanucleus.cache.level2.type", "none")
+ .put("datanucleus.query.sql.allowAll", "true")
.put("datanucleus.identifierFactory", "datanucleus1")
.put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true")
.put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG")
@@ -258,4 +259,7 @@ public class ServiceConstants {
TABLE,
COLUMN
}
+
+ public static final String SENTRY_ZK_JAAS_NAME = "Sentry";
+ public static final String CURRENT_INCARNATION_ID_KEY = "current.incarnation.key";
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
index f14b586..5999580 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/SentryStoreIntegrationBase.java
@@ -21,7 +21,11 @@ import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
+import org.apache.sentry.service.thrift.ServiceConstants;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.junit.After;
import org.junit.AfterClass;
@@ -34,6 +38,7 @@ public abstract class SentryStoreIntegrationBase {
private static File dataDir;
private static File policyFilePath;
protected static Configuration conf;
+ protected static Activator act;
protected static DelegateSentryStore sentryStore;
protected static PolicyFile policyFile;
@@ -57,6 +62,9 @@ public abstract class SentryStoreIntegrationBase {
policyFilePath = new File(Files.createTempDir(), "local_policy_file.ini");
conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE,
policyFilePath.getPath());
+ act = new Activator(conf);
+ conf.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY, act.getIncarnationId());
+ Activators.INSTANCE.put(act);
}
@After
@@ -66,6 +74,9 @@ public abstract class SentryStoreIntegrationBase {
@AfterClass
public static void teardown() {
+ if (act != null) {
+ IOUtils.cleanup(null, act);
+ }
if (sentryStore != null) {
sentryStore.close();
}
@@ -75,6 +86,10 @@ public abstract class SentryStoreIntegrationBase {
if (policyFilePath != null) {
FileUtils.deleteQuietly(policyFilePath);
}
+ if (act != null) {
+ Activators.INSTANCE.remove(act);
+ act = null;
+ }
}
public static void addGroupsToUser(String user, String... groupNames) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
index 799d5ef..7c66db4 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/persistent/TestPrivilegeOperatePersistence.java
@@ -37,6 +37,8 @@ import org.apache.sentry.core.model.sqoop.SqoopActionConstant;
import org.apache.sentry.core.common.exception.SentryGrantDeniedException;
import org.apache.sentry.provider.db.generic.service.persistent.PrivilegeObject.Builder;
import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
import org.apache.sentry.service.thrift.ServiceConstants;
import org.junit.Before;
import org.junit.Test;
@@ -987,8 +989,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase
Configuration confCopy = new Configuration(conf);
confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, externalComponent),
InvalidActionFactory.class.getName());
- SentryStoreLayer store = new DelegateSentryStore(confCopy);
+ Activator act = new Activator(confCopy);
+ confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+ act.getIncarnationId());
+ Activators.INSTANCE.put(act);
+ SentryStoreLayer store = new DelegateSentryStore(confCopy);
testGrantPrivilege(store, externalComponent);
+ act.close();
+ Activators.INSTANCE.remove(act);
}
@Test
@@ -997,8 +1005,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase
Configuration confCopy = new Configuration(conf);
confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, externalComponent),
MyComponentActionFactory.class.getName());
+ Activator act = new Activator(confCopy);
+ confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+ act.getIncarnationId());
+ Activators.INSTANCE.put(act);
SentryStoreLayer store = new DelegateSentryStore(confCopy);
testGrantPrivilege(store, externalComponent);
+ act.close();
+ Activators.INSTANCE.remove(act);
}
@Test
@@ -1007,8 +1021,14 @@ public class TestPrivilegeOperatePersistence extends SentryStoreIntegrationBase
Configuration confCopy = new Configuration(conf);
confCopy.set(String.format(ServiceConstants.ServerConfig.SENTRY_COMPONENT_ACTION_FACTORY_FORMAT, "mycomponent"),
MyComponentActionFactory.class.getName());
+ Activator act = new Activator(confCopy);
+ confCopy.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+ act.getIncarnationId());
+ Activators.INSTANCE.put(act);
SentryStoreLayer store = new DelegateSentryStore(confCopy);
testGrantPrivilege(store, externalComponent);
+ act.close();
+ Activators.INSTANCE.remove(act);
}
private void testGrantPrivilege(SentryStoreLayer sentryStore, String component) throws SentryUserException {
http://git-wip-us.apache.org/repos/asf/sentry/blob/a70cff99/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 3ef1cf7..6e00505 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.security.alias.UserProvider;
@@ -44,6 +45,9 @@ import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
import org.apache.sentry.provider.db.service.thrift.TSentryRole;
import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.Activator;
+import org.apache.sentry.service.thrift.Activators;
+import org.apache.sentry.service.thrift.ServiceConstants;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.junit.After;
import org.junit.AfterClass;
@@ -67,6 +71,7 @@ public class TestSentryStore extends org.junit.Assert {
final long NUM_PRIVS = 60; // > SentryStore.PrivCleaner.NOTIFY_THRESHOLD
private static Configuration conf = null;
private static char[] passwd = new char[] { '1', '2', '3'};
+ private static Activator act;
@BeforeClass
public static void setup() throws Exception {
@@ -89,6 +94,10 @@ public class TestSentryStore extends org.junit.Assert {
policyFilePath = new File(dataDir, "local_policy_file.ini");
conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE,
policyFilePath.getPath());
+ act = new Activator(conf);
+ conf.set(ServiceConstants.CURRENT_INCARNATION_ID_KEY,
+ act.getIncarnationId());
+ Activators.INSTANCE.put(act);
sentryStore = new SentryStore(conf);
}
@@ -107,12 +116,17 @@ public class TestSentryStore extends org.junit.Assert {
@AfterClass
public static void teardown() {
+ IOUtils.cleanup(null, act);
if (sentryStore != null) {
sentryStore.stop();
}
if (dataDir != null) {
FileUtils.deleteQuietly(dataDir);
}
+ if (act != null) {
+ Activators.INSTANCE.remove(act);
+ act = null;
+ }
}
@Test