You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2016/03/02 03:48:57 UTC
incubator-ranger git commit: RANGER-838 : Tag-sync should be resilient to Ranger Admin availability
Repository: incubator-ranger
Updated Branches:
refs/heads/master d40a02054 -> b1b0fb16c
RANGER-838 : Tag-sync should be resilient to Ranger Admin availability
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/b1b0fb16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/b1b0fb16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/b1b0fb16
Branch: refs/heads/master
Commit: b1b0fb16cd4197a67a61d8a1db2c40863739327c
Parents: d40a020
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Wed Jan 27 18:46:49 2016 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Mar 1 18:48:45 2016 -0800
----------------------------------------------------------------------
.../ranger/tagsync/model/AbstractTagSource.java | 25 +--
.../apache/ranger/tagsync/model/TagSink.java | 4 +-
.../apache/ranger/tagsync/model/TagSource.java | 5 +-
.../ranger/tagsync/process/TagSyncConfig.java | 17 ++
.../ranger/tagsync/process/TagSynchronizer.java | 55 +++---
.../tagsync/sink/tagadmin/TagAdminRESTSink.java | 170 ++++++++++++++-----
.../tagsync/source/atlas/AtlasTagSource.java | 50 +++---
.../source/atlasrest/AtlasRESTTagSource.java | 45 +++--
.../tagsync/source/atlasrest/AtlasRESTUtil.java | 91 +++++-----
.../tagsync/source/file/FileTagSource.java | 53 +++---
10 files changed, 331 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index d6baeb2..d46170a 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -27,7 +27,6 @@ import org.apache.ranger.plugin.util.ServiceTags;
public abstract class AbstractTagSource implements TagSource {
private static final Log LOG = LogFactory.getLog(AbstractTagSource.class);
private TagSink tagSink;
- protected boolean shutdown = false;
@Override
public void setTagSink(TagSink sink) {
@@ -37,29 +36,31 @@ public abstract class AbstractTagSource implements TagSource {
this.tagSink = sink;
}
}
- @Override
- public void synchUp() {}
- public void updateSink(final ServiceTags serviceTags) {
- if (serviceTags == null) {
+ protected void updateSink(final ServiceTags toUpload) {
+ if (toUpload == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No ServiceTags to upload");
}
} else {
if (LOG.isDebugEnabled()) {
- String serviceTagsJSON = new Gson().toJson(serviceTags);
- LOG.debug("Uploading serviceTags=" + serviceTagsJSON);
+ String toUploadJSON = new Gson().toJson(toUpload);
+ LOG.debug("Uploading serviceTags=" + toUploadJSON);
}
try {
- tagSink.uploadServiceTags(serviceTags);
+ ServiceTags uploaded = tagSink.upload(toUpload);
+
+ if (LOG.isDebugEnabled()) {
+ String uploadedJSON = new Gson().toJson(uploaded);
+ LOG.debug("Uploaded serviceTags=" + uploadedJSON);
+ }
} catch (Exception exception) {
- LOG.error("uploadServiceTags() failed..", exception);
+ String toUploadJSON = new Gson().toJson(toUpload);
+ LOG.error("Failed to upload serviceTags: " + toUploadJSON);
+ LOG.error("Exception : ", exception);
}
}
}
- public void stop() {
- shutdown = true;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
index 9eb5319..ae66e60 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
@@ -26,5 +26,7 @@ import java.util.Properties;
public interface TagSink {
boolean initialize(Properties properties);
- void uploadServiceTags(ServiceTags serviceTags) throws Exception;
+ ServiceTags upload(ServiceTags toUpload) throws Exception;
+ boolean start();
+ void stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
index 7d19562..5ef6c57 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
@@ -19,6 +19,7 @@
package org.apache.ranger.tagsync.model;
+
import java.util.Properties;
public interface TagSource {
@@ -27,12 +28,8 @@ public interface TagSource {
void setTagSink(TagSink sink);
- void synchUp();
-
boolean start();
void stop();
- boolean isChanged();
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index ed8679a..9588d66 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -72,6 +72,10 @@ public class TagSyncConfig extends Configuration {
private static final long DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL = 900000;
+ private static final String TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP = "ranger.tagsync.tagadmin.connection.check.interval";
+
+ private static final int DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL = 2000;
+
private Properties props;
public static TagSyncConfig getInstance() {
@@ -286,6 +290,19 @@ public class TagSyncConfig extends Configuration {
return prop.getProperty(TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP);
}
+ static public long getTagAdminConnectionCheckInterval(Properties prop) {
+ long ret = DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL;
+ String val = prop.getProperty(TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP);
+ if (StringUtils.isNotBlank(val)) {
+ try {
+ ret = Long.valueOf(val);
+ } catch (NumberFormatException exception) {
+ // Ignore
+ }
+ }
+ return ret;
+ }
+
private TagSyncConfig() {
super(false);
init() ;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 696ce5e..5155681 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -32,10 +32,12 @@ public class TagSynchronizer {
private static final Logger LOG = Logger.getLogger(TagSynchronizer.class);
- private boolean shutdownFlag = false;
+ private TagSink tagSink = null;
private List<TagSource> tagSources;
private Properties properties = null;
+ private final Object shutdownNotifier = new Object();
+
public static void main(String[] args) {
TagSynchronizer tagSynchronizer = new TagSynchronizer();
@@ -49,23 +51,28 @@ public class TagSynchronizer {
boolean tagSynchronizerInitialized = tagSynchronizer.initialize();
if (tagSynchronizerInitialized) {
- tagSynchronizer.run();
+ try {
+ tagSynchronizer.run();
+ } catch (Throwable t) {
+ LOG.error("main thread caught exception..:", t);
+ System.exit(1);
+ }
} else {
LOG.error("TagSynchronizer failed to initialize correctly, exiting..");
- System.exit(-1);
+ System.exit(1);
}
}
- public TagSynchronizer() {
- setProperties(null);
+ TagSynchronizer() {
+ this(null);
}
- public TagSynchronizer(Properties properties) {
+ TagSynchronizer(Properties properties) {
setProperties(properties);
}
- public void setProperties(Properties properties) {
+ void setProperties(Properties properties) {
if (properties == null || MapUtils.isEmpty(properties)) {
this.properties = new Properties();
} else {
@@ -89,7 +96,7 @@ public class TagSynchronizer {
LOG.info("Initializing TAG source and sink");
- TagSink tagSink = initializeTagSink(properties);
+ tagSink = initializeTagSink(properties);
if (tagSink != null) {
@@ -113,39 +120,32 @@ public class TagSynchronizer {
return ret;
}
- public void run() {
+ public void run() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagSynchronizer.run()");
}
- long shutdownCheckIntervalInMs = 60*1000;
-
- boolean tagSourcesStarted = true;
-
try {
+ boolean threadsStarted = tagSink.start();
+
for (TagSource tagSource : tagSources) {
- tagSourcesStarted = tagSourcesStarted && tagSource.start();
+ threadsStarted = threadsStarted && tagSource.start();
}
- if (tagSourcesStarted) {
- while (!shutdownFlag) {
- try {
- LOG.debug("Sleeping for [" + shutdownCheckIntervalInMs + "] milliSeconds");
- Thread.sleep(shutdownCheckIntervalInMs);
- } catch (InterruptedException e) {
- LOG.error("Failed to wait for [" + shutdownCheckIntervalInMs + "] milliseconds before attempting to synchronize tag information ", e);
- break;
- }
+ if (threadsStarted) {
+ synchronized(shutdownNotifier) {
+ shutdownNotifier.wait();
}
}
- } catch (Throwable t) {
- LOG.error("tag-sync main thread got an error", t);
} finally {
LOG.info("Stopping all tagSources");
for (TagSource tagSource : tagSources) {
tagSource.stop();
}
+
+ LOG.info("Stopping tagSink");
+ tagSink.stop();
}
if (LOG.isDebugEnabled()) {
@@ -155,7 +155,10 @@ public class TagSynchronizer {
public void shutdown(String reason) {
LOG.info("Received shutdown(), reason=" + reason);
- this.shutdownFlag = true;
+
+ synchronized(shutdownNotifier) {
+ shutdownNotifier.notifyAll();
+ }
}
static public void printConfigurationProperties(Properties properties) {
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
index 1541034..c296b49 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -33,10 +33,15 @@ import org.apache.ranger.plugin.util.SearchFilter;
import org.apache.ranger.plugin.util.ServiceTags;
import org.apache.ranger.tagsync.process.TagSyncConfig;
+import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.Properties;
-public class TagAdminRESTSink implements TagSink {
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TagAdminRESTSink implements TagSink, Runnable {
private static final Log LOG = LogFactory.getLog(TagAdminRESTSink.class);
private static final String REST_PREFIX = "/service";
@@ -46,45 +51,43 @@ public class TagAdminRESTSink implements TagSink {
private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
+ private long rangerAdminConnectionCheckInterval;
+
private RangerRESTClient tagRESTClient = null;
+ private BlockingQueue<UploadWorkItem> uploadWorkItems;
+
+ private Thread myThread = null;
+
@Override
public boolean initialize(Properties properties) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> TagAdminRESTSink.initialize()");
}
- boolean ret = true;
+ boolean ret = false;
String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties);
String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
String userName = TagSyncConfig.getTagAdminUserName(properties);
String password = TagSyncConfig.getTagAdminPassword(properties);
+ rangerAdminConnectionCheckInterval = TagSyncConfig.getTagAdminConnectionCheckInterval(properties);
if (LOG.isDebugEnabled()) {
LOG.debug("restUrl=" + restUrl);
LOG.debug("sslConfigFile=" + sslConfigFile);
LOG.debug("userName=" + userName);
+ LOG.debug("rangerAdminConnectionCheckInterval" + rangerAdminConnectionCheckInterval);
}
- if (StringUtils.isBlank(restUrl)) {
- ret = false;
- LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ranger.tagsync.tagadmin.rest.url:" + restUrl);
- }
- }
-
- if (ret) {
+ if (StringUtils.isNotBlank(restUrl)) {
tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
tagRESTClient.setBasicAuthInfo(userName, password);
+ uploadWorkItems = new LinkedBlockingQueue<UploadWorkItem>();
- ret = testConnection();
- }
-
- if (!ret) {
- LOG.error("Cannot connect to Tag Admin. Please recheck configuration properties and/or check if Tag Admin is running and responsive");
+ ret = true;
+ } else {
+ LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
}
if(LOG.isDebugEnabled()) {
@@ -94,51 +97,53 @@ public class TagAdminRESTSink implements TagSink {
return ret;
}
- public boolean testConnection() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> TagAdminRESTSink.testConnection()");
+ @Override
+ public ServiceTags upload(ServiceTags toUpload) throws Exception {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("==> upload() ");
}
- boolean ret = true;
+ UploadWorkItem uploadWorkItem = new UploadWorkItem(toUpload);
- try {
- // build a dummy serviceTags structure and upload it to test connectivity
- ServiceTags serviceTags = new ServiceTags();
- serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
- uploadServiceTags(serviceTags);
- } catch (Exception exception) {
- LOG.error("test-upload of serviceTags failed.", exception);
- ret = false;
- }
+ uploadWorkItems.put(uploadWorkItem);
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== TagAdminRESTSink.testConnection(), result=" + ret);
+ // Wait until message is successfully delivered
+ ServiceTags ret = uploadWorkItem.waitForUpload();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("<== upload()");
}
+
return ret;
}
- @Override
- synchronized public void uploadServiceTags(ServiceTags serviceTags) throws Exception {
+ private ServiceTags doUpload(ServiceTags serviceTags) throws Exception {
if(LOG.isDebugEnabled()) {
- LOG.debug("==> uploadServiceTags()");
+ LOG.debug("==> doUpload()");
}
WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE);
ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags));
- if(response == null || response.getStatus() != 204) {
- LOG.error("RangerAdmin REST call returned with response={" + response + "}");
+ if(response == null || response.getStatus() != HttpServletResponse.SC_NO_CONTENT) {
RESTResponse resp = RESTResponse.fromClientResponse(response);
LOG.error("Upload of service-tags failed with message " + resp.getMessage());
- throw new Exception("Upload of service-tags failed with response: " + response);
+ if (response == null || response.getStatus() != HttpServletResponse.SC_BAD_REQUEST) {
+ // NOT an application error
+ throw new Exception("Upload of service-tags failed with response: " + response);
+ }
+
}
if(LOG.isDebugEnabled()) {
- LOG.debug("<== uploadServiceTags()");
+ LOG.debug("<== doUpload()");
}
+
+ return serviceTags;
}
private WebResource createWebResource(String url) {
@@ -159,4 +164,91 @@ public class TagAdminRESTSink implements TagSink {
return ret;
}
+
+ @Override
+ public boolean start() {
+
+ myThread = new Thread(this);
+ myThread.setDaemon(true);
+ myThread.start();
+
+ return true;
+ }
+
+ @Override
+ public void stop() {
+ if (myThread != null && myThread.isAlive()) {
+ myThread.interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagAdminRESTSink.run()");
+ }
+
+ while (true) {
+ UploadWorkItem uploadWorkItem;
+
+ try {
+ uploadWorkItem = uploadWorkItems.take();
+
+ ServiceTags toUpload = uploadWorkItem.getServiceTags();
+
+ boolean doRetry;
+
+ do {
+ doRetry = false;
+
+ try {
+ ServiceTags uploaded = doUpload(toUpload);
+ // ServiceTags uploaded successfully
+ uploadWorkItem.uploadCompleted(uploaded);
+ } catch (InterruptedException interrupted) {
+ LOG.error("Caught exception..: ", interrupted);
+ return;
+ } catch (Exception exception) {
+ doRetry = true;
+ Thread.sleep(rangerAdminConnectionCheckInterval);
+ }
+ } while (doRetry);
+
+ }
+ catch (InterruptedException exception) {
+ LOG.error("Interrupted..: ", exception);
+ return;
+ }
+ }
+
+ }
+
+ class UploadWorkItem {
+ private ServiceTags serviceTags;
+ private BlockingQueue<ServiceTags> uploadedServiceTags;
+
+ ServiceTags getServiceTags() {
+ return serviceTags;
+ }
+
+ ServiceTags waitForUpload() throws InterruptedException {
+ return uploadedServiceTags.take();
+ }
+
+ void uploadCompleted(ServiceTags uploaded) throws InterruptedException {
+ // ServiceTags uploaded successfully
+ uploadedServiceTags.put(uploaded);
+ }
+
+ UploadWorkItem(ServiceTags serviceTags) {
+ setServiceTags(serviceTags);
+ uploadedServiceTags = new ArrayBlockingQueue<ServiceTags>(1);
+ }
+
+ void setServiceTags(ServiceTags serviceTags) {
+ this.serviceTags = serviceTags;
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 2499177..49d6f61 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -50,6 +50,7 @@ public class AtlasTagSource extends AbstractTagSource {
public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
private ConsumerRunnable consumerTask;
+ private Thread myThread = null;
@Override
public boolean initialize(Properties properties) {
@@ -123,23 +124,24 @@ public class AtlasTagSource extends AbstractTagSource {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTagSource.start()");
}
- Thread consumerThread = null;
if (consumerTask == null) {
LOG.error("No consumerTask!!!");
} else {
- consumerThread = new Thread(consumerTask);
- consumerThread.setDaemon(true);
- consumerThread.start();
+ myThread = new Thread(consumerTask);
+ myThread.setDaemon(true);
+ myThread.start();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTagSource.start()");
}
- return consumerThread != null;
+ return myThread != null;
}
@Override
- public boolean isChanged() {
- return true;
+ public void stop() {
+ if (myThread != null && myThread.isAlive()) {
+ myThread.interrupt();
+ }
}
private static String getPrintableEntityNotification(EntityNotification notification) {
@@ -175,24 +177,32 @@ public class AtlasTagSource extends AbstractTagSource {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ConsumerRunnable.run()");
}
- while (!shutdown) {
- if (hasNext()) {
- EntityNotification notification = consumer.next();
- if (notification != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Notification=" + getPrintableEntityNotification(notification));
- }
-
- ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
- if (serviceTags == null) {
- LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification));
+ while (true) {
+ try {
+ if (hasNext()) {
+ EntityNotification notification = consumer.peek();
+ if (notification != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+ }
+
+ ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
+ if (serviceTags == null) {
+ LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification));
+ } else {
+ updateSink(serviceTags);
+ }
} else {
- updateSink(serviceTags);
+ LOG.error("Null entityNotification received from Kafka!! Ignoring..");
}
+ // Move iterator forward
+ consumer.next();
}
+ } catch (Exception exception) {
+ LOG.error("Caught exception..: ", exception);
+ return;
}
}
- LOG.info("Shutting down the Tag-Atlas-source thread");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 9d9f25d..11ca2d8 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -46,6 +46,8 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
private String atlasEndpoint;
private long sleepTimeBetweenCycleInMillis;
+ private Thread myThread = null;
+
public static void main(String[] args) {
AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource();
@@ -60,9 +62,19 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
if (tagSink != null) {
- atlasRESTTagSource.initialize(props);
- atlasRESTTagSource.setTagSink(tagSink);
- atlasRESTTagSource.synchUp();
+ if (atlasRESTTagSource.initialize(props)) {
+ try {
+ tagSink.start();
+ atlasRESTTagSource.setTagSink(tagSink);
+ atlasRESTTagSource.synchUp();
+ } catch (Exception exception) {
+ LOG.error("ServiceTags upload failed : ", exception);
+ System.exit(1);
+ }
+ } else {
+ LOG.error("AtlasRESTTagSource initialized failed, exiting.");
+ System.exit(1);
+ }
} else {
LOG.error("TagSink initialialization failed, exiting.");
@@ -96,11 +108,18 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
@Override
public boolean start() {
- Thread atlasRESTInvokerThread = new Thread(this);
- atlasRESTInvokerThread.setDaemon(true);
- atlasRESTInvokerThread.start();
+ myThread = new Thread(this);
+ myThread.setDaemon(true);
+ myThread.start();
- return atlasRESTInvokerThread != null;
+ return true;
+ }
+
+ @Override
+ public void stop() {
+ if (myThread != null && myThread.isAlive()) {
+ myThread.interrupt();
+ }
}
@Override
@@ -110,7 +129,7 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
LOG.debug("==> AtlasRESTTagSource.run()");
}
- while (!shutdown) {
+ while (true) {
synchUp();
@@ -121,18 +140,12 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
Thread.sleep(sleepTimeBetweenCycleInMillis);
} catch (InterruptedException exception) {
- LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", exception);
+ LOG.error("Interrupted..: ", exception);
+ return;
}
}
- LOG.info("Shutting down the Tag-Atlasrest-source thread");
- }
-
- @Override
- public boolean isChanged() {
- return true;
}
- @Override
public void synchUp() {
AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint);
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
index 7f4676a..d7f983e 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
@@ -27,6 +27,7 @@ import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.ranger.admin.client.datatype.RESTResponse;
import org.apache.ranger.plugin.util.RangerRESTClient;
@@ -114,43 +115,46 @@ public class AtlasRESTUtil {
Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
- if (MapUtils.isNotEmpty(entityResponse) && entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+ Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
- Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
- Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+ Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
- if (MapUtils.isNotEmpty(traitsAttribute)) {
+ if (MapUtils.isNotEmpty(traitsAttribute)) {
- List<IStruct> allTraits = new LinkedList<>();
+ List<IStruct> allTraits = new LinkedList<>();
- for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) {
+ for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) {
- Map<String, Object> trait = (Map<String, Object>) entry.getValue();
+ Map<String, Object> trait = (Map<String, Object>) entry.getValue();
- Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
- String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+ Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+ String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
- List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+ if (StringUtils.isEmpty(traitTypeName)) {
+ continue;
+ }
- Struct trait1 = new Struct(traitTypeName, traitValues);
+ List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
- allTraits.add(trait1);
- allTraits.addAll(superTypes);
- }
+ Struct trait1 = new Struct(traitTypeName, traitValues);
+
+ allTraits.add(trait1);
+ allTraits.addAll(superTypes);
+ }
- IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
+ IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
- if (entity != null) {
- AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits);
- ret.add(entityWithTraits);
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
- }
+ if (entity != null) {
+ AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits);
+ ret.add(entityWithTraits);
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
}
-
}
+
}
+
}
}
if (LOG.isDebugEnabled()) {
@@ -169,15 +173,12 @@ public class AtlasRESTUtil {
Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
- if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-
- Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
+ Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
- List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+ List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
- if (traitTypes.size() > 0) {
- ret = (Map<String, Object>) traitTypes.get(0);
- }
+ if (CollectionUtils.isNotEmpty(traitTypes)) {
+ ret = (Map<String, Object>) traitTypes.get(0);
}
if (LOG.isDebugEnabled()) {
@@ -197,28 +198,30 @@ public class AtlasRESTUtil {
List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
- for (String superTypeName : superTypeNames) {
+ if (CollectionUtils.isNotEmpty(superTypeNames)) {
+ for (String superTypeName : superTypeNames) {
- Map<String, Object> superTraitType = getTraitType(superTypeName);
+ Map<String, Object> superTraitType = getTraitType(superTypeName);
- if (superTraitType != null) {
- List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+ if (superTraitType != null) {
+ List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
- Map<String, Object> superTypeValues = new HashMap<>();
- for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+ Map<String, Object> superTypeValues = new HashMap<>();
+ for (Map<String, Object> attributeDefinition : attributeDefinitions) {
- String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
- if (values.containsKey(attributeName)) {
- superTypeValues.put(attributeName, values.get(attributeName));
+ String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
+ if (values.containsKey(attributeName)) {
+ superTypeValues.put(attributeName, values.get(attributeName));
+ }
}
- }
- List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values);
+ List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values);
- Struct superTrait = new Struct(superTypeName, superTypeValues);
+ Struct superTrait = new Struct(superTypeName, superTypeValues);
- ret.add(superTrait);
- ret.addAll(superTraits);
+ ret.add(superTrait);
+ ret.addAll(superTraits);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index e339d5e..e22681e 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -49,6 +49,8 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
private Properties properties;
private long fileModTimeCheckIntervalInMs;
+ private Thread myThread = null;
+
public static void main(String[] args) {
FileTagSource fileTagSource = new FileTagSource();
@@ -69,9 +71,19 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
if (tagSink != null) {
- fileTagSource.initialize(props);
- fileTagSource.setTagSink(tagSink);
- fileTagSource.synchUp();
+ if (fileTagSource.initialize(props)) {
+ try {
+ tagSink.start();
+ fileTagSource.setTagSink(tagSink);
+ fileTagSource.synchUp();
+ } catch (Exception exception) {
+ LOG.error("ServiceTags upload failed : ", exception);
+ System.exit(1);
+ }
+ } else {
+ LOG.error("FileTagSource initialized failed, exiting.");
+ System.exit(1);
+ }
} else {
LOG.error("TagSink initialialization failed, exiting.");
@@ -177,11 +189,18 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
@Override
public boolean start() {
- Thread fileMonitoringThread = new Thread(this);
- fileMonitoringThread.setDaemon(true);
- fileMonitoringThread.start();
+ myThread = new Thread(this);
+ myThread.setDaemon(true);
+ myThread.start();
+
+ return true;
+ }
- return fileMonitoringThread != null;
+ @Override
+ public void stop() {
+ if (myThread != null && myThread.isAlive()) {
+ myThread.interrupt();
+ }
}
@Override
@@ -190,7 +209,7 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
LOG.debug("==> FileTagSource.run()");
}
- while (!shutdown) {
+ while (true) {
try {
synchUp();
@@ -199,23 +218,14 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
Thread.sleep(fileModTimeCheckIntervalInMs);
}
- catch (InterruptedException e) {
- LOG.error("Failed to wait for [" + fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to tagFileSource", e);
- }
- catch (Throwable t) {
- LOG.error("tag-sync thread got an error", t);
+ catch (InterruptedException exception) {
+ LOG.error("Interrupted..: ", exception);
+ return;
}
}
-
- LOG.info("Shutting down the Tag-file-source thread");
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== FileTagSource.run()");
- }
}
- @Override
- public boolean isChanged() {
+ private boolean isChanged() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> FileTagSource.isChanged()");
@@ -240,7 +250,6 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
return ret;
}
- @Override
public void synchUp() {
if (isChanged()) {
if (LOG.isDebugEnabled()) {