You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/05/19 21:04:46 UTC
incubator-ranger git commit: RANGER-265 Policy manager should timeout
if a service is not responding to lookup requests in time.
Repository: incubator-ranger
Updated Branches:
refs/heads/master 68d01056c -> 154c49041
RANGER-265 Policy manager should timeout if a service is not responding to lookup requests in time.
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/154c4904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/154c4904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/154c4904
Branch: refs/heads/master
Commit: 154c49041863f040ce99c5d45fa5e996968ced96
Parents: 68d0105
Author: Alok Lal <al...@hortonworks.com>
Authored: Fri May 15 10:00:01 2015 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue May 19 11:57:28 2015 -0700
----------------------------------------------------------------------
.../plugin/service/ResourceLookupContext.java | 7 +-
security-admin/pom.xml | 1 -
.../org/apache/ranger/biz/ServiceDBStore.java | 2 +-
.../java/org/apache/ranger/biz/ServiceMgr.java | 178 +++++++++++++++--
.../org/apache/ranger/common/RangerFactory.java | 33 ++++
.../org/apache/ranger/common/TimedExecutor.java | 160 ++++++++++++++++
.../common/TimedExecutorConfigurator.java | 93 +++++++++
.../apache/ranger/service/RangerFactory.java | 33 ----
.../conf.dist/ranger-admin-default-site.xml | 27 +++
.../apache/ranger/common/TestTimedExecutor.java | 190 +++++++++++++++++++
10 files changed, 669 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
index 913f824..a8b8ac0 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/ResourceLookupContext.java
@@ -27,8 +27,8 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@JsonAutoDetect(getterVisibility=Visibility.NONE, setterVisibility=Visibility.NONE, fieldVisibility=Visibility.ANY)
@@ -82,4 +82,9 @@ public class ResourceLookupContext {
public void setResources(Map<String, List<String>> resources) {
this.resources = resources;
}
+
+ @Override
+ public String toString() {
+ return String.format("ResourceLookupContext={resourceName=%s,userInput=%s,resources=%s}", resourceName, userInput, resources);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/pom.xml
----------------------------------------------------------------------
diff --git a/security-admin/pom.xml b/security-admin/pom.xml
index 9783d1f..3c26837 100644
--- a/security-admin/pom.xml
+++ b/security-admin/pom.xml
@@ -251,7 +251,6 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
index 009cbf8..2c9ceff 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
@@ -40,6 +40,7 @@ import org.apache.ranger.common.PasswordUtils;
import org.apache.ranger.common.RESTErrorUtil;
import org.apache.ranger.common.RangerCommonEnums;
import org.apache.ranger.common.RangerConstants;
+import org.apache.ranger.common.RangerFactory;
import org.apache.ranger.common.StringUtil;
import org.apache.ranger.common.UserSessionBase;
import org.apache.ranger.db.RangerDaoManager;
@@ -106,7 +107,6 @@ import org.apache.ranger.plugin.util.SearchFilter;
import org.apache.ranger.plugin.util.ServicePolicies;
import org.apache.ranger.service.RangerAuditFields;
import org.apache.ranger.service.RangerDataHistService;
-import org.apache.ranger.service.RangerFactory;
import org.apache.ranger.service.RangerPolicyService;
import org.apache.ranger.service.RangerPolicyWithAssignedIdService;
import org.apache.ranger.service.RangerServiceDefService;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
index 8498fbf..576090f 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceMgr.java
@@ -23,13 +23,18 @@ import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.common.PropertiesUtil;
+import org.apache.ranger.common.TimedExecutor;
import org.apache.ranger.plugin.client.HadoopException;
import org.apache.ranger.plugin.model.RangerService;
import org.apache.ranger.plugin.model.RangerServiceDef;
@@ -54,6 +59,9 @@ public class ServiceMgr {
@Autowired
ServiceDBStore svcDBStore;
+ @Autowired
+ TimedExecutor timedExecutor;
+
public List<String> lookupResource(String serviceName, ResourceLookupContext context, ServiceStore svcStore) throws Exception {
List<String> ret = null;
@@ -69,18 +77,9 @@ public class ServiceMgr {
}
if(svc != null) {
- ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-
- try {
- Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader());
-
- ret = svc.lookupResource(context);
- } catch (Exception e) {
- LOG.error("==> ServiceMgr.lookupResource Error:" + e);
- throw e;
- } finally {
- Thread.currentThread().setContextClassLoader(clsLoader);
- }
+ LookupCallable callable = new LookupCallable(svc, context);
+ long time = getTimeoutValueForLookupInMilliSeconds(svc);
+ ret = timedExecutor.timedTask(callable, time, TimeUnit.MILLISECONDS);
}
if(LOG.isDebugEnabled()) {
@@ -103,12 +102,11 @@ public class ServiceMgr {
}
if(svc != null) {
- ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-
try {
- Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader());
-
- HashMap<String, Object> responseData = svc.validateConfig();
+ // Timeout value use during validate config is 10 times that used during lookup
+ long time = getTimeoutValueForValidateConfigInMilliSeconds(svc);
+ ValidateCallable callable = new ValidateCallable(svc);
+ HashMap<String, Object> responseData = timedExecutor.timedTask(callable, time, TimeUnit.MILLISECONDS);
ret = generateResponseForTestConn(responseData, "");
} catch (Exception e) {
@@ -120,8 +118,6 @@ public class ServiceMgr {
}
ret = generateResponseForTestConn(respData, msg);
LOG.error("==> ServiceMgr.validateConfig Error:" + e);
- } finally {
- Thread.currentThread().setContextClassLoader(clsLoader);
}
}
@@ -344,5 +340,149 @@ public class ServiceMgr {
vXResponse.setStatusCode(statusCode);
return vXResponse;
}
+
+ static final long _DefaultTimeoutValue_Lookp = 1000; // 1 s
+ static final long _DefaultTimeoutValue_ValidateConfig = 10000; // 10 s
+
+ long getTimeoutValueForLookupInMilliSeconds(RangerBaseService svc) {
+ return getTimeoutValueInMilliSeconds("resource.lookup", svc, _DefaultTimeoutValue_Lookp);
+ }
+
+ long getTimeoutValueForValidateConfigInMilliSeconds(RangerBaseService svc) {
+ return getTimeoutValueInMilliSeconds("validate.config", svc, _DefaultTimeoutValue_ValidateConfig);
+ }
+
+ long getTimeoutValueInMilliSeconds(final String type, RangerBaseService svc, long defaultValue) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("==> ServiceMgr.getTimeoutValueInMilliSeconds (%s, %s)", type, svc));
+ }
+ String propertyName = type + ".timeout.value.in.ms"; // type == "lookup" || type == "validate-config"
+
+ Long result = null;
+ Map<String, String> config = svc.getConfigs();
+ if (config != null && config.containsKey(propertyName)) {
+ result = parseLong(config.get(propertyName));
+ }
+ if (result != null) {
+ LOG.debug("Found override in service config!");
+ } else {
+ String[] keys = new String[] {
+ "ranger.service." + svc.getServiceName() + "." + propertyName,
+ "ranger.servicetype." + svc.getServiceType() + "." + propertyName,
+ "ranger." + propertyName
+ };
+ for (String key : keys) {
+ String value = PropertiesUtil.getProperty(key);
+ if (value != null) {
+ result = parseLong(value);
+ if (result != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using the value[" + value + "] found in property[" + key + "]");
+ }
+ break;
+ }
+ }
+ }
+ }
+ if (result == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No overrides found in service config of properties file. Using supplied default of[" + defaultValue + "]!");
+ }
+ result = defaultValue;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("<== ServiceMgr.getTimeoutValueInMilliSeconds (%s, %s): %s", type, svc, result));
+ }
+ return result;
+ }
+
+ Long parseLong(String str) {
+ try {
+ return Long.valueOf(str);
+ } catch (NumberFormatException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ServiceMgr.parseLong: could not parse [" + str + "] as Long! Returning null");
+ }
+ return null;
+ }
+ }
+
+ abstract static class TimedCallable<T> implements Callable<T> {
+
+ final RangerBaseService svc;
+ final Date creation; // NOTE: This would be different from when the callable was actually offered to the executor
+
+ public TimedCallable(RangerBaseService svc) {
+ this.svc = svc;
+ this.creation = new Date();
+ }
+
+ @Override
+ public T call() throws Exception {
+ Date start = null;
+ if (LOG.isDebugEnabled()) {
+ start = new Date();
+ LOG.debug("==> TimedCallable: " + toString());
+ }
+
+ ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(svc.getClass().getClassLoader());
+ return actualCall();
+ } catch (Exception e) {
+ LOG.error("TimedCallable.call: Error:" + e);
+ throw e;
+ } finally {
+ Thread.currentThread().setContextClassLoader(clsLoader);
+ if (LOG.isDebugEnabled()) {
+ Date finish = new Date();
+ long waitTime = start.getTime() - creation.getTime();
+ long executionTime = finish.getTime() - start.getTime();
+ LOG.debug(String.format("<== TimedCallable: %s: wait time[%d ms], execution time [%d ms]", toString(), waitTime, executionTime));
+ }
+ }
+ }
+
+ abstract T actualCall() throws Exception;
+ }
+
+ static class LookupCallable extends TimedCallable<List<String>> {
+
+ final ResourceLookupContext context;
+
+ public LookupCallable(final RangerBaseService svc, final ResourceLookupContext context) {
+ super(svc);
+ this.context = context;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("lookup resource[%s] for service[%s], ", context.toString(), svc.getServiceName());
+ }
+
+ @Override
+ public List<String> actualCall() throws Exception {
+ List<String> ret = svc.lookupResource(context);
+ return ret;
+ }
+ }
+
+ static class ValidateCallable extends TimedCallable<HashMap<String, Object>> {
+
+ public ValidateCallable(RangerBaseService svc) {
+ super(svc);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("validate config for service[%s]", svc.getServiceName());
+ }
+
+ @Override
+ public HashMap<String, Object> actualCall() throws Exception {
+ return svc.validateConfig();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java b/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java
new file mode 100644
index 0000000..29d972e
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/common/RangerFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+@Service
+@Scope("singleton")
+public class RangerFactory {
+ public RangerPolicyResourceSignature createPolicyResourceSignature(RangerPolicy policy) {
+ return new RangerPolicyResourceSignature(policy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java
new file mode 100644
index 0000000..643d882
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Service
+@Scope("singleton")
+public class TimedExecutor {
+
+ static final private Logger LOG = Logger.getLogger(TimedExecutor.class);
+
+ @Autowired
+ TimedExecutorConfigurator _configurator;
+
+ ExecutorService _executorService;
+
+ public TimedExecutor() {
+ }
+
+ @PostConstruct
+ void initialize() {
+ initialize(_configurator);
+ }
+
+ // Not designed for public access - only for testability
+ void initialize(TimedExecutorConfigurator configurator) {
+ final ThreadFactory _ThreadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("timed-executor-pool-%d")
+ .setUncaughtExceptionHandler(new LocalUncaughtExceptionHandler())
+ .build();
+
+ final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(configurator.getBlockingQueueSize());
+
+ _executorService = new LocalThreadPoolExecutor(configurator.getCoreThreadPoolSize(), configurator.getMaxThreadPoolSize(),
+ configurator.getKeepAliveTime(), configurator.getKeepAliveTimeUnit(),
+ blockingQueue, _ThreadFactory);
+ }
+
+ public <T> T timedTask(Callable<T> callable, long time, TimeUnit unit) throws Exception{
+ try {
+ Future<T> future = _executorService.submit(callable);
+ if (LOG.isDebugEnabled()) {
+ if (future.isCancelled()) {
+ LOG.debug("Got back a future that was cancelled already for callable[" + callable + "]!");
+ }
+ }
+ try {
+ T result = future.get(time, unit);
+ return result;
+ } catch (CancellationException | ExecutionException | InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("TimedExecutor: Caught exception[%s] for callable[%s]: detail[%s]. Re-throwing...", e.getClass().getName(), callable, e.getMessage()));
+ }
+ throw e;
+ } catch (TimeoutException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("TimedExecutor: Timed out waiting for callable[%s] to finish. Cancelling the task.", callable));
+ }
+ boolean interruptRunningTask = true;
+ future.cancel(interruptRunningTask);
+ LOG.debug("TimedExecutor: Re-throwing timeout exception to caller");
+ throw e;
+ }
+ } catch (RejectedExecutionException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executor rejected callable[" + callable + "], due to resource exhaustion. Rethrowing exception...");
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Not designed for public access. Non-private only for testability. Expected to be called by tests to do proper cleanup.
+ */
+ void shutdown() {
+ _executorService.shutdownNow();
+ }
+
+ static class LocalUncaughtExceptionHandler implements UncaughtExceptionHandler {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ String message = String.format("TimedExecutor: Uncaught exception hanlder received exception[%s] in thread[%s]", t.getClass().getName(), t.getName());
+ LOG.warn(message, e);
+ }
+ }
+
+ static class LocalThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private ThreadLocal<Long> startNanoTime = new ThreadLocal<Long>();
+
+ public LocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TimedExecutor: Starting execution of a task.");
+ startNanoTime.set(System.nanoTime());
+ }
+ super.beforeExecute(t, r);
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ if (LOG.isDebugEnabled()) {
+ long duration = System.nanoTime() - startNanoTime.get();
+ LOG.debug("TimedExecutor: Done execution of task. Duration[" + duration/1000000 + " ms].");
+ }
+ }
+
+ @Override
+ protected void terminated() {
+ super.terminated();
+ LOG.info("TimedExecutor: thread pool has terminated");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
new file mode 100644
index 0000000..1b43abe
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/common/TimedExecutorConfigurator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+@Service
+@Scope("singleton")
+public class TimedExecutorConfigurator {
+
+ // these two are important and hence are user configurable.
+ static final String Property_MaxThreadPoolSize = "ranger.timed.executor.max.threadpool.size";
+ static final String Property_QueueSize = "ranger.timed.executor.queue.size";
+ // We need these default-defaults since default-site.xml file isn't inside the jar, i.e. file itself may be missing or values in it might be messed up! :(
+ static final int _DefaultMaxThreadPoolSize = 10;
+ static final private int _DefaultBlockingQueueSize = 100;
+
+
+ private int _maxThreadPoolSize;
+ private int _blockingQueueSize;
+ // The following are hard-coded for now and can be exposed if there is a pressing need.
+ private int _coreThreadPoolSize = 1;
+ private long _keepAliveTime = 10;
+ private TimeUnit _keepAliveTimeUnit = TimeUnit.SECONDS;
+
+ public TimedExecutorConfigurator() {
+ }
+
+ // Infrequently used class (once per lifetime of policy manager) hence, values read from property file aren't cached.
+ @PostConstruct
+ void initialize() {
+ Integer value = PropertiesUtil.getIntProperty(Property_MaxThreadPoolSize);
+ if (value == null) {
+ _maxThreadPoolSize = _DefaultMaxThreadPoolSize;
+ } else {
+ _maxThreadPoolSize = value;
+ }
+
+ value = PropertiesUtil.getIntProperty(Property_QueueSize);
+ if (value == null) {
+ _blockingQueueSize = _DefaultBlockingQueueSize;
+ } else {
+ _blockingQueueSize = value;
+ }
+ }
+ /**
+ * Provided mostly only testability.
+ * @param maxThreadPoolSize
+ * @param blockingQueueSize
+ */
+ public TimedExecutorConfigurator(int maxThreadPoolSize, int blockingQueueSize) {
+ _maxThreadPoolSize = maxThreadPoolSize;
+ _blockingQueueSize = blockingQueueSize;
+ }
+
+ public int getCoreThreadPoolSize() {
+ return _coreThreadPoolSize;
+ }
+ public int getMaxThreadPoolSize() {
+ return _maxThreadPoolSize;
+ }
+ public long getKeepAliveTime() {
+ return _keepAliveTime;
+ }
+ public TimeUnit getKeepAliveTimeUnit() {
+ return _keepAliveTimeUnit;
+ }
+ public int getBlockingQueueSize() {
+ return _blockingQueueSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java b/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java
deleted file mode 100644
index 7834262..0000000
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.service;
-
-import org.apache.ranger.plugin.model.RangerPolicy;
-import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
-import org.springframework.context.annotation.Scope;
-import org.springframework.stereotype.Service;
-
-@Service
-@Scope("singleton")
-public class RangerFactory {
- public RangerPolicyResourceSignature createPolicyResourceSignature(RangerPolicy policy) {
- return new RangerPolicyResourceSignature(policy);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
----------------------------------------------------------------------
diff --git a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
index 571d2a1..0783f69 100644
--- a/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
+++ b/security-admin/src/main/resources/conf.dist/ranger-admin-default-site.xml
@@ -395,14 +395,41 @@
<value>ranger.auditdb.password</value>
<description></description>
</property>
+
<property>
<name>ranger.ldap.binddn.credential.alias</name>
<value>ranger.ldap.binddn.password</value>
<description></description>
</property>
+
<property>
<name>ranger.ldap.ad.binddn.credential.alias</name>
<value>ranger.ad.binddn.password</value>
<description></description>
</property>
+
+ <property>
+ <name>ranger.resource.lookup.timeout.value.in.ms</name>
+ <value>1000</value>
+ <description></description>
+ </property>
+
+ <property>
+ <name>ranger.validate.config.timeout.value.in.ms</name>
+ <value>10000</value>
+ <description></description>
+ </property>
+
+ <property>
+ <name>ranger.timed.executor.max.threadpool.size</name>
+ <value>10</value>
+ <description></description>
+ </property>
+
+ <property>
+ <name>ranger.timed.executor.queue.size</name>
+ <value>100</value>
+ <description></description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/154c4904/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java b/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java
new file mode 100644
index 0000000..39d8ecf
--- /dev/null
+++ b/security-admin/src/test/java/org/apache/ranger/common/TestTimedExecutor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTimedExecutor {
+
+ private static final Log LOG = LogFactory.getLog(TestTimedExecutor.class);
+
+ @Before
+ public void before() {
+
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ /*
+ * Create a pool with 2 threads and queue size of 3 such that 6th item should get rejected right away due to capacity.
+ */
+ int poolSize = 2;
+ int queueSize = 3;
+ _configurator = new TimedExecutorConfigurator(poolSize, queueSize);
+ // Just toa void thread shutting down and restarting set keep alive to high value.
+ _executor.initialize(_configurator);
+
+ // now create 2 callalbles that would keep waiting unless we ask them to proceed
+ // create an executor which would simulate simultaneous threads calling into executor to perform lookups
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
+ /*
+ * We would have 2 permits for 10 callables, such that
+ * - 2 should succeed
+ * - 5 should timeout (2 in pool + 3 in queue)
+ * - 3 should get rejected.
+ */
+ Semaphore semaphore = new Semaphore(2);
+ /*
+ * We need a latch to keep track of when the processing is done so we can check the results of teh test
+ */
+ CountDownLatch latch = new CountDownLatch(10);
+ // Callables will record exception in this map
+ final ConcurrentMap<String, AtomicInteger> results = new ConcurrentHashMap<String, AtomicInteger>();
+ for (int i = 0; i < 10; i++) {
+ LookupTask lookupTask = new LookupTask(i, semaphore);
+ TimedTask timedTask = new TimedTask(_executor, lookupTask, 1, TimeUnit.SECONDS, results, latch);
+ Future<Integer> aFuture = executorService.submit(timedTask);
+ futures.add(aFuture);
+ }
+ // Let's wait for the threads to finish
+ LOG.debug("Starting to wait for threadpool to finish");
+ latch.await();
+ /*
+ * depending on how threads get scheduled the count in results would vary, except we know for sure that.
+ * - 2 must succeed since we have exactly 2 permits available.
+ * - sum of timed out and rejected must be equal to 8.
+ * - at least 3 and no more than 5 tasks must get rejected.
+ * - at least 3 and no more than 5 tasks must get timed out
+ */
+ int successCount = results.get("success").get();
+ int timeoutCount = results.get("java.util.concurrent.TimeoutException").get();
+ int rejectedCount = results.get("java.util.concurrent.RejectedExecutionException").get();
+ assertEquals("success count", 2, successCount);
+ assertTrue("timeout[" + timeoutCount + "]: 3 <= count(timeout) <= 5", timeoutCount >= 3 && timeoutCount <= 5);
+ assertTrue("rejected[" + rejectedCount + "]: 3 <= count(timeout) <= 5", rejectedCount >= 3 && rejectedCount <= 5);
+ assertEquals("total should equal 10", 10, successCount + timeoutCount + rejectedCount);
+ _executor.shutdown();
+ }
+
+ static final String format = "%15s id: %2d";
+
+ static class LookupTask implements Callable<Integer> {
+
+ final int _id;
+ final private Semaphore _semaphore;
+
+ public LookupTask(int id, Semaphore latch) {
+ _id = id;
+ _semaphore = latch;
+ }
+
+ int getId() {
+ return _id;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ LOG.debug(String.format(format, "Starting", _id));
+ _semaphore.acquire();
+ LOG.debug(String.format(format, "Acquired", _id));
+ LOG.debug(String.format(format, "Ended", _id));
+ return _id;
+ }
+
+ }
+
+ static class TimedTask implements Callable<Integer> {
+
+ final LookupTask _callable;
+ final TimedExecutor _executor;
+ final ConcurrentMap<String, AtomicInteger> _results;
+ final long _timeout;
+ final TimeUnit _unit;
+ final CountDownLatch _latch;
+
+ public TimedTask(TimedExecutor executor, LookupTask callable, int timout, TimeUnit unit, ConcurrentMap<String, AtomicInteger> results, CountDownLatch latch) {
+ _callable = callable;
+ _executor = executor;
+ _results = results;
+ _timeout = timout;
+ _unit = unit;
+ _latch = latch;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ int id = _callable.getId();
+ LOG.debug(String.format(format, "Submitting", id));
+ try {
+ Integer result = _executor.timedTask(_callable, _timeout, _unit);
+ LOG.debug(String.format(format, "Finished", id));
+ recordResult(_results, "success");
+ return result;
+ } catch (Exception e) {
+ LOG.debug(String.format(format, "Exception", id));
+ recordResult(_results, e);
+ // re-throw caught exception
+ throw e;
+ } finally {
+ _latch.countDown();
+ }
+ }
+
+ }
+
+ static void recordResult(ConcurrentMap<String, AtomicInteger> results, String key) {
+ if (results.containsKey(key)) {
+ results.get(key).incrementAndGet();
+ } else {
+ AtomicInteger previous = results.putIfAbsent(key, new AtomicInteger(1));
+ if (previous != null) { // a value was already associated with the key
+ previous.incrementAndGet();
+ }
+ }
+ }
+
+ static void recordResult(ConcurrentMap<String, AtomicInteger> results, Exception e) {
+ String exceptionName = e.getClass().getCanonicalName();
+ recordResult(results, exceptionName);
+ }
+
+ private TimedExecutorConfigurator _configurator;
+ private TimedExecutor _executor = new TimedExecutor();
+}