You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/10/28 17:00:04 UTC

[28/50] [abbrv] incubator-ranger git commit: RANGER-660: updates to setup/install ranger-tagsync

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
new file mode 100644
index 0000000..243aee5
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.EntityImpl;
+import org.apache.atlas.typesystem.IdImpl;
+import org.apache.atlas.typesystem.TraitImpl;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.entity.EntityNotificationConsumer;
+import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
+import org.apache.atlas.typesystem.api.Entity;
+import org.apache.atlas.typesystem.api.Trait;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.tagsync.model.TagSource;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.plugin.util.RangerRESTUtils;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TagAtlasSource implements TagSource {
+	private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
+
+
+	private final Map<String, Entity> entities = new LinkedHashMap<>();
+	private TagSink tagSink;
+	private Properties properties;
+	private ConsumerRunnable consumerTask;
+
+	@Override
+	public boolean initialize(Properties properties) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagAtlasSource.initialize()");
+		}
+
+		boolean ret = true;
+
+		if (properties == null || MapUtils.isEmpty(properties)) {
+			LOG.error("No properties specified for TagFileSource initialization");
+			this.properties = new Properties();
+		} else {
+			this.properties = properties;
+		}
+
+
+		NotificationModule notificationModule = new NotificationModule();
+
+		Injector injector = Guice.createInjector(notificationModule);
+
+		EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class);
+
+		consumerTask = new ConsumerRunnable(consumerProvider.get());
+
+		//ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+		//executorService.submit(new ConsumerRunnable(consumerProvider.get()));
+
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagAtlasSource.initialize(), result=" + ret);
+		}
+		return ret;
+	}
+
+	@Override
+	public void setTagSink(TagSink sink) {
+		if (sink == null) {
+			LOG.error("Sink is null!!!");
+		} else {
+			this.tagSink = sink;
+		}
+	}
+
+	@Override
+	public Thread start() {
+		Thread consumerThread = null;
+		if (consumerTask == null) {
+			LOG.error("No consumerTask!!!");
+		} else {
+			consumerThread = new Thread(consumerTask);
+			consumerThread.setDaemon(true);
+			consumerThread.start();
+		}
+		return consumerThread;
+	}
+
+	@Override
+	public void updateSink() throws Exception {
+	}
+
+	@Override
+	public 	boolean isChanged() {
+		return true;
+	}
+
+	// ----- inner class : ConsumerRunnable ------------------------------------
+
+	private class ConsumerRunnable implements Runnable {
+
+		private final EntityNotificationConsumer consumer;
+
+		private ConsumerRunnable(EntityNotificationConsumer consumer) {
+			this.consumer = consumer;
+		}
+
+
+		// ----- Runnable --------------------------------------------------------
+
+		@Override
+		public void run() {
+			while (consumer.hasNext()) {
+				try {
+					EntityNotification notification = consumer.next();
+					if (notification != null) {
+						printNotification(notification);
+						ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties);
+						if (serviceTags == null) {
+							LOG.error("Failed to map Atlas notification to ServiceTags structure");
+						} else {
+							if (LOG.isDebugEnabled()) {
+								String serviceTagsJSON = new Gson().toJson(serviceTags);
+								LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON);
+							}
+
+							try {
+								tagSink.uploadServiceTags(serviceTags);
+							} catch (Exception exception) {
+								LOG.error("uploadServiceTags() failed..", exception);
+							}
+						}
+					}
+				} catch(Exception e){
+					LOG.error("Exception encountered when processing notification:", e);
+				}
+			}
+		}
+
+		public void printNotification(EntityNotification notification) {
+			Entity entity = notification.getEntity();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Notification-Type: " + notification.getOperationType().name());
+				LOG.debug("Entity-Id: " + entity.getId().getGuid());
+				LOG.debug("Entity-Type: " + entity.getTypeName());
+
+				LOG.debug("----------- Entity Values ----------");
+
+
+				for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
+					LOG.debug("		Name:" + entry.getKey());
+					Object value = entry.getValue();
+					LOG.debug("		Value:" + value);
+				}
+
+				LOG.debug("----------- Entity Traits ----------");
+
+
+				for (Map.Entry<String, ? extends Trait> entry : entity.getTraits().entrySet()) {
+					LOG.debug("			Trait-Name:" + entry.getKey());
+					Trait trait = entry.getValue();
+					LOG.debug("			Trait-Type:" + trait.getTypeName());
+					Map<String, Object> traitValues = trait.getValues();
+					for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
+						LOG.debug("				Trait-Value-Name:" + valueEntry.getKey());
+						LOG.debug("				Trait-Value:" + valueEntry.getValue());
+					}
+				}
+
+			}
+		}
+
+	}
+
+	public void printAllEntities() {
+		try {
+			new AtlasUtility().getAllEntities();
+		}
+		catch(java.io.IOException ioException) {
+			LOG.error("Caught IOException while retrieving Atlas Entities:", ioException);
+		}
+	}
+
+	// update the set of entities with current from Atlas
+	public void refreshAllEntities() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagAtlasSource.refreshAllEntities()");
+		}
+		AtlasUtility atlasUtility = new AtlasUtility();
+
+		try {
+			entities.putAll(atlasUtility.getAllEntities());
+		} catch (IOException e) {
+			LOG.error("getAllEntities() failed", e);
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagAtlasSource.refreshAllEntities()");
+		}
+	}
+
+	// Inner class AtlasUtil
+
+	/**
+	 * Atlas utility.
+	 */
+	@SuppressWarnings("unchecked")
+	private class AtlasUtility {
+
+		/**
+		 * Atlas APIs
+		 */
+		public static final String API_ATLAS_TYPES    = "api/atlas/types";
+		public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
+		public static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
+		public static final String API_ATLAS_TYPE     = "api/atlas/types/";
+
+		/**
+		 * API Response Attributes
+		 */
+		public static final String RESULTS_ATTRIBUTE               = "results";
+		public static final String DEFINITION_ATTRIBUTE            = "definition";
+		public static final String VALUES_ATTRIBUTE                = "values";
+		public static final String TRAITS_ATTRIBUTE                = "traits";
+		public static final String TYPE_NAME_ATTRIBUTE             = "typeName";
+		public static final String TRAIT_TYPES_ATTRIBUTE           = "traitTypes";
+		public static final String SUPER_TYPES_ATTRIBUTE           = "superTypes";
+		public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
+		public static final String NAME_ATTRIBUTE                  = "name";
+
+		private Type mapType = new TypeToken<Map<String, Object>>(){}.getType();
+
+		private RangerRESTClient restClient;
+
+
+		// ----- Constructors ------------------------------------------------------
+
+		/**
+		 * Construct an AtlasUtility whose REST client uses the Atlas endpoint and
+		 * SSL configuration file name read from the enclosing source's properties.
+		 */
+		public AtlasUtility() {
+
+			String url               = TagSyncConfig.getAtlasEndpoint(properties);
+			String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties);
+
+			// The before/after debug messages are meant to print the same parameter list.
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
+			}
+
+			restClient = new RangerRESTClient(url, sslConfigFileName);
+
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
+			}
+		}
+
+
+		// ----- AtlasUtility ------------------------------------------------------
+
+		/**
+		 * Get all of the entities defined in Atlas.
+		 *
+		 * @return  a mapping of GUIDs to Atlas entities
+		 *
+		 * @throws IOException if there is an error communicating with Atlas
+		 */
+		public Map<String, Entity> getAllEntities() throws IOException {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> TagAtlasSource.getAllEntities()");
+			}
+			Map<String, Entity> entities = new LinkedHashMap<>();
+
+			Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
+
+			List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
+
+			for (String type : types) {
+
+				Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
+
+				List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
+
+				for (String guid : guids) {
+
+					if (StringUtils.isNotBlank(guid)) {
+
+						Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>();
+
+						Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
+
+						if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+							String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
+
+							LOG.info("{");
+							LOG.info("	\"entity-id\":" + guid + ",");
+							LOG.info("	\"entity-definition\":" + definitionJSON);
+							LOG.info("}");
+
+							Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
+
+							Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
+							Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+							String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
+
+							LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")");
+
+
+							Map<String, TraitImpl> traitMap = new HashMap<>();
+
+							if (MapUtils.isNotEmpty(traits)) {
+
+								LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
+
+								for (Map.Entry<String, Object> entry : traits.entrySet()) {
+
+									Map<String, Object> trait = (Map<String, Object>) entry.getValue();
+
+									Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+									String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+
+									Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+
+									TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
+
+									traitSuperTypes.put(trait1, superTypes);
+
+									traitMap.put(entry.getKey(), trait1);
+
+
+									LOG.info("			Trait(typeName=" + traitTypeName + ")");
+
+								}
+							} else {
+								LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")");
+							}
+							EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
+
+							showEntity(entity);
+
+							entities.put(guid, entity);
+
+						}
+					}
+				}
+			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> TagAtlasSource.getAllEntities()");
+			}
+			return entities;
+		}
+
+
+		// ----- helper methods ----------------------------------------------------
+
+		/**
+		 * Fetch the first trait-type definition Atlas reports for the given name.
+		 * @return the definition map, or null when Atlas supplies no usable one
+		 */
+		private Map<String, Object> getTraitType(String traitName)
+				throws IOException {
+			Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
+			if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+				String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
+				Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
+				// Either lookup can come back null; previously that raised a NullPointerException.
+				List<?> traitTypes = (definition == null) ? null : getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+				if (traitTypes != null && !traitTypes.isEmpty()) {
+					return (Map<String, Object>) traitTypes.get(0);
+				}
+			}
+			return null;
+		}
+
+		/**
+		 * Recursively build the super-type traits of the given trait type.
+		 * Each super trait receives only those entries of {@code values} whose
+		 * names appear in that super type's attribute definitions.
+		 * @return super-type name to trait; empty when nothing can be resolved
+		 */
+		private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values)
+				throws IOException {
+			Map<String, TraitImpl> superTypes = new HashMap<>();
+			List<String> superTypeNames = (traitType == null) ? null : getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
+			if (superTypeNames == null) {
+				// Unknown trait type, or a definition without a "superTypes" attribute.
+				return superTypes;
+			}
+			for (String superTypeName : superTypeNames) {
+				Map<String, Object> superTraitType = getTraitType(superTypeName);
+				if (superTraitType == null) {
+					continue;
+				}
+				List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+				Map<String, Object> superTypeValues = new HashMap<>();
+				if (attributeDefinitions != null && values != null) {
+					for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+						String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
+						if (values.containsKey(attributeName)) {
+							superTypeValues.put(attributeName, values.get(attributeName));
+						}
+					}
+				}
+				superTypes.put(superTypeName, new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues)));
+			}
+			return superTypes;
+		}
+
+		/*
+		private Map<String, Object> atlasAPI(String endpoint) throws IOException {
+			InputStream in  = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
+			return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType);
+		}
+		*/
+
+		private Map<String, Object> atlasAPI(String endpoint)  {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint +")");
+			}
+			// Create a REST client and perform a get on it
+			Map<String, Object> ret = new HashMap<String, Object>();
+
+			WebResource webResource = restClient.getResource(endpoint);
+
+			ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
+
+			if(response != null && response.getStatus() == 200) {
+				ret = response.getEntity(ret.getClass());
+			} else {
+				LOG.error("Atlas REST call returned with response={" + response +"}");
+
+				RESTResponse resp = RESTResponse.fromClientResponse(response);
+				LOG.error("Error getting Atlas Entity. request=" + webResource.toString()
+						+ ", response=" + resp.toString());
+			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")");
+			}
+			return ret;
+		}
+
+		private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
+			return type.cast(map.get(name));
+		}
+
+
+
+		public void showEntity(Entity entity) {
+
+			LOG.debug("Entity-id	:" + entity.getId());
+
+			LOG.debug("Type:		" + entity.getTypeName());
+
+			LOG.debug("----- Values -----");
+
+			for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
+				LOG.debug("		Name:	" + entry.getKey() + "");
+				Object value = entry.getValue();
+				LOG.debug("		Value:	" + getValue(value, entities.keySet()));
+			}
+
+			LOG.debug("----- Traits -----");
+
+			for (String traitName : entity.getTraits().keySet()) {
+				LOG.debug("		Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName);
+			}
+
+		}
+
+		public void showTrait(Entity entity, String traitId) {
+
+			String[] traitNames = traitId.split(",");
+
+			Trait trait = entity.getTraits().get(traitNames[0]);
+
+			for (int i = 1; i < traitNames.length; ++i ) {
+				trait = trait.getSuperTypes().get(traitNames[i]);
+			}
+
+			String typeName = trait.getTypeName();
+
+			LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId());
+
+			LOG.debug("Type: " + typeName);
+
+			LOG.debug("----- Values ------");
+
+			for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) {
+				LOG.debug("Name:" + entry.getKey());
+				Object value = entry.getValue();
+				LOG.debug("Value:" + getValue(value, entities.keySet()));
+			}
+
+			LOG.debug("Super Traits");
+
+
+			for (String traitName : trait.getSuperTypes().keySet()) {
+				LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName);
+			}
+		}
+
+		// resolve the given value if necessary
+		private String getValue(Object value, Set<String> ids) {
+			if (value == null) {
+				return "";
+			}
+			String idString = getIdValue(value, ids);
+			if (idString != null) {
+				return idString;
+			}
+
+			idString = getIdListValue(value, ids);
+			if (idString != null) {
+				return idString;
+			}
+
+			return value.toString();
+		}
+		// get an id from the given value; return null if the value is not an id type
+		private String getIdValue(Object value, Set<String> ids) {
+			if (value instanceof Map) {
+				Map map = (Map) value;
+				if (map.size() == 3 && map.containsKey("id")){
+					String id = map.get("id").toString();
+					if (ids.contains(id)) {
+						return id;
+					}
+				}
+			}
+			return null;
+		}
+		// get an id list from the given value; return null if the value is not an id list type
+		private String getIdListValue(Object value, Set<String> ids) {
+			if (value instanceof List) {
+				List list = (List) value;
+				if (list.size() > 0) {
+					StringBuilder sb = new StringBuilder();
+					for (Object o : list) {
+						String idString = getIdValue(o, ids);
+						if (idString == null) {
+							return value.toString();
+						}
+						if (sb.length() > 0) {
+							sb.append(", ");
+						}
+						sb.append(idString);
+					}
+					return sb.toString();
+				}
+			}
+			return null;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
new file mode 100644
index 0000000..925a712
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.file;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.tagsync.model.TagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.*;
+import java.util.Date;
+import java.util.Properties;
+
+public class TagFileSource implements TagSource, Runnable {
+	private static final Log LOG = LogFactory.getLog(TagFileSource.class);
+
+	private String sourceFileName;
+	private long lastModifiedTimeInMillis = 0L;
+
+	private Gson gson;
+	private TagSink tagSink;
+	private Properties properties;
+
+	/**
+	 * Stores the configuration, resolves the tag-source file name and creates
+	 * the Gson instance used to parse that file.
+	 *
+	 * @return false when the source file cannot be resolved or Gson cannot be created
+	 */
+	@Override
+	public boolean initialize(Properties properties) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagFileSource.initialize()");
+		}
+
+		if (properties == null || MapUtils.isEmpty(properties)) {
+			LOG.error("No properties specified for TagFileSource initialization");
+			this.properties = new Properties();
+		} else {
+			this.properties = properties;
+		}
+
+		boolean ret = true;
+
+		// Read through this.properties: the raw argument may be null at this point.
+		sourceFileName = TagSyncConfig.getTagSourceFileName(this.properties);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Provided sourceFileName=" + sourceFileName);
+		}
+		String realFileName = TagSyncConfig.getResourceFileName(sourceFileName);
+		if (realFileName != null) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Real sourceFileName=" + realFileName);
+			}
+			sourceFileName = realFileName;
+		} else {
+			LOG.error(sourceFileName + " is not a file or is not readable");
+			ret = false;
+		}
+
+		if (ret) {
+			try {
+				gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+			} catch (Throwable excp) {
+				LOG.fatal("failed to create GsonBuilder object", excp);
+				ret = false;
+			}
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagFileSource.initialize(): sourceFileName=" + sourceFileName + ", result=" + ret);
+		}
+		return ret;
+	}
+
+	@Override
+	public void setTagSink(TagSink sink) {
+		if (sink == null) {
+			LOG.error("Sink is null!!!");
+		} else {
+			this.tagSink = sink;
+		}
+	}
+
+	@Override
+	public Thread start() {
+
+		Thread fileMonitoringThread = null;
+
+		fileMonitoringThread = new Thread(this);
+		fileMonitoringThread.setDaemon(true);
+		fileMonitoringThread.start();
+
+		return fileMonitoringThread;
+	}
+
+	/**
+	 * Polls the source file until interrupted, pushing its tags to the sink whenever isChanged() is true.
+	 */
+	@Override
+	public void run() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagFileSource.run()");
+		}
+		long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+		boolean shutdownFlag = false;
+		while (!shutdownFlag) {
+			try {
+				if (isChanged()) {
+					LOG.info("Begin: update tags from source==>sink");
+					if (TagSyncConfig.isTagSyncEnabled(properties)) {
+						updateSink();
+						LOG.info("End: update tags from source==>sink");
+					} else {
+						LOG.info("Tag-sync is not enabled.");
+					}
+				} else {
+					LOG.debug("TagFileSource: no change found for synchronization.");
+				}
+
+				LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+				Thread.sleep(sleepTimeBetweenCycleInMillis);
+			}
+			catch (InterruptedException e) {
+				LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e);
+				// Restore the interrupt status so code further up the stack can observe it.
+				Thread.currentThread().interrupt();
+				shutdownFlag = true;
+			}
+			catch (Throwable t) {
+				LOG.error("tag-sync thread got an error", t);
+			}
+		}
+		LOG.info("Shutting down the Tag-file-source thread");
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagFileSource.run()");
+		}
+	}
+
+	@Override
+	public void updateSink() throws Exception {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagFileSource.updateSink()");
+		}
+		ServiceTags serviceTags = readFromFile();
+
+		if (serviceTags != null) {
+			tagSink.uploadServiceTags(serviceTags);
+		} else {
+			LOG.error("Could not read ServiceTags from file");
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagFileSource.updateSink()");
+		}
+	}
+
+	/**
+	 * Reports whether the source file's modification time is newer than the one
+	 * recorded by the previous call, and records the new time when it is.
+	 */
+	@Override
+	public boolean isChanged() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagFileSource.isChanged()");
+		}
+		boolean ret = false;
+		long modificationTime = getModificationTime();
+
+		if (modificationTime > lastModifiedTimeInMillis) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("File modified at " + new Date(modificationTime)
+						+ ", last-modified at " + new Date(lastModifiedTimeInMillis));
+			}
+			lastModifiedTimeInMillis = modificationTime;
+			ret = true;
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagFileSource.isChanged(): result=" + ret);
+		}
+		return ret;
+	}
+
+	/**
+	 * Parses the tag-source file into a ServiceTags object.
+	 *
+	 * @return the parsed tags, or null when the file cannot be opened or parsed
+	 */
+	private ServiceTags readFromFile() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagFileSource.readFromFile(): sourceFileName=" + sourceFileName);
+		}
+
+		ServiceTags ret = null;
+		Reader reader = null;
+		try {
+			// Decode explicitly as UTF-8 rather than with the platform default charset.
+			reader = new InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName), "UTF-8");
+			ret = gson.fromJson(reader, ServiceTags.class);
+		}
+		catch (FileNotFoundException exception) {
+			LOG.warn("Tag-source file does not exist or not readable '" + sourceFileName + "'");
+		}
+		catch (Exception excp) {
+			LOG.error("failed to load service-tags from Tag-source file " + sourceFileName, excp);
+		}
+		finally {
+			if (reader != null) {
+				try {
+					reader.close();
+				} catch (Exception excp) {
+					LOG.error("error while closing opened Tag-source file " + sourceFileName, excp);
+				}
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagFileSource.readFromFile(): sourceFileName=" + sourceFileName);
+		}
+		return ret;
+	}
+
+	/**
+	 * Returns the file's last-modified time, or the current time when it is not a readable file.
+	 */
+	private long getModificationTime() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagFileSource.getModificationTime(): sourceFileName=" + sourceFileName);
+		}
+		File sourceFile = new File(sourceFileName);
+		long ret;
+		if (sourceFile.exists() && sourceFile.isFile() && sourceFile.canRead()) {
+			ret = sourceFile.lastModified();
+		} else {
+			ret = new Date().getTime();
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagFileSource.getModificationTime(): sourceFileName=" + sourceFileName + " result=" + new Date(ret));
+		}
+		return ret;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/application.properties b/tagsync/src/main/resources/application.properties
deleted file mode 100644
index 7c874b6..0000000
--- a/tagsync/src/main/resources/application.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# This file is used currently to satisfy needs of Injection of EntityChangeConsumer and its
-# initialization.
-#
-# Basic configuration required to create EntityChangeConsumer
-#
-atlas.notification.kafka.bootstrap.servers=ranger-tag-policy-akulkarni-1:6667
-atlas.notification.kafka.zookeeper.connect=ranger-tag-policy-akulkarni-1:2181
-
-#
-# These properties seem to be internal to Atlas. They probably are used for generating notifications.
-atlas.notification.embedded=false
-atlas.notification.kafka.acks=1
-atlas.notification.kafka.data=${sys:atlas.home}/data/kafka

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/resources/ranger-tagsync-default.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml
index fabe04e..5f754f9 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -18,14 +18,6 @@
 
 <configuration>
 	<property>
-		<name>ranger.tagsync.port</name>
-		<value>6161</value>
-	</property>
-	<property>
-		<name>ranger.tagsync.ssl</name>
-		<value>true</value>
-	</property>
-	<property>
 		<name>ranger.tagsync.enabled</name>
 		<value>true</value>
 	</property>
@@ -34,11 +26,6 @@
 		<value>./log</value>
 	</property>
 	<property>
-		<name>ranger.authentication.method</name>
-		<value>NONE</value>
-		<description></description>
-	</property>
-	<property>
 		<name>ranger.tagsync.tagadmin.rest.url</name>
 		<value>http://localhost:6080</value>
 		<description></description>
@@ -49,12 +36,12 @@
 		<description></description>
 	</property>
 	<property>
-		<name>ranger.tagsync.policymanager.basicauth.username</name>
+		<name>ranger.tagsync.tagadmin.basicauth.username</name>
 		<value>admin</value>
 		<description></description>
 	</property>
 	<property>
-		<name>ranger.tagsync.policymanager.basicauth.password</name>
+		<name>ranger.tagsync.tagadmin.basicauth.password</name>
 		<value>admin</value>
 		<description></description>
 	</property>
@@ -64,28 +51,28 @@
 		<description></description>
 	</property>
 	<property>
-		<name>ranger.tagsync.source.file</name>
+		<name>ranger.tagsync.filesource.filename</name>
 		<value>/etc/ranger/data/tags.json</value>
 		<description></description>
 	</property>
 	<property>
 		<name>ranger.tagsync.source.impl.class</name>
-		<value>org.apache.ranger.source.file.TagFileSource</value>
+		<value>file</value>
 		<description></description>
 	</property>
 	<property>
 		<name>ranger.tagsync.sink.impl.class</name>
-		<value>org.apache.ranger.sink.policymgr.TagRESTSink</value>
+		<value>tagadmin</value>
 		<description></description>
 	</property>
 	<property>
-		<name>atlas.endpoint</name>
+		<name>ranger.tagsync.atlassource.endpoint</name>
 		<value>http://localhost:21000/</value>
 		<description></description>
 	</property>
-        <property>
-                <name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name>
-                <value>cl1_hive</value>
-                <description></description>
-        </property>
+	<property>
+		<name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name>
+		<value>cl1_hive</value>
+		<description></description>
+	</property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java
deleted file mode 100644
index e693696..0000000
--- a/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.process;
-
-
-import org.apache.ranger.model.TagSource;
-import org.apache.ranger.source.atlas.TagAtlasSource;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-
-
-public class TestTagSynchronizer {
-
-	private static TagSynchronizer tagSynchronizer;
-
-	@BeforeClass
-	public static void setUpBeforeClass() throws Exception {
-		System.out.println("setUpBeforeClass() called");
-
-		TagSyncConfig config = TagSyncConfig.getInstance();
-
-		TagSyncConfig.dumpConfiguration(config, new BufferedWriter(new OutputStreamWriter(System.out)));
-
-		Properties props = config.getProperties();
-
-		tagSynchronizer = new TagSynchronizer(props);
-
-	}
-
-	@AfterClass
-	public static void tearDownAfterClass() throws Exception {
-		System.out.println("tearDownAfterClass() called");
-
-	}
-
-	@Test
-	public void testTagSynchronizer() {
-
-		System.out.println("testTagSynchronizer() called");
-
-		//tagSynchronizer.run();
-
-		tagSynchronizer.shutdown("From testTagSynchronizer: time=up");
-
-		System.out.println("Exiting test");
-
-
-	}
-
-	@Test
-	public void testTagDownload() {
-
-		boolean initDone = tagSynchronizer.initLoop();
-
-		System.out.println("TagSynchronizer initialization result=" + initDone);
-
-		/*
-		TagSource tagSource = tagSynchronizer.getTagSource();
-
-		try {
-			TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource;
-			//tagAtlasSource.printAllEntities();
-		} catch (ClassCastException exception) {
-			System.err.println("TagSource is not of TagAtlasSource");
-		}
-		*/
-
-		System.out.println("Exiting testTagDownload()");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
new file mode 100644
index 0000000..10be4e6
--- /dev/null
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+
+import org.apache.ranger.tagsync.model.TagSource;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.ranger.tagsync.process.TagSynchronizer;
+import org.apache.ranger.tagsync.source.atlas.TagAtlasSource;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+/** Smoke tests for a TagSynchronizer built from the TagSyncConfig properties; outcomes are printed, not asserted. */
+public class TestTagSynchronizer {
+	// Single instance shared by every test method; assigned once in setUpBeforeClass().
+	private static TagSynchronizer tagSynchronizer;
+	/** Dumps the configuration to stdout, then constructs the shared synchronizer from its properties. */
+	@BeforeClass
+	public static void setUpBeforeClass() throws Exception {
+		System.out.println("setUpBeforeClass() called");
+
+		TagSyncConfig config = TagSyncConfig.getInstance();
+		// Dump the configuration through a writer that wraps System.out.
+		TagSyncConfig.dumpConfiguration(config, new BufferedWriter(new OutputStreamWriter(System.out)));
+		// NOTE(review): the BufferedWriter is not flushed or closed here - confirm dumpConfiguration() flushes it.
+		Properties props = config.getProperties();
+
+		tagSynchronizer = new TagSynchronizer(props);
+
+	}
+	/** Performs no cleanup; only prints that the hook ran. */
+	@AfterClass
+	public static void tearDownAfterClass() throws Exception {
+		System.out.println("tearDownAfterClass() called");
+
+	}
+	/** Calls shutdown() on the shared synchronizer; the run() call is commented out, so nothing is started here first. */
+	@Test
+	public void testTagSynchronizer() {
+
+		System.out.println("testTagSynchronizer() called");
+
+		//tagSynchronizer.run();
+
+		tagSynchronizer.shutdown("From testTagSynchronizer: time=up");
+
+		System.out.println("Exiting test");
+
+
+	}
+	/** Calls initLoop() on the shared synchronizer and prints the boolean it returns; the result is not asserted. */
+	@Test
+	public void testTagDownload() {
+		// NOTE(review): this may run after testTagSynchronizer() has called shutdown() on the same instance - confirm that is safe.
+		boolean initDone = tagSynchronizer.initLoop();
+
+		System.out.println("TagSynchronizer initialization result=" + initDone);
+		// Disabled check below: it would cast the synchronizer's TagSource to TagAtlasSource.
+		/*
+		TagSource tagSource = tagSynchronizer.getTagSource();
+
+		try {
+			TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource;
+			//tagAtlasSource.printAllEntities();
+		} catch (ClassCastException exception) {
+			System.err.println("TagSource is not of TagAtlasSource");
+		}
+		*/
+
+		System.out.println("Exiting testTagDownload()");
+	}
+}