You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/05/30 21:19:41 UTC
[1/2] incubator-ranger git commit: RANGER-397 Support RDBMS as audit
destination using V3 configuration
Repository: incubator-ranger
Updated Branches:
refs/heads/master 9e5bd8540 -> 94ba6beb3
RANGER-397 Support RDBMS as audit destination using V3 configuration
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a2de2450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a2de2450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a2de2450
Branch: refs/heads/master
Commit: a2de2450a572468af1928d5d021567c39544e193
Parents: 9e5bd85
Author: Don Bosco Durai <bo...@apache.org>
Authored: Fri May 29 14:54:22 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Fri May 29 14:54:22 2015 -0700
----------------------------------------------------------------------
.../org/apache/ranger/audit/dao/DaoManager.java | 2 +
.../audit/destination/DBAuditDestination.java | 306 +++++++++++++++++++
.../audit/destination/HDFSAuditDestination.java | 3 +
.../audit/provider/AuditProviderFactory.java | 3 +-
.../ranger/audit/provider/BaseAuditHandler.java | 5 +-
.../apache/ranger/audit/provider/MiscUtil.java | 15 +
.../ranger/audit/queue/AuditAsyncQueue.java | 25 +-
.../ranger/audit/queue/AuditBatchQueue.java | 24 +-
.../apache/ranger/audit/queue/AuditQueue.java | 6 +
.../ranger/audit/queue/AuditSummaryQueue.java | 25 +-
10 files changed, 409 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
index 6d81744..fd4d096 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
@@ -49,6 +49,8 @@ public class DaoManager extends DaoManagerBase {
sEntityManager.set(em);
}
+ } else {
+ logger.error("EntityManagerFactory was not set in this thread.", new Throwable());
}
return em;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
new file mode 100644
index 0000000..c58748e
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
+import javax.persistence.Persistence;
+
+import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+public class DBAuditDestination extends AuditDestination {
+
+ private static final Log logger = LogFactory
+ .getLog(DBAuditDestination.class);
+
+ public static final String PROP_DB_JDBC_DRIVER = "jdbc.driver";
+ public static final String PROP_DB_JDBC_URL = "jdbc.url";
+ public static final String PROP_DB_USER = "user";
+ public static final String PROP_DB_PASSWORD = "password";
+ public static final String PROP_DB_PASSWORD_ALIAS = "password.alias";
+
+ private EntityManagerFactory entityManagerFactory;
+ private DaoManager daoManager;
+
+ private String jdbcDriver = null;
+ private String jdbcURL = null;
+ private String dbUser = null;
+ private String dbPasswordAlias = "auditDBCred";
+
+ public DBAuditDestination() {
+ logger.info("DBAuditDestination() called");
+ }
+
+ @Override
+ public void init(Properties props, String propPrefix) {
+ logger.info("init() called");
+ super.init(props, propPrefix);
+
+ // Initial connect
+ connect();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditHandler#logger(java.util.Collection
+ * )
+ */
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ boolean retValue = false;
+
+ if (!beginTransaction()) {
+ return false;
+ }
+ boolean isFailed = false;
+ for (AuditEventBase event : events) {
+ try {
+ event.persist(daoManager);
+ } catch (Throwable t) {
+ logger.error("Error persisting data. event=" + event, t);
+ isFailed = true;
+ break;
+ }
+ }
+ if (isFailed) {
+ retValue = false;
+ rollbackTransaction();
+ } else {
+ retValue = commitTransaction();
+ }
+ return retValue;
+ }
+
+ @Override
+ public void stop() {
+ cleanUp();
+ super.stop();
+ }
+
+ // Local methods
+ protected void connect() {
+ if (isDbConnected()) {
+ return;
+ }
+ try {
+ jdbcDriver = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_DB_JDBC_DRIVER);
+ jdbcURL = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_DB_JDBC_URL);
+ dbUser = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_DB_USER);
+ String dbPassword = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_DB_PASSWORD);
+ String tmpAlias = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_DB_PASSWORD_ALIAS);
+ dbPasswordAlias = tmpAlias != null ? tmpAlias : dbPasswordAlias;
+ String credFile = MiscUtil.getStringProperty(props,
+ AUDIT_DB_CREDENTIAL_PROVIDER_FILE);
+
+ if (jdbcDriver == null || jdbcDriver.isEmpty()) {
+ logger.fatal("JDBC driver not provided. Set property name "
+ + propPrefix + "." + PROP_DB_JDBC_DRIVER);
+ return;
+ }
+ if (jdbcURL == null || jdbcURL.isEmpty()) {
+ logger.fatal("JDBC URL not provided. Set property name "
+ + propPrefix + "." + PROP_DB_JDBC_URL);
+ return;
+ }
+ if (dbUser == null || dbUser.isEmpty()) {
+ logger.fatal("DB user not provided. Set property name "
+ + propPrefix + "." + PROP_DB_USER);
+ return;
+ }
+ if (dbPassword == null || dbPassword.isEmpty()) {
+ logger.warn("DB password not provided. Will assume empty for now. Set property name "
+ + propPrefix + "." + PROP_DB_PASSWORD);
+ } else {
+ dbPassword = MiscUtil.getCredentialString(credFile,
+ dbPasswordAlias);
+ }
+ logger.info("JDBC Driver=" + jdbcDriver + ", JDBC URL=" + jdbcURL
+ + ", dbUser=" + dbUser + ", passwordAlias="
+ + dbPasswordAlias + ", credFile=" + credFile);
+
+ Map<String, String> dbProperties = new HashMap<String, String>();
+ dbProperties.put("javax.persistence.jdbc.driver", jdbcDriver);
+ dbProperties.put("javax.persistence.jdbc.url", jdbcURL);
+ dbProperties.put("javax.persistence.jdbc.user", dbUser);
+ if (dbPassword != null) {
+ dbProperties.put("javax.persistence.jdbc.password", dbPassword);
+ }
+
+ entityManagerFactory = Persistence.createEntityManagerFactory(
+ "xa_server", dbProperties);
+
+ logger.info("entityManagerFactory=" + entityManagerFactory);
+
+ daoManager = new DaoManager();
+ daoManager.setEntityManagerFactory(entityManagerFactory);
+
+ // this forces the connection to be made to DB
+ if (daoManager.getEntityManager() != null) {
+ logger.error("Error connecting audit database. EntityManager is null. dbURL="
+ + jdbcURL + ", dbUser=" + dbUser);
+ }
+
+ } catch (Throwable t) {
+ logger.error("Error connecting audit database. dbURL=" + jdbcURL
+ + ", dbUser=" + dbUser, t);
+ }
+ }
+
+ private synchronized void cleanUp() {
+ logger.info("DBAuditDestination: cleanUp()");
+
+ try {
+ if (entityManagerFactory != null && entityManagerFactory.isOpen()) {
+ entityManagerFactory.close();
+ }
+ } catch (Exception excp) {
+ logger.error("DBAuditDestination.cleanUp(): failed", excp);
+ } finally {
+ entityManagerFactory = null;
+ daoManager = null;
+ }
+ }
+
+ private EntityManager getEntityManager() {
+ DaoManager daoMgr = daoManager;
+
+ if (daoMgr != null) {
+ try {
+ return daoMgr.getEntityManager();
+ } catch (Exception excp) {
+ logger.error("DBAuditDestination.getEntityManager(): failed",
+ excp);
+
+ cleanUp();
+ }
+ }
+
+ return null;
+ }
+
+ private boolean isDbConnected() {
+ EntityManager em = getEntityManager();
+ return em != null && em.isOpen();
+ }
+
+ private void clearEntityManager() {
+ try {
+ EntityManager em = getEntityManager();
+
+ if (em != null) {
+ em.clear();
+ }
+ } catch (Exception excp) {
+ logger.warn("DBAuditDestination.clearEntityManager(): failed", excp);
+ }
+ }
+
+ private EntityTransaction getTransaction() {
+ if (!isDbConnected()) {
+ connect();
+ }
+
+ EntityManager em = getEntityManager();
+
+ return em != null ? em.getTransaction() : null;
+ }
+
+ private boolean beginTransaction() {
+ EntityTransaction trx = getTransaction();
+
+ if (trx != null && !trx.isActive()) {
+ trx.begin();
+ }
+
+ if (trx == null) {
+ logger.warn("DBAuditDestination.beginTransaction(): trx is null");
+ }
+
+ return trx != null;
+ }
+
+ private boolean commitTransaction() {
+ boolean ret = false;
+ EntityTransaction trx = null;
+
+ try {
+ trx = getTransaction();
+
+ if (trx != null && trx.isActive()) {
+ trx.commit();
+ ret = true;
+ } else {
+ throw new Exception("trx is null or not active");
+ }
+ } catch (Throwable excp) {
+ logger.error("DBAuditDestination.commitTransaction(): failed", excp);
+
+ cleanUp(); // so that next insert will try to init()
+ } finally {
+ clearEntityManager();
+ }
+
+ return ret;
+ }
+
+ private boolean rollbackTransaction() {
+ boolean ret = false;
+ EntityTransaction trx = null;
+
+ try {
+ trx = getTransaction();
+
+ if (trx != null && trx.isActive()) {
+ trx.rollback();
+ ret = true;
+ } else {
+ throw new Exception("trx is null or not active");
+ }
+ } catch (Throwable excp) {
+ logger.error("DBAuditDestination.rollbackTransaction(): failed",
+ excp);
+
+ cleanUp(); // so that next insert will try to init()
+ } finally {
+ clearEntityManager();
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 6ca4fce..67382a9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -105,6 +105,9 @@ public class HDFSAuditDestination extends AuditDestination {
@Override
synchronized public boolean logJSON(Collection<String> events) {
+ if (!initDone) {
+ return false;
+ }
if (isStopped) {
logError("log() called after stop was requested. name=" + getName());
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index d6ef318..c3a05ce 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.DBAuditDestination;
import org.apache.ranger.audit.destination.FileAuditDestination;
import org.apache.ranger.audit.destination.HDFSAuditDestination;
import org.apache.ranger.audit.destination.SolrAuditDestination;
@@ -415,7 +416,7 @@ public class AuditProviderFactory {
} else if (providerName.equals("kafka")) {
provider = new KafkaAuditProvider();
} else if (providerName.equals("db")) {
- provider = new DbAuditProvider();
+ provider = new DBAuditDestination();
} else if (providerName.equals("log4j")) {
provider = new Log4jAuditProvider();
} else if (providerName.equals("batch")) {
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
index dd44def..09335c7 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
+
import com.google.gson.GsonBuilder;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,7 +34,9 @@ import java.util.Properties;
public abstract class BaseAuditHandler implements AuditHandler {
private static final Log LOG = LogFactory.getLog(BaseAuditHandler.class);
- private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+ static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+ protected static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE = "xasecure.audit.credential.provider.file";
+
private int mLogFailureReportMinIntervalInMs = 60 * 1000;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
index fe6b0e9..abb0a90 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -29,6 +29,7 @@ import java.util.StringTokenizer;
import java.util.UUID;
import org.apache.log4j.helpers.LogLog;
+import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -377,5 +378,19 @@ public class MiscUtil {
}
return list;
}
+
+ public static String getCredentialString(String url,String alias) {
+ String ret = null;
+
+ if(url != null && alias != null) {
+ char[] cred = RangerCredentialProvider.getInstance().getCredentialString(url,alias);
+
+ if ( cred != null ) {
+ ret = new String(cred);
+ }
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index d16fff9..de5941a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -102,9 +102,16 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
*/
@Override
public void stop() {
+ logger.info("Stop called. name=" + getName());
+ if (stopTime != 0) {
+ stopTime = System.currentTimeMillis();
+ }
setDrain(true);
try {
if (consumerThread != null) {
+ logger.info("Interrupting consumerThread. name=" + getName()
+ + ", consumer="
+ + (consumer == null ? null : consumer.getName()));
consumerThread.interrupt();
}
} catch (Throwable t) {
@@ -138,7 +145,7 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
}
} catch (InterruptedException e) {
logger.info(
- "Caught exception in consumer thread. Mostly to about loop",
+ "Caught exception in consumer thread. Mostly server is shutting down.",
e);
} catch (Throwable t) {
logger.error("Caught error during processing request.", t);
@@ -146,13 +153,29 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
if (isDrain() && queue.isEmpty()) {
break;
}
+ if (isDrain()
+ && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
+ logger.warn("Exiting polling loop to max time allowed. name="
+ + getName() + ", waited for "
+ + (stopTime - System.currentTimeMillis()) + " ms");
+
+ break;
+ }
}
+ logger.info("Exiting polling loop. name=" + getName());
+
try {
// Call stop on the consumer
+ logger.info("Calling to stop consumer. name=" + getName()
+ + ", consumer.name=" + consumer.getName());
+
+ // Call stop on the consumer
consumer.stop();
} catch (Throwable t) {
logger.error("Error while calling stop on consumer.", t);
}
+ logger.info("Exiting consumerThread.run() method. name=" + getName());
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 8316c2b..645483b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -119,10 +119,19 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
*/
@Override
public void stop() {
+ logger.info("Stop called. name=" + getName());
+ if (stopTime != 0) {
+ stopTime = System.currentTimeMillis();
+ }
+
setDrain(true);
flush();
try {
if (consumerThread != null) {
+ logger.info("Interrupting consumerThread. name=" + getName()
+ + ", consumer="
+ + (consumer == null ? null : consumer.getName()));
+
consumerThread.interrupt();
}
} catch (Throwable t) {
@@ -257,7 +266,7 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
}
} catch (InterruptedException e) {
logger.info(
- "Caught exception in consumer thread. Mostly to abort loop",
+ "Caught exception in consumer thread. Mostly server is shutting down.",
e);
setDrain(true);
} catch (Throwable t) {
@@ -311,12 +320,24 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
break;
}
}
+ if (isDrain()
+ && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
+ logger.warn("Exiting polling loop to max time allowed. name="
+ + getName() + ", waited for "
+ + (stopTime - System.currentTimeMillis()) + " ms");
+
+ break;
+ }
+
}
logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+ consumer.getName());
try {
// Call stop on the consumer
+ logger.info("Calling to stop consumer. name=" + getName()
+ + ", consumer.name=" + consumer.getName());
+
consumer.stop();
if (fileSpoolerEnabled) {
fileSpooler.stop();
@@ -324,5 +345,6 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
} catch (Throwable t) {
logger.error("Error while calling stop on consumer.", t);
}
+ logger.info("Exiting consumerThread.run() method. name=" + getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
index 4c3ac5f..039dc6d 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -33,6 +33,9 @@ public abstract class AuditQueue extends BaseAuditHandler {
public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+
+ //This is the max time the consumer thread will wait before exiting the loop
+ public static final int AUDIT_CONSUMER_THREAD_WAIT_MS = 5000;
private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
@@ -57,6 +60,9 @@ public abstract class AuditQueue extends BaseAuditHandler {
protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
protected int fileSpoolDrainThresholdPercent = 80;
+ //This is set when the first time stop is called.
+ protected long stopTime = 0;
+
/**
* @param consumer
*/
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index 7922312..1e5b500 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -122,9 +122,18 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
*/
@Override
public void stop() {
+ logger.info("Stop called. name=" + getName());
+ if (stopTime != 0) {
+ stopTime = System.currentTimeMillis();
+ }
+
setDrain(true);
try {
if (consumerThread != null) {
+ logger.info("Interrupting consumerThread. name=" + getName()
+ + ", consumer="
+ + (consumer == null ? null : consumer.getName()));
+
consumerThread.interrupt();
}
} catch (Throwable t) {
@@ -170,7 +179,7 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
}
} catch (InterruptedException e) {
logger.info(
- "Caught exception in consumer thread. Mostly to about loop",
+ "Caught exception in consumer thread. Mostly server is shutting down.",
e);
} catch (Throwable t) {
logger.error("Caught error during processing request.", t);
@@ -217,14 +226,28 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) {
break;
}
+ if (isDrain()
+ && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
+ logger.warn("Exiting polling loop to max time allowed. name="
+ + getName() + ", waited for "
+ + (stopTime - System.currentTimeMillis()) + " ms");
+
+ break;
+ }
+
}
+ logger.info("Exiting polling loop. name=" + getName());
try {
// Call stop on the consumer
+ logger.info("Calling to stop consumer. name=" + getName()
+ + ", consumer.name=" + consumer.getName());
consumer.stop();
} catch (Throwable t) {
logger.error("Error while calling stop on consumer.", t);
}
+ logger.info("Exiting consumerThread.run() method. name=" + getName());
+
}
class AuditSummary {
[2/2] incubator-ranger git commit: RANGER-397 Applied review feedback
Posted by bo...@apache.org.
RANGER-397 Applied review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/94ba6beb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/94ba6beb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/94ba6beb
Branch: refs/heads/master
Commit: 94ba6beb3841f094d5800619275d80296a8b54b6
Parents: a2de245
Author: Don Bosco Durai <bo...@apache.org>
Authored: Sat May 30 12:14:19 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Sat May 30 12:14:19 2015 -0700
----------------------------------------------------------------------
.../audit/destination/DBAuditDestination.java | 24 +++++++++++-------
.../ranger/audit/queue/AuditAsyncQueue.java | 25 +++++++++----------
.../ranger/audit/queue/AuditBatchQueue.java | 21 ++++++----------
.../apache/ranger/audit/queue/AuditQueue.java | 7 ++++++
.../ranger/audit/queue/AuditSummaryQueue.java | 26 +++++++++-----------
.../kafka/client/ServiceKafkaClient.java | 5 ++--
.../services/solr/client/ServiceSolrClient.java | 5 ++--
.../org/apache/ranger/common/ServiceUtil.java | 13 +++++++---
src/main/assembly/plugin-kafka.xml | 2 ++
9 files changed, 66 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
index c58748e..8cece4e 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
@@ -119,8 +119,8 @@ public class DBAuditDestination extends AuditDestination {
+ PROP_DB_JDBC_URL);
dbUser = MiscUtil.getStringProperty(props, propPrefix + "."
+ PROP_DB_USER);
- String dbPassword = MiscUtil.getStringProperty(props, propPrefix
- + "." + PROP_DB_PASSWORD);
+ String dbPasswordFromProp = MiscUtil.getStringProperty(props,
+ propPrefix + "." + PROP_DB_PASSWORD);
String tmpAlias = MiscUtil.getStringProperty(props, propPrefix
+ "." + PROP_DB_PASSWORD_ALIAS);
dbPasswordAlias = tmpAlias != null ? tmpAlias : dbPasswordAlias;
@@ -142,16 +142,22 @@ public class DBAuditDestination extends AuditDestination {
+ propPrefix + "." + PROP_DB_USER);
return;
}
+ String dbPassword = MiscUtil.getCredentialString(credFile,
+ dbPasswordAlias);
+
if (dbPassword == null || dbPassword.isEmpty()) {
- logger.warn("DB password not provided. Will assume empty for now. Set property name "
- + propPrefix + "." + PROP_DB_PASSWORD);
- } else {
- dbPassword = MiscUtil.getCredentialString(credFile,
- dbPasswordAlias);
+ // If password is not in credential store, let's try password
+ // from property
+ dbPassword = dbPasswordFromProp;
+ }
+
+ if (dbPassword == null || dbPassword.isEmpty()) {
+ logger.warn("DB password not provided. Will assume it is empty and continue");
}
logger.info("JDBC Driver=" + jdbcDriver + ", JDBC URL=" + jdbcURL
+ ", dbUser=" + dbUser + ", passwordAlias="
- + dbPasswordAlias + ", credFile=" + credFile);
+ + dbPasswordAlias + ", credFile=" + credFile
+ + ", usingPassword=" + (dbPassword == null ? "no" : "yes"));
Map<String, String> dbProperties = new HashMap<String, String>();
dbProperties.put("javax.persistence.jdbc.driver", jdbcDriver);
@@ -170,7 +176,7 @@ public class DBAuditDestination extends AuditDestination {
daoManager.setEntityManagerFactory(entityManagerFactory);
// this forces the connection to be made to DB
- if (daoManager.getEntityManager() != null) {
+ if (daoManager.getEntityManager() == null) {
logger.error("Error connecting audit database. EntityManager is null. dbURL="
+ jdbcURL + ", dbUser=" + dbUser);
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index de5941a..47480da 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -103,9 +103,6 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
@Override
public void stop() {
logger.info("Stop called. name=" + getName());
- if (stopTime != 0) {
- stopTime = System.currentTimeMillis();
- }
setDrain(true);
try {
if (consumerThread != null) {
@@ -145,21 +142,21 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
}
} catch (InterruptedException e) {
logger.info(
- "Caught exception in consumer thread. Mostly server is shutting down.",
+ "Caught exception in consumer thread. Shutdown might be in progress",
e);
} catch (Throwable t) {
logger.error("Caught error during processing request.", t);
}
- if (isDrain() && queue.isEmpty()) {
- break;
- }
- if (isDrain()
- && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
- logger.warn("Exiting polling loop to max time allowed. name="
- + getName() + ", waited for "
- + (stopTime - System.currentTimeMillis()) + " ms");
-
- break;
+ if (isDrain()) {
+ if (queue.isEmpty()) {
+ break;
+ }
+ if (isDrainMaxTimeElapsed()) {
+ logger.warn("Exiting polling loop because max time allowed reached. name="
+ + getName()
+ + ", waited for "
+ + (stopTime - System.currentTimeMillis()) + " ms");
+ }
}
}
logger.info("Exiting polling loop. name=" + getName());
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 645483b..80d7853 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -120,10 +120,6 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
@Override
public void stop() {
logger.info("Stop called. name=" + getName());
- if (stopTime != 0) {
- stopTime = System.currentTimeMillis();
- }
-
setDrain(true);
flush();
try {
@@ -266,7 +262,7 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
}
} catch (InterruptedException e) {
logger.info(
- "Caught exception in consumer thread. Mostly server is shutting down.",
+ "Caught exception in consumer thread. Shutdown might be in progress",
e);
setDrain(true);
} catch (Throwable t) {
@@ -319,16 +315,13 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
} else {
break;
}
+ if (isDrainMaxTimeElapsed()) {
+ logger.warn("Exiting polling loop because max time allowed reached. name="
+ + getName()
+ + ", waited for "
+ + (stopTime - System.currentTimeMillis()) + " ms");
+ }
}
- if (isDrain()
- && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
- logger.warn("Exiting polling loop to max time allowed. name="
- + getName() + ", waited for "
- + (stopTime - System.currentTimeMillis()) + " ms");
-
- break;
- }
-
}
logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
index 039dc6d..e873459 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -114,11 +114,18 @@ public abstract class AuditQueue extends BaseAuditHandler {
return consumer;
}
+ public boolean isDrainMaxTimeElapsed() {
+ return (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS;
+ }
+
public boolean isDrain() {
return isDrain;
}
public void setDrain(boolean isDrain) {
+ if (isDrain && stopTime != 0) {
+ stopTime = System.currentTimeMillis();
+ }
this.isDrain = isDrain;
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index 1e5b500..f1ce799 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -123,10 +123,6 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
@Override
public void stop() {
logger.info("Stop called. name=" + getName());
- if (stopTime != 0) {
- stopTime = System.currentTimeMillis();
- }
-
setDrain(true);
try {
if (consumerThread != null) {
@@ -179,7 +175,7 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
}
} catch (InterruptedException e) {
logger.info(
- "Caught exception in consumer thread. Mostly server is shutting down.",
+ "Caught exception in consumer thread. Shutdown might be in progress",
e);
} catch (Throwable t) {
logger.error("Caught error during processing request.", t);
@@ -223,16 +219,16 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
summaryMap.clear();
}
- if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) {
- break;
- }
- if (isDrain()
- && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
- logger.warn("Exiting polling loop to max time allowed. name="
- + getName() + ", waited for "
- + (stopTime - System.currentTimeMillis()) + " ms");
-
- break;
+ if (isDrain()) {
+ if (summaryMap.isEmpty() && queue.isEmpty()) {
+ break;
+ }
+ if (isDrainMaxTimeElapsed()) {
+ logger.warn("Exiting polling loop because max time allowed reached. name="
+ + getName()
+ + ", waited for "
+ + (stopTime - System.currentTimeMillis()) + " ms");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 5cca619..0698bf6 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -61,20 +61,19 @@ public class ServiceKafkaClient {
public HashMap<String, Object> testConnection() throws Exception {
String errMsg = errMessage;
- boolean connectivityStatus = false;
HashMap<String, Object> responseData = new HashMap<String, Object>();
try {
getTopicList(null);
// If it doesn't throw exception, then assume the instance is
// reachable
String successMsg = "TestConnection Successful";
- BaseClient.generateResponseDataMap(connectivityStatus, successMsg,
+ BaseClient.generateResponseDataMap(true, successMsg,
successMsg, null, null, responseData);
} catch (IOException e) {
LOG.error("Error connecting to Kafka. kafkaClient=" + this, e);
String failureMsg = "Unable to connect to Kafka instance."
+ e.getMessage();
- BaseClient.generateResponseDataMap(connectivityStatus, failureMsg,
+ BaseClient.generateResponseDataMap(false, failureMsg,
failureMsg + errMsg, null, null, responseData);
}
return responseData;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/plugin-solr/src/main/java/org/apache/ranger/services/solr/client/ServiceSolrClient.java
----------------------------------------------------------------------
diff --git a/plugin-solr/src/main/java/org/apache/ranger/services/solr/client/ServiceSolrClient.java b/plugin-solr/src/main/java/org/apache/ranger/services/solr/client/ServiceSolrClient.java
index 6a192f4..801578b 100644
--- a/plugin-solr/src/main/java/org/apache/ranger/services/solr/client/ServiceSolrClient.java
+++ b/plugin-solr/src/main/java/org/apache/ranger/services/solr/client/ServiceSolrClient.java
@@ -72,7 +72,6 @@ public class ServiceSolrClient {
public HashMap<String, Object> testConnection() throws Exception {
String errMsg = errMessage;
- boolean connectivityStatus = false;
HashMap<String, Object> responseData = new HashMap<String, Object>();
try {
@@ -80,13 +79,13 @@ public class ServiceSolrClient {
// If it doesn't throw exception, then assume the instance is
// reachable
String successMsg = "TestConnection Successful";
- BaseClient.generateResponseDataMap(connectivityStatus, successMsg,
+ BaseClient.generateResponseDataMap(true, successMsg,
successMsg, null, null, responseData);
} catch (IOException e) {
LOG.error("Error connecting to Solr. solrClient=" + solrClient, e);
String failureMsg = "Unable to connect to Solr instance."
+ e.getMessage();
- BaseClient.generateResponseDataMap(connectivityStatus, failureMsg,
+ BaseClient.generateResponseDataMap(false, failureMsg,
failureMsg + errMsg, null, null, responseData);
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/security-admin/src/main/java/org/apache/ranger/common/ServiceUtil.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/common/ServiceUtil.java b/security-admin/src/main/java/org/apache/ranger/common/ServiceUtil.java
index 09759c3..b7a923b 100644
--- a/security-admin/src/main/java/org/apache/ranger/common/ServiceUtil.java
+++ b/security-admin/src/main/java/org/apache/ranger/common/ServiceUtil.java
@@ -1305,27 +1305,29 @@ public class ServiceUtil {
try {
service = svcStore.getServiceByName(serviceName);
} catch (Exception e) {
- LOG.error("Requested Service not found");
+ LOG.error("Requested Service not found. serviceName=" + serviceName);
throw restErrorUtil.createRESTException("Serivce:" + serviceName + " not found",
MessageEnums.DATA_NOT_FOUND);
}
if(service==null){
- LOG.error("Requested Service not found");
+ LOG.error("Requested Service not found. Service name is null.");
throw restErrorUtil.createRESTException("No Data Found.",
MessageEnums.DATA_NOT_FOUND);
}
if(!service.getIsEnabled()){
- LOG.error("Requested Service is disabled");
+ LOG.error("Requested Service is disabled. serviceName=" + serviceName);
throw restErrorUtil.createRESTException("Unauthorized access.",
MessageEnums.OPER_NOT_ALLOWED_FOR_STATE);
}
if (!httpEnabled) {
if (!isSecure) {
+ LOG.error("Unauthorized access. Only https is allowed. serviceName=" + serviceName);
throw restErrorUtil.createRESTException("Unauthorized access -"
+ " only https allowed",
MessageEnums.OPER_NOT_ALLOWED_FOR_ENTITY);
}
if (certchain == null || certchain.length == 0) {
+ LOG.error("Unauthorized access. Unable to get client certificate. serviceName=" + serviceName);
throw restErrorUtil.createRESTException("Unauthorized access -"
+ " unable to get client certificate",
MessageEnums.OPER_NOT_ALLOWED_FOR_ENTITY);
@@ -1344,13 +1346,14 @@ public class ServiceUtil {
}
}
if (commonName == null) {
+ LOG.error("Unauthorized access. CName is null. serviceName=" + serviceName);
throw restErrorUtil.createRESTException(
"Unauthorized access - Unable to find Common Name from ["
+ dn + "]",
MessageEnums.OPER_NOT_ALLOWED_FOR_ENTITY);
}
} catch (InvalidNameException e) {
- LOG.error("Invalid Common Name.", e);
+ LOG.error("Invalid Common Name. CName=" + commonName + ", serviceName=" + serviceName, e);
throw restErrorUtil.createRESTException(
"Unauthorized access - Invalid Common Name",
MessageEnums.OPER_NOT_ALLOWED_FOR_ENTITY);
@@ -1362,6 +1365,8 @@ public class ServiceUtil {
String cnFromConfig = configMap.get("commonNameForCertificate");
if (cnFromConfig == null
|| !commonName.equalsIgnoreCase(cnFromConfig)) {
+ LOG.error("Unauthorized access. expected [" + cnFromConfig + "], found ["
+ + commonName + "], serviceName=" + serviceName);
throw restErrorUtil.createRESTException(
"Unauthorized access. expected [" + cnFromConfig
+ "], found [" + commonName + "]",
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/94ba6beb/src/main/assembly/plugin-kafka.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/plugin-kafka.xml b/src/main/assembly/plugin-kafka.xml
index 77c4e65..67e8489 100644
--- a/src/main/assembly/plugin-kafka.xml
+++ b/src/main/assembly/plugin-kafka.xml
@@ -36,6 +36,8 @@
</include>
<include>org.apache.hadoop:hadoop-common-plus:jar:${hadoop-common.version}
</include>
+ <include>org.apache.hadoop:hadoop-auth:jar:${hadoop-common.version}
+ </include>
<include>com.google.code.gson:gson</include>
<include>org.eclipse.persistence:eclipselink</include>
<include>org.eclipse.persistence:javax.persistence</include>