You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2016/03/02 03:48:57 UTC

incubator-ranger git commit: RANGER-838 : Tag-sync should be resilient to Ranger Admin availability

Repository: incubator-ranger
Updated Branches:
  refs/heads/master d40a02054 -> b1b0fb16c


RANGER-838 : Tag-sync should be resilient to Ranger Admin availability

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/b1b0fb16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/b1b0fb16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/b1b0fb16

Branch: refs/heads/master
Commit: b1b0fb16cd4197a67a61d8a1db2c40863739327c
Parents: d40a020
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Wed Jan 27 18:46:49 2016 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Mar 1 18:48:45 2016 -0800

----------------------------------------------------------------------
 .../ranger/tagsync/model/AbstractTagSource.java |  25 +--
 .../apache/ranger/tagsync/model/TagSink.java    |   4 +-
 .../apache/ranger/tagsync/model/TagSource.java  |   5 +-
 .../ranger/tagsync/process/TagSyncConfig.java   |  17 ++
 .../ranger/tagsync/process/TagSynchronizer.java |  55 +++---
 .../tagsync/sink/tagadmin/TagAdminRESTSink.java | 170 ++++++++++++++-----
 .../tagsync/source/atlas/AtlasTagSource.java    |  50 +++---
 .../source/atlasrest/AtlasRESTTagSource.java    |  45 +++--
 .../tagsync/source/atlasrest/AtlasRESTUtil.java |  91 +++++-----
 .../tagsync/source/file/FileTagSource.java      |  53 +++---
 10 files changed, 331 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index d6baeb2..d46170a 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -27,7 +27,6 @@ import org.apache.ranger.plugin.util.ServiceTags;
 public abstract  class AbstractTagSource implements TagSource {
 	private static final Log LOG = LogFactory.getLog(AbstractTagSource.class);
 	private TagSink tagSink;
-	protected boolean shutdown = false;
 
 	@Override
 	public void setTagSink(TagSink sink) {
@@ -37,29 +36,31 @@ public abstract  class AbstractTagSource implements TagSource {
 			this.tagSink = sink;
 		}
 	}
-	@Override
-	public void synchUp() {}
 
-	public void updateSink(final ServiceTags serviceTags) {
-		if (serviceTags == null) {
+	protected void updateSink(final ServiceTags toUpload) {
+		if (toUpload == null) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("No ServiceTags to upload");
 			}
 		} else {
 			if (LOG.isDebugEnabled()) {
-				String serviceTagsJSON = new Gson().toJson(serviceTags);
-				LOG.debug("Uploading serviceTags=" + serviceTagsJSON);
+				String toUploadJSON = new Gson().toJson(toUpload);
+				LOG.debug("Uploading serviceTags=" + toUploadJSON);
 			}
 
 			try {
-				tagSink.uploadServiceTags(serviceTags);
+				ServiceTags uploaded = tagSink.upload(toUpload);
+
+				if (LOG.isDebugEnabled()) {
+					String uploadedJSON = new Gson().toJson(uploaded);
+					LOG.debug("Uploaded serviceTags=" + uploadedJSON);
+				}
 			} catch (Exception exception) {
-				LOG.error("uploadServiceTags() failed..", exception);
+				String toUploadJSON = new Gson().toJson(toUpload);
+				LOG.error("Failed to upload serviceTags: " + toUploadJSON);
+				LOG.error("Exception : ", exception);
 			}
 		}
 	}
 
-	public void stop() {
-		shutdown = true;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
index 9eb5319..ae66e60 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSink.java
@@ -26,5 +26,7 @@ import java.util.Properties;
 
 public interface TagSink {
 	boolean initialize(Properties properties);
-	void uploadServiceTags(ServiceTags serviceTags) throws Exception;
+	ServiceTags upload(ServiceTags toUpload) throws Exception;
+	boolean start();
+	void stop();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
index 7d19562..5ef6c57 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/TagSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.tagsync.model;
 
+
 import java.util.Properties;
 
 public interface TagSource {
@@ -27,12 +28,8 @@ public interface TagSource {
 
 	void setTagSink(TagSink sink);
 
-	void synchUp();
-
 	boolean start();
 
 	void stop();
 
-	boolean isChanged();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index ed8679a..9588d66 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -72,6 +72,10 @@ public class TagSyncConfig extends Configuration {
 
 	private static final long DEFAULT_TAGSYNC_REST_SOURCE_DOWNLOAD_INTERVAL = 900000;
 
+	private static final String TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP = "ranger.tagsync.tagadmin.connection.check.interval";
+
+	private static final int DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL = 2000;
+
 	private Properties props;
 
 	public static TagSyncConfig getInstance() {
@@ -286,6 +290,19 @@ public class TagSyncConfig extends Configuration {
 		return prop.getProperty(TAGSYNC_SOURCE_ATLAS_CUSTOM_RESOURCE_MAPPERS_PROP);
 	}
 
+	static public long getTagAdminConnectionCheckInterval(Properties prop) {
+		long ret = DEFAULT_TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL;
+		String val = prop.getProperty(TAGSYNC_TAGADMIN_CONNECTION_CHECK_INTERVAL_PROP);
+		if (StringUtils.isNotBlank(val)) {
+			try {
+				ret = Long.valueOf(val);
+			} catch (NumberFormatException exception) {
+				// Ignore
+			}
+		}
+		return ret;
+	}
+
 	private TagSyncConfig() {
 		super(false);
 		init() ;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 696ce5e..5155681 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -32,10 +32,12 @@ public class TagSynchronizer {
 
 	private static final Logger LOG = Logger.getLogger(TagSynchronizer.class);
 
-	private boolean shutdownFlag = false;
+	private TagSink tagSink = null;
 	private List<TagSource> tagSources;
 	private Properties properties = null;
 
+	private final Object shutdownNotifier = new Object();
+
 	public static void main(String[] args) {
 
 		TagSynchronizer tagSynchronizer = new TagSynchronizer();
@@ -49,23 +51,28 @@ public class TagSynchronizer {
 		boolean tagSynchronizerInitialized = tagSynchronizer.initialize();
 
 		if (tagSynchronizerInitialized) {
-			tagSynchronizer.run();
+			try {
+				tagSynchronizer.run();
+			} catch (Throwable t) {
+				LOG.error("main thread caught exception..:", t);
+				System.exit(1);
+			}
 		} else {
 			LOG.error("TagSynchronizer failed to initialize correctly, exiting..");
-			System.exit(-1);
+			System.exit(1);
 		}
 
 	}
 
-	public TagSynchronizer() {
-		setProperties(null);
+	TagSynchronizer() {
+		this(null);
 	}
 
-	public TagSynchronizer(Properties properties) {
+	TagSynchronizer(Properties properties) {
 		setProperties(properties);
 	}
 
-	public void setProperties(Properties properties) {
+	void setProperties(Properties properties) {
 		if (properties == null || MapUtils.isEmpty(properties)) {
 			this.properties = new Properties();
 		} else {
@@ -89,7 +96,7 @@ public class TagSynchronizer {
 
 			LOG.info("Initializing TAG source and sink");
 
-			TagSink tagSink = initializeTagSink(properties);
+			tagSink = initializeTagSink(properties);
 
 			if (tagSink != null) {
 
@@ -113,39 +120,32 @@ public class TagSynchronizer {
 		return ret;
 	}
 
-	public void run() {
+	public void run() throws Exception {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> TagSynchronizer.run()");
 		}
 
-		long shutdownCheckIntervalInMs = 60*1000;
-
-		boolean tagSourcesStarted = true;
-
 		try {
+			boolean threadsStarted = tagSink.start();
+
 			for (TagSource tagSource : tagSources) {
-				tagSourcesStarted = tagSourcesStarted && tagSource.start();
+				threadsStarted = threadsStarted && tagSource.start();
 			}
 
-			if (tagSourcesStarted) {
-				while (!shutdownFlag) {
-					try {
-						LOG.debug("Sleeping for [" + shutdownCheckIntervalInMs + "] milliSeconds");
-						Thread.sleep(shutdownCheckIntervalInMs);
-					} catch (InterruptedException e) {
-						LOG.error("Failed to wait for [" + shutdownCheckIntervalInMs + "] milliseconds before attempting to synchronize tag information ", e);
-						break;
-					}
+			if (threadsStarted) {
+				synchronized(shutdownNotifier) {
+					shutdownNotifier.wait();
 				}
 			}
-		} catch (Throwable t) {
-			LOG.error("tag-sync main thread got an error", t);
 		} finally {
 			LOG.info("Stopping all tagSources");
 
 			for (TagSource tagSource : tagSources) {
 				tagSource.stop();
 			}
+
+			LOG.info("Stopping tagSink");
+			tagSink.stop();
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -155,7 +155,10 @@ public class TagSynchronizer {
 
 	public void shutdown(String reason) {
 		LOG.info("Received shutdown(), reason=" + reason);
-		this.shutdownFlag = true;
+
+		synchronized(shutdownNotifier) {
+			shutdownNotifier.notifyAll();
+		}
 	}
 
 	static public void printConfigurationProperties(Properties properties) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
index 1541034..c296b49 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -33,10 +33,15 @@ import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 
+import javax.servlet.http.HttpServletResponse;
 import java.util.Map;
 import java.util.Properties;
 
-public class TagAdminRESTSink implements TagSink {
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TagAdminRESTSink implements TagSink, Runnable {
 	private static final Log LOG = LogFactory.getLog(TagAdminRESTSink.class);
 
 	private static final String REST_PREFIX = "/service";
@@ -46,45 +51,43 @@ public class TagAdminRESTSink implements TagSink {
 
 	private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
 
+	private long rangerAdminConnectionCheckInterval;
+
 	private RangerRESTClient tagRESTClient = null;
 
+	private BlockingQueue<UploadWorkItem> uploadWorkItems;
+
+	private Thread myThread = null;
+
 	@Override
 	public boolean initialize(Properties properties) {
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> TagAdminRESTSink.initialize()");
 		}
 
-		boolean ret = true;
+		boolean ret = false;
 
 		String restUrl       = TagSyncConfig.getTagAdminRESTUrl(properties);
 		String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
 		String userName = TagSyncConfig.getTagAdminUserName(properties);
 		String password = TagSyncConfig.getTagAdminPassword(properties);
+		rangerAdminConnectionCheckInterval = TagSyncConfig.getTagAdminConnectionCheckInterval(properties);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("restUrl=" + restUrl);
 			LOG.debug("sslConfigFile=" + sslConfigFile);
 			LOG.debug("userName=" + userName);
+			LOG.debug("rangerAdminConnectionCheckInterval" + rangerAdminConnectionCheckInterval);
 		}
 
-		if (StringUtils.isBlank(restUrl)) {
-			ret = false;
-			LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
-		} else {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("ranger.tagsync.tagadmin.rest.url:" + restUrl);
-			}
-		}
-
-		if (ret) {
+		if (StringUtils.isNotBlank(restUrl)) {
 			tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
 			tagRESTClient.setBasicAuthInfo(userName, password);
+			uploadWorkItems = new LinkedBlockingQueue<UploadWorkItem>();
 
-			ret = testConnection();
-		}
-
-		if (!ret) {
-			LOG.error("Cannot connect to Tag Admin. Please recheck configuration properties and/or check if Tag Admin is running and responsive");
+			ret = true;
+		} else {
+			LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
 		}
 
 		if(LOG.isDebugEnabled()) {
@@ -94,51 +97,53 @@ public class TagAdminRESTSink implements TagSink {
 		return ret;
 	}
 
-	public boolean testConnection() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAdminRESTSink.testConnection()");
+	@Override
+	public ServiceTags upload(ServiceTags toUpload) throws Exception {
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("==> upload() ");
 		}
 
-		boolean ret = true;
+		UploadWorkItem uploadWorkItem = new UploadWorkItem(toUpload);
 
-		try {
-			// build a dummy serviceTags structure and upload it to test connectivity
-			ServiceTags serviceTags = new ServiceTags();
-			serviceTags.setOp(ServiceTags.OP_ADD_OR_UPDATE);
-			uploadServiceTags(serviceTags);
-		} catch (Exception exception) {
-			LOG.error("test-upload of serviceTags failed.", exception);
-			ret = false;
-		}
+		uploadWorkItems.put(uploadWorkItem);
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagAdminRESTSink.testConnection(), result=" + ret);
+		// Wait until message is successfully delivered
+		ServiceTags ret = uploadWorkItem.waitForUpload();
+
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("<== upload()");
 		}
+
 		return ret;
 	}
 
-	@Override
-	synchronized public void uploadServiceTags(ServiceTags serviceTags) throws Exception {
+	private ServiceTags doUpload(ServiceTags serviceTags) throws Exception {
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("==> uploadServiceTags()");
+			LOG.debug("==> doUpload()");
 		}
 		WebResource webResource = createWebResource(REST_URL_IMPORT_SERVICETAGS_RESOURCE);
 
 		ClientResponse response    = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).put(ClientResponse.class, tagRESTClient.toJson(serviceTags));
 
-		if(response == null || response.getStatus() != 204) {
-			LOG.error("RangerAdmin REST call returned with response={" + response + "}");
+		if(response == null || response.getStatus() != HttpServletResponse.SC_NO_CONTENT) {
 
 			RESTResponse resp = RESTResponse.fromClientResponse(response);
 
 			LOG.error("Upload of service-tags failed with message " + resp.getMessage());
 
-			throw new Exception("Upload of service-tags failed with response: " + response);
+			if (response == null || response.getStatus() != HttpServletResponse.SC_BAD_REQUEST) {
+				// NOT an application error
+				throw new Exception("Upload of service-tags failed with response: " + response);
+			}
+
 		}
 
 		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== uploadServiceTags()");
+			LOG.debug("<== doUpload()");
 		}
+
+		return serviceTags;
 	}
 
 	private WebResource createWebResource(String url) {
@@ -159,4 +164,91 @@ public class TagAdminRESTSink implements TagSink {
 
 		return ret;
 	}
+
+	@Override
+	public boolean start() {
+
+		myThread = new Thread(this);
+		myThread.setDaemon(true);
+		myThread.start();
+
+		return true;
+	}
+
+	@Override
+	public void stop() {
+		if (myThread != null && myThread.isAlive()) {
+			myThread.interrupt();
+		}
+	}
+
+	@Override
+	public void run() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagAdminRESTSink.run()");
+		}
+
+		while (true) {
+			UploadWorkItem uploadWorkItem;
+
+			try {
+				uploadWorkItem = uploadWorkItems.take();
+
+				ServiceTags toUpload = uploadWorkItem.getServiceTags();
+
+				boolean doRetry;
+
+				do {
+					doRetry = false;
+
+					try {
+						ServiceTags uploaded = doUpload(toUpload);
+						// ServiceTags uploaded successfully
+						uploadWorkItem.uploadCompleted(uploaded);
+					} catch (InterruptedException interrupted) {
+						LOG.error("Caught exception..: ", interrupted);
+						return;
+					} catch (Exception exception) {
+						doRetry = true;
+						Thread.sleep(rangerAdminConnectionCheckInterval);
+					}
+				} while (doRetry);
+
+			}
+			catch (InterruptedException exception) {
+				LOG.error("Interrupted..: ", exception);
+				return;
+			}
+		}
+
+	}
+
+	class UploadWorkItem {
+		private ServiceTags serviceTags;
+		private BlockingQueue<ServiceTags> uploadedServiceTags;
+
+		ServiceTags getServiceTags() {
+			return serviceTags;
+		}
+
+		ServiceTags waitForUpload() throws InterruptedException {
+			return uploadedServiceTags.take();
+		}
+
+		void uploadCompleted(ServiceTags uploaded) throws InterruptedException {
+			// ServiceTags uploaded successfully
+			uploadedServiceTags.put(uploaded);
+		}
+
+		UploadWorkItem(ServiceTags serviceTags) {
+			setServiceTags(serviceTags);
+			uploadedServiceTags = new ArrayBlockingQueue<ServiceTags>(1);
+		}
+
+		void setServiceTags(ServiceTags serviceTags) {
+			this.serviceTags = serviceTags;
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 2499177..49d6f61 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -50,6 +50,7 @@ public class AtlasTagSource extends AbstractTagSource {
 	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
 
 	private ConsumerRunnable consumerTask;
+	private Thread myThread = null;
 
 	@Override
 	public boolean initialize(Properties properties) {
@@ -123,23 +124,24 @@ public class AtlasTagSource extends AbstractTagSource {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> AtlasTagSource.start()");
 		}
-		Thread consumerThread = null;
 		if (consumerTask == null) {
 			LOG.error("No consumerTask!!!");
 		} else {
-			consumerThread = new Thread(consumerTask);
-			consumerThread.setDaemon(true);
-			consumerThread.start();
+			myThread = new Thread(consumerTask);
+			myThread.setDaemon(true);
+			myThread.start();
 		}
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== AtlasTagSource.start()");
 		}
-		return consumerThread != null;
+		return myThread != null;
 	}
 
 	@Override
-	public boolean isChanged() {
-		return true;
+	public void stop() {
+		if (myThread != null && myThread.isAlive()) {
+			myThread.interrupt();
+		}
 	}
 
 	private static String getPrintableEntityNotification(EntityNotification notification) {
@@ -175,24 +177,32 @@ public class AtlasTagSource extends AbstractTagSource {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("==> ConsumerRunnable.run()");
 			}
-			while (!shutdown) {
-				if (hasNext()) {
-					EntityNotification notification = consumer.next();
-					if (notification != null) {
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Notification=" + getPrintableEntityNotification(notification));
-						}
-
-						ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
-						if (serviceTags == null) {
-							LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification));
+			while (true) {
+				try {
+					if (hasNext()) {
+						EntityNotification notification = consumer.peek();
+						if (notification != null) {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+							}
+
+							ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
+							if (serviceTags == null) {
+								LOG.error("Failed to create ServiceTags for notification :" + getPrintableEntityNotification(notification));
+							} else {
+								updateSink(serviceTags);
+							}
 						} else {
-							updateSink(serviceTags);
+							LOG.error("Null entityNotification received from Kafka!! Ignoring..");
 						}
+						// Move iterator forward
+						consumer.next();
 					}
+				} catch (Exception exception) {
+					LOG.error("Caught exception..: ", exception);
+					return;
 				}
 			}
-			LOG.info("Shutting down the Tag-Atlas-source thread");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 9d9f25d..11ca2d8 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -46,6 +46,8 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 	private String atlasEndpoint;
 	private long sleepTimeBetweenCycleInMillis;
 
+	private Thread myThread = null;
+
 	public static void main(String[] args) {
 
 		AtlasRESTTagSource atlasRESTTagSource = new AtlasRESTTagSource();
@@ -60,9 +62,19 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 
 		if (tagSink != null) {
 
-			atlasRESTTagSource.initialize(props);
-			atlasRESTTagSource.setTagSink(tagSink);
-			atlasRESTTagSource.synchUp();
+			if (atlasRESTTagSource.initialize(props)) {
+				try {
+					tagSink.start();
+					atlasRESTTagSource.setTagSink(tagSink);
+					atlasRESTTagSource.synchUp();
+				} catch (Exception exception) {
+					LOG.error("ServiceTags upload failed : ", exception);
+					System.exit(1);
+				}
+			} else {
+				LOG.error("AtlasRESTTagSource initialized failed, exiting.");
+				System.exit(1);
+			}
 
 		} else {
 			LOG.error("TagSink initialialization failed, exiting.");
@@ -96,11 +108,18 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 	@Override
 	public boolean start() {
 
-		Thread atlasRESTInvokerThread = new Thread(this);
-		atlasRESTInvokerThread.setDaemon(true);
-		atlasRESTInvokerThread.start();
+		myThread = new Thread(this);
+		myThread.setDaemon(true);
+		myThread.start();
 
-		return atlasRESTInvokerThread != null;
+		return true;
+	}
+
+	@Override
+	public void stop() {
+		if (myThread != null && myThread.isAlive()) {
+			myThread.interrupt();
+		}
 	}
 
 	@Override
@@ -110,7 +129,7 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 			LOG.debug("==> AtlasRESTTagSource.run()");
 		}
 
-		while (!shutdown) {
+		while (true) {
 
 			synchUp();
 
@@ -121,18 +140,12 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 				Thread.sleep(sleepTimeBetweenCycleInMillis);
 
 			} catch (InterruptedException exception) {
-				LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before checking for update to tagFileSource", exception);
+				LOG.error("Interrupted..: ", exception);
+				return;
 			}
 		}
-		LOG.info("Shutting down the Tag-Atlasrest-source thread");
-	}
-
-	@Override
-	public boolean isChanged() {
-		return true;
 	}
 
-	@Override
 	public void synchUp() {
 
 		AtlasRESTUtil atlasRESTUtil = new AtlasRESTUtil(atlasEndpoint);

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
index 7f4676a..d7f983e 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
@@ -27,6 +27,7 @@ import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.ranger.admin.client.datatype.RESTResponse;
 import org.apache.ranger.plugin.util.RangerRESTClient;
@@ -114,43 +115,46 @@ public class AtlasRESTUtil {
 
 					Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
 
-					if (MapUtils.isNotEmpty(entityResponse) && entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+					Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
 
-						Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
-						Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+					Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
 
-						if (MapUtils.isNotEmpty(traitsAttribute)) {
+					if (MapUtils.isNotEmpty(traitsAttribute)) {
 
-							List<IStruct> allTraits = new LinkedList<>();
+						List<IStruct> allTraits = new LinkedList<>();
 
-							for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) {
+						for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) {
 
-								Map<String, Object> trait = (Map<String, Object>) entry.getValue();
+							Map<String, Object> trait = (Map<String, Object>) entry.getValue();
 
-								Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-								String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+							Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+							String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
 
-								List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+							if (StringUtils.isEmpty(traitTypeName)) {
+								continue;
+							}
 
-								Struct trait1 = new Struct(traitTypeName, traitValues);
+							List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
 
-								allTraits.add(trait1);
-								allTraits.addAll(superTypes);
-							}
+							Struct trait1 = new Struct(traitTypeName, traitValues);
+
+							allTraits.add(trait1);
+							allTraits.addAll(superTypes);
+						}
 
-							IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
+						IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
 
-							if (entity != null) {
-								AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits);
-								ret.add(entityWithTraits);
-							} else {
-								if (LOG.isInfoEnabled()) {
-									LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
-								}
+						if (entity != null) {
+							AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, allTraits);
+							ret.add(entityWithTraits);
+						} else {
+							if (LOG.isInfoEnabled()) {
+								LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid);
 							}
-
 						}
+
 					}
+
 				}
 			}
 			if (LOG.isDebugEnabled()) {
@@ -169,15 +173,12 @@ public class AtlasRESTUtil {
 
 		Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
 
-		if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-
-			Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
+		Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class);
 
-			List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+		List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
 
-			if (traitTypes.size() > 0) {
-				ret = (Map<String, Object>) traitTypes.get(0);
-			}
+		if (CollectionUtils.isNotEmpty(traitTypes)) {
+			ret = (Map<String, Object>) traitTypes.get(0);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -197,28 +198,30 @@ public class AtlasRESTUtil {
 
 			List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
 
-			for (String superTypeName : superTypeNames) {
+			if (CollectionUtils.isNotEmpty(superTypeNames)) {
+				for (String superTypeName : superTypeNames) {
 
-				Map<String, Object> superTraitType = getTraitType(superTypeName);
+					Map<String, Object> superTraitType = getTraitType(superTypeName);
 
-				if (superTraitType != null) {
-					List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+					if (superTraitType != null) {
+						List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
 
-					Map<String, Object> superTypeValues = new HashMap<>();
-					for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+						Map<String, Object> superTypeValues = new HashMap<>();
+						for (Map<String, Object> attributeDefinition : attributeDefinitions) {
 
-						String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
-						if (values.containsKey(attributeName)) {
-							superTypeValues.put(attributeName, values.get(attributeName));
+							String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
+							if (values.containsKey(attributeName)) {
+								superTypeValues.put(attributeName, values.get(attributeName));
+							}
 						}
-					}
 
-					List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values);
+						List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values);
 
-					Struct superTrait = new Struct(superTypeName, superTypeValues);
+						Struct superTrait = new Struct(superTypeName, superTypeValues);
 
-					ret.add(superTrait);
-					ret.addAll(superTraits);
+						ret.add(superTrait);
+						ret.addAll(superTraits);
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/b1b0fb16/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index e339d5e..e22681e 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -49,6 +49,8 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 	private Properties properties;
 	private long fileModTimeCheckIntervalInMs;
 
+	private Thread myThread = null;
+
 	public static void main(String[] args) {
 
 		FileTagSource fileTagSource = new FileTagSource();
@@ -69,9 +71,19 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 
 		if (tagSink != null) {
 
-			fileTagSource.initialize(props);
-			fileTagSource.setTagSink(tagSink);
-			fileTagSource.synchUp();
+			if (fileTagSource.initialize(props)) {
+				try {
+					tagSink.start();
+					fileTagSource.setTagSink(tagSink);
+					fileTagSource.synchUp();
+				} catch (Exception exception) {
+					LOG.error("ServiceTags upload failed : ", exception);
+					System.exit(1);
+				}
+			} else {
+				LOG.error("FileTagSource initialized failed, exiting.");
+				System.exit(1);
+			}
 
 		} else {
 			LOG.error("TagSink initialialization failed, exiting.");
@@ -177,11 +189,18 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 	@Override
 	public boolean start() {
 
-		Thread fileMonitoringThread = new Thread(this);
-		fileMonitoringThread.setDaemon(true);
-		fileMonitoringThread.start();
+		myThread = new Thread(this);
+		myThread.setDaemon(true);
+		myThread.start();
+
+		return true;
+	}
 
-		return fileMonitoringThread != null;
+	@Override
+	public void stop() {
+		if (myThread != null && myThread.isAlive()) {
+			myThread.interrupt();
+		}
 	}
 
 	@Override
@@ -190,7 +209,7 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 			LOG.debug("==> FileTagSource.run()");
 		}
 
-		while (!shutdown) {
+		while (true) {
 
 			try {
 				synchUp();
@@ -199,23 +218,14 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 
 				Thread.sleep(fileModTimeCheckIntervalInMs);
 			}
-			catch (InterruptedException e) {
-				LOG.error("Failed to wait for [" + fileModTimeCheckIntervalInMs + "] milliseconds before checking for update to tagFileSource", e);
-			}
-			catch (Throwable t) {
-				LOG.error("tag-sync thread got an error", t);
+			catch (InterruptedException exception) {
+				LOG.error("Interrupted..: ", exception);
+				return;
 			}
 		}
-
-		LOG.info("Shutting down the Tag-file-source thread");
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== FileTagSource.run()");
-		}
 	}
 
-	@Override
-	public 	boolean isChanged() {
+	private boolean isChanged() {
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> FileTagSource.isChanged()");
@@ -240,7 +250,6 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 		return ret;
 	}
 
-	@Override
 	public void synchUp() {
 		if (isChanged()) {
 			if (LOG.isDebugEnabled()) {