You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/10/28 17:00:04 UTC
[28/50] [abbrv] incubator-ranger git commit: RANGER-660: updates to
setup/install ranger-tagsync
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
new file mode 100644
index 0000000..243aee5
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.typesystem.EntityImpl;
+import org.apache.atlas.typesystem.IdImpl;
+import org.apache.atlas.typesystem.TraitImpl;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.EntityNotification;
+import org.apache.atlas.notification.entity.EntityNotificationConsumer;
+import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
+import org.apache.atlas.typesystem.api.Entity;
+import org.apache.atlas.typesystem.api.Trait;
+import org.apache.ranger.admin.client.datatype.RESTResponse;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.tagsync.model.TagSource;
+import org.apache.ranger.plugin.util.RangerRESTClient;
+import org.apache.ranger.plugin.util.RangerRESTUtils;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TagAtlasSource implements TagSource {
+ private static final Log LOG = LogFactory.getLog(TagAtlasSource.class);
+
+
+ private final Map<String, Entity> entities = new LinkedHashMap<>();
+ private TagSink tagSink;
+ private Properties properties;
+ private ConsumerRunnable consumerTask;
+
+ @Override
+ public boolean initialize(Properties properties) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagAtlasSource.initialize()");
+ }
+
+ boolean ret = true;
+
+ if (properties == null || MapUtils.isEmpty(properties)) {
+ LOG.error("No properties specified for TagFileSource initialization");
+ this.properties = new Properties();
+ } else {
+ this.properties = properties;
+ }
+
+
+ NotificationModule notificationModule = new NotificationModule();
+
+ Injector injector = Guice.createInjector(notificationModule);
+
+ EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class);
+
+ consumerTask = new ConsumerRunnable(consumerProvider.get());
+
+ //ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ //executorService.submit(new ConsumerRunnable(consumerProvider.get()));
+
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagAtlasSource.initialize(), result=" + ret);
+ }
+ return ret;
+ }
+
+ @Override
+ public void setTagSink(TagSink sink) {
+ if (sink == null) {
+ LOG.error("Sink is null!!!");
+ } else {
+ this.tagSink = sink;
+ }
+ }
+
+ @Override
+ public Thread start() {
+ Thread consumerThread = null;
+ if (consumerTask == null) {
+ LOG.error("No consumerTask!!!");
+ } else {
+ consumerThread = new Thread(consumerTask);
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+ return consumerThread;
+ }
+
+ @Override
+ public void updateSink() throws Exception {
+ }
+
+ @Override
+ public boolean isChanged() {
+ return true;
+ }
+
+ // ----- inner class : ConsumerRunnable ------------------------------------
+
+ private class ConsumerRunnable implements Runnable {
+
+ private final EntityNotificationConsumer consumer;
+
+ private ConsumerRunnable(EntityNotificationConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+
+ // ----- Runnable --------------------------------------------------------
+
+ @Override
+ public void run() {
+ while (consumer.hasNext()) {
+ try {
+ EntityNotification notification = consumer.next();
+ if (notification != null) {
+ printNotification(notification);
+ ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties);
+ if (serviceTags == null) {
+ LOG.error("Failed to map Atlas notification to ServiceTags structure");
+ } else {
+ if (LOG.isDebugEnabled()) {
+ String serviceTagsJSON = new Gson().toJson(serviceTags);
+ LOG.debug("Atlas notification mapped to serviceTags=" + serviceTagsJSON);
+ }
+
+ try {
+ tagSink.uploadServiceTags(serviceTags);
+ } catch (Exception exception) {
+ LOG.error("uploadServiceTags() failed..", exception);
+ }
+ }
+ }
+ } catch(Exception e){
+ LOG.error("Exception encountered when processing notification:", e);
+ }
+ }
+ }
+
+ public void printNotification(EntityNotification notification) {
+ Entity entity = notification.getEntity();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification-Type: " + notification.getOperationType().name());
+ LOG.debug("Entity-Id: " + entity.getId().getGuid());
+ LOG.debug("Entity-Type: " + entity.getTypeName());
+
+ LOG.debug("----------- Entity Values ----------");
+
+
+ for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
+ LOG.debug(" Name:" + entry.getKey());
+ Object value = entry.getValue();
+ LOG.debug(" Value:" + value);
+ }
+
+ LOG.debug("----------- Entity Traits ----------");
+
+
+ for (Map.Entry<String, ? extends Trait> entry : entity.getTraits().entrySet()) {
+ LOG.debug(" Trait-Name:" + entry.getKey());
+ Trait trait = entry.getValue();
+ LOG.debug(" Trait-Type:" + trait.getTypeName());
+ Map<String, Object> traitValues = trait.getValues();
+ for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
+ LOG.debug(" Trait-Value-Name:" + valueEntry.getKey());
+ LOG.debug(" Trait-Value:" + valueEntry.getValue());
+ }
+ }
+
+ }
+ }
+
+ }
+
+ public void printAllEntities() {
+ try {
+ new AtlasUtility().getAllEntities();
+ }
+ catch(java.io.IOException ioException) {
+ LOG.error("Caught IOException while retrieving Atlas Entities:", ioException);
+ }
+ }
+
+ // update the set of entities with current from Atlas
+ public void refreshAllEntities() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagAtlasSource.refreshAllEntities()");
+ }
+ AtlasUtility atlasUtility = new AtlasUtility();
+
+ try {
+ entities.putAll(atlasUtility.getAllEntities());
+ } catch (IOException e) {
+ LOG.error("getAllEntities() failed", e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagAtlasSource.refreshAllEntities()");
+ }
+ }
+
+ // Inner class AtlasUtil
+
+ /**
+ * Atlas utility.
+ */
+ @SuppressWarnings("unchecked")
+ private class AtlasUtility {
+
+ /**
+ * Atlas APIs
+ */
+ public static final String API_ATLAS_TYPES = "api/atlas/types";
+ public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
+ public static final String API_ATLAS_ENTITY = "api/atlas/entities/";
+ public static final String API_ATLAS_TYPE = "api/atlas/types/";
+
+ /**
+ * API Response Attributes
+ */
+ public static final String RESULTS_ATTRIBUTE = "results";
+ public static final String DEFINITION_ATTRIBUTE = "definition";
+ public static final String VALUES_ATTRIBUTE = "values";
+ public static final String TRAITS_ATTRIBUTE = "traits";
+ public static final String TYPE_NAME_ATTRIBUTE = "typeName";
+ public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes";
+ public static final String SUPER_TYPES_ATTRIBUTE = "superTypes";
+ public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
+ public static final String NAME_ATTRIBUTE = "name";
+
+ private Type mapType = new TypeToken<Map<String, Object>>(){}.getType();
+
+ private RangerRESTClient restClient;
+
+
+ // ----- Constructors ------------------------------------------------------
+
+ /**
+ * Construct an AtlasUtility
+ *
+ */
+ public AtlasUtility() {
+
+ String url = TagSyncConfig.getAtlasEndpoint(properties);
+ String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties);
+
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")");
+ }
+
+ restClient = new RangerRESTClient(url, sslConfigFileName);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
+ }
+ }
+
+
+ // ----- AtlasUtility ------------------------------------------------------
+
+ /**
+ * Get all of the entities defined in Atlas.
+ *
+ * @return a mapping of GUIDs to Atlas entities
+ *
+ * @throws IOException if there is an error communicating with Atlas
+ */
+ public Map<String, Entity> getAllEntities() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagAtlasSource.getAllEntities()");
+ }
+ Map<String, Entity> entities = new LinkedHashMap<>();
+
+ Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
+
+ List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
+
+ for (String type : types) {
+
+ Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
+
+ List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
+
+ for (String guid : guids) {
+
+ if (StringUtils.isNotBlank(guid)) {
+
+ Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>();
+
+ Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
+
+ if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+ String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
+
+ LOG.info("{");
+ LOG.info(" \"entity-id\":" + guid + ",");
+ LOG.info(" \"entity-definition\":" + definitionJSON);
+ LOG.info("}");
+
+ Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
+
+ Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
+ Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
+ String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
+
+ LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")");
+
+
+ Map<String, TraitImpl> traitMap = new HashMap<>();
+
+ if (MapUtils.isNotEmpty(traits)) {
+
+ LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
+
+ for (Map.Entry<String, Object> entry : traits.entrySet()) {
+
+ Map<String, Object> trait = (Map<String, Object>) entry.getValue();
+
+ Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
+ String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
+
+ Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
+
+ TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
+
+ traitSuperTypes.put(trait1, superTypes);
+
+ traitMap.put(entry.getKey(), trait1);
+
+
+ LOG.info(" Trait(typeName=" + traitTypeName + ")");
+
+ }
+ } else {
+ LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")");
+ }
+ EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
+
+ showEntity(entity);
+
+ entities.put(guid, entity);
+
+ }
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagAtlasSource.getAllEntities()");
+ }
+ return entities;
+ }
+
+
+ // ----- helper methods ----------------------------------------------------
+
+ private Map<String, Object> getTraitType(String traitName)
+ throws IOException {
+
+ Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
+
+ if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
+ String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
+
+ Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
+
+ List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
+
+ if (traitTypes.size() > 0) {
+ return (Map<String, Object>) traitTypes.get(0);
+ }
+ }
+ return null;
+ }
+
+ private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values)
+ throws IOException {
+
+ Map<String, TraitImpl> superTypes = new HashMap<>();
+
+ if (traitType != null) {
+
+ List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
+
+ for (String superTypeName : superTypeNames) {
+
+ Map<String, Object> superTraitType = getTraitType(superTypeName);
+
+ if (superTraitType != null) {
+ List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
+
+ Map<String, Object> superTypeValues = new HashMap<>();
+ for (Map<String, Object> attributeDefinition : attributeDefinitions) {
+
+ String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
+ if (values.containsKey(attributeName)) {
+ superTypeValues.put(attributeName, values.get(attributeName));
+ }
+ }
+
+ superTypes.put(superTypeName,
+ //new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName));
+ new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues)));
+ }
+ }
+ }
+ return superTypes;
+ }
+
+ /*
+ private Map<String, Object> atlasAPI(String endpoint) throws IOException {
+ InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
+ return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType);
+ }
+ */
+
+ private Map<String, Object> atlasAPI(String endpoint) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint +")");
+ }
+ // Create a REST client and perform a get on it
+ Map<String, Object> ret = new HashMap<String, Object>();
+
+ WebResource webResource = restClient.getResource(endpoint);
+
+ ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
+
+ if(response != null && response.getStatus() == 200) {
+ ret = response.getEntity(ret.getClass());
+ } else {
+ LOG.error("Atlas REST call returned with response={" + response +"}");
+
+ RESTResponse resp = RESTResponse.fromClientResponse(response);
+ LOG.error("Error getting Atlas Entity. request=" + webResource.toString()
+ + ", response=" + resp.toString());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")");
+ }
+ return ret;
+ }
+
+ private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
+ return type.cast(map.get(name));
+ }
+
+
+
+ public void showEntity(Entity entity) {
+
+ LOG.debug("Entity-id :" + entity.getId());
+
+ LOG.debug("Type: " + entity.getTypeName());
+
+ LOG.debug("----- Values -----");
+
+ for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
+ LOG.debug(" Name: " + entry.getKey() + "");
+ Object value = entry.getValue();
+ LOG.debug(" Value: " + getValue(value, entities.keySet()));
+ }
+
+ LOG.debug("----- Traits -----");
+
+ for (String traitName : entity.getTraits().keySet()) {
+ LOG.debug(" Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName);
+ }
+
+ }
+
+ public void showTrait(Entity entity, String traitId) {
+
+ String[] traitNames = traitId.split(",");
+
+ Trait trait = entity.getTraits().get(traitNames[0]);
+
+ for (int i = 1; i < traitNames.length; ++i ) {
+ trait = trait.getSuperTypes().get(traitNames[i]);
+ }
+
+ String typeName = trait.getTypeName();
+
+ LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId());
+
+ LOG.debug("Type: " + typeName);
+
+ LOG.debug("----- Values ------");
+
+ for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) {
+ LOG.debug("Name:" + entry.getKey());
+ Object value = entry.getValue();
+ LOG.debug("Value:" + getValue(value, entities.keySet()));
+ }
+
+ LOG.debug("Super Traits");
+
+
+ for (String traitName : trait.getSuperTypes().keySet()) {
+ LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName);
+ }
+ }
+
+ // resolve the given value if necessary
+ private String getValue(Object value, Set<String> ids) {
+ if (value == null) {
+ return "";
+ }
+ String idString = getIdValue(value, ids);
+ if (idString != null) {
+ return idString;
+ }
+
+ idString = getIdListValue(value, ids);
+ if (idString != null) {
+ return idString;
+ }
+
+ return value.toString();
+ }
+ // get an id from the given value; return null if the value is not an id type
+ private String getIdValue(Object value, Set<String> ids) {
+ if (value instanceof Map) {
+ Map map = (Map) value;
+ if (map.size() == 3 && map.containsKey("id")){
+ String id = map.get("id").toString();
+ if (ids.contains(id)) {
+ return id;
+ }
+ }
+ }
+ return null;
+ }
+ // get an id list from the given value; return null if the value is not an id list type
+ private String getIdListValue(Object value, Set<String> ids) {
+ if (value instanceof List) {
+ List list = (List) value;
+ if (list.size() > 0) {
+ StringBuilder sb = new StringBuilder();
+ for (Object o : list) {
+ String idString = getIdValue(o, ids);
+ if (idString == null) {
+ return value.toString();
+ }
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(idString);
+ }
+ return sb.toString();
+ }
+ }
+ return null;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
new file mode 100644
index 0000000..925a712
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/TagFileSource.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.file;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.model.TagSink;
+import org.apache.ranger.tagsync.model.TagSource;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+
+import java.io.*;
+import java.util.Date;
+import java.util.Properties;
+
+public class TagFileSource implements TagSource, Runnable {
+ private static final Log LOG = LogFactory.getLog(TagFileSource.class);
+
+ private String sourceFileName;
+ private long lastModifiedTimeInMillis = 0L;
+
+ private Gson gson;
+ private TagSink tagSink;
+ private Properties properties;
+
+ @Override
+ public boolean initialize(Properties properties) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagFileSource.initialize()");
+ }
+
+ if (properties == null || MapUtils.isEmpty(properties)) {
+ LOG.error("No properties specified for TagFileSource initialization");
+ this.properties = new Properties();
+ } else {
+ this.properties = properties;
+ }
+
+ boolean ret = true;
+
+ if (ret) {
+
+ sourceFileName = TagSyncConfig.getTagSourceFileName(properties);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Provided sourceFileName=" + sourceFileName);
+ }
+
+ String realFileName = TagSyncConfig.getResourceFileName(sourceFileName);
+ if (realFileName != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Real sourceFileName=" + realFileName);
+ }
+ sourceFileName = realFileName;
+ } else {
+ LOG.error(sourceFileName + " is not a file or is not readable");
+ ret = false;
+ }
+ }
+
+ if (ret) {
+ try {
+ gson = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+ } catch (Throwable excp) {
+ LOG.fatal("failed to create GsonBuilder object", excp);
+ ret = false;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagFileSource.initialize(): sourceFileName=" + sourceFileName + ", result=" + ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void setTagSink(TagSink sink) {
+ if (sink == null) {
+ LOG.error("Sink is null!!!");
+ } else {
+ this.tagSink = sink;
+ }
+ }
+
+ @Override
+ public Thread start() {
+
+ Thread fileMonitoringThread = null;
+
+ fileMonitoringThread = new Thread(this);
+ fileMonitoringThread.setDaemon(true);
+ fileMonitoringThread.start();
+
+ return fileMonitoringThread;
+ }
+
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagFileSource.run()");
+ }
+ long sleepTimeBetweenCycleInMillis = TagSyncConfig.getSleepTimeInMillisBetweenCycle(properties);
+ boolean shutdownFlag = false;
+
+ while (!shutdownFlag) {
+
+ try {
+ if (isChanged()) {
+ LOG.info("Begin: update tags from source==>sink");
+ if (TagSyncConfig.isTagSyncEnabled(properties)) {
+ updateSink();
+ LOG.info("End: update tags from source==>sink");
+ } else {
+ LOG.info("Tag-sync is not enabled.");
+ }
+ } else {
+ LOG.debug("TagFileSource: no change found for synchronization.");
+ }
+
+ LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+
+ Thread.sleep(sleepTimeBetweenCycleInMillis);
+ }
+ catch (InterruptedException e) {
+ LOG.error("Failed to wait for [" + sleepTimeBetweenCycleInMillis + "] milliseconds before attempting to synchronize tag information", e);
+ shutdownFlag = true;
+ }
+ catch (Throwable t) {
+ LOG.error("tag-sync thread got an error", t);
+ }
+ }
+
+ LOG.info("Shutting down the Tag-file-source thread");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagFileSource.run()");
+ }
+ }
+
+ @Override
+ public void updateSink() throws Exception {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagFileSource.updateSink()");
+ }
+ ServiceTags serviceTags = readFromFile();
+
+ if (serviceTags != null) {
+ tagSink.uploadServiceTags(serviceTags);
+ } else {
+ LOG.error("Could not read ServiceTags from file");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagFileSource.updateSink()");
+ }
+ }
+
+ @Override
+ public boolean isChanged() {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagFileSource.isChanged()");
+ }
+ boolean ret = false;
+
+ long modificationTime = getModificationTime();
+
+ if (modificationTime > lastModifiedTimeInMillis) {
+ if (LOG.isDebugEnabled()) {
+ Date modifiedDate = new Date(modificationTime);
+ Date lastModifiedDate = new Date(lastModifiedTimeInMillis);
+ LOG.debug("File modified at " + modifiedDate + "last-modified at " + lastModifiedDate);
+ }
+ lastModifiedTimeInMillis = modificationTime;
+ ret = true;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagFileSource.isChanged(): result=" + ret);
+ }
+ return ret;
+ }
+
+ private ServiceTags readFromFile() {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagFileSource.readFromFile(): sourceFileName=" + sourceFileName);
+ }
+
+ ServiceTags ret = null;
+
+ Reader reader = null;
+ try {
+
+ reader = new InputStreamReader(TagSyncConfig.getFileInputStream(sourceFileName));
+
+ ret = gson.fromJson(reader, ServiceTags.class);
+
+ }
+ catch (FileNotFoundException exception) {
+ LOG.warn("Tag-source file does not exist or not readble '" + sourceFileName + "'");
+ }
+ catch (Exception excp) {
+ LOG.error("failed to load service-tags from Tag-source file " + sourceFileName, excp);
+ }
+ finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception excp) {
+ LOG.error("error while closing opened Tag-source file " + sourceFileName, excp);
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagFileSource.readFromFile(): sourceFileName=" + sourceFileName);
+ }
+
+ return ret;
+ }
+
+ private long getModificationTime() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> TagFileSource.getLastModificationTime(): sourceFileName=" + sourceFileName);
+ }
+ long ret = 0L;
+
+ File sourceFile = new File(sourceFileName);
+
+ if (sourceFile.exists() && sourceFile.isFile() && sourceFile.canRead()) {
+ ret = sourceFile.lastModified();
+ } else {
+ ret = new Date().getTime();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== TagFileSource.lastModificationTime(): sourceFileName=" + sourceFileName + " result=" + new Date(ret));
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/application.properties b/tagsync/src/main/resources/application.properties
deleted file mode 100644
index 7c874b6..0000000
--- a/tagsync/src/main/resources/application.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# This file is used currently to satisfy needs of Injection of EntityChangeConsumer and its
-# initialization.
-#
-# Basic configuration required to create EntityChangeConsumer
-#
-atlas.notification.kafka.bootstrap.servers=ranger-tag-policy-akulkarni-1:6667
-atlas.notification.kafka.zookeeper.connect=ranger-tag-policy-akulkarni-1:2181
-
-#
-# These properties seem to be internal to Atlas. They probably are used for generating notifications.
-atlas.notification.embedded=false
-atlas.notification.kafka.acks=1
-atlas.notification.kafka.data=${sys:atlas.home}/data/kafka
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/main/resources/ranger-tagsync-default.xml
----------------------------------------------------------------------
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml
index fabe04e..5f754f9 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -18,14 +18,6 @@
<configuration>
<property>
- <name>ranger.tagsync.port</name>
- <value>6161</value>
- </property>
- <property>
- <name>ranger.tagsync.ssl</name>
- <value>true</value>
- </property>
- <property>
<name>ranger.tagsync.enabled</name>
<value>true</value>
</property>
@@ -34,11 +26,6 @@
<value>./log</value>
</property>
<property>
- <name>ranger.authentication.method</name>
- <value>NONE</value>
- <description></description>
- </property>
- <property>
<name>ranger.tagsync.tagadmin.rest.url</name>
<value>http://localhost:6080</value>
<description></description>
@@ -49,12 +36,12 @@
<description></description>
</property>
<property>
- <name>ranger.tagsync.policymanager.basicauth.username</name>
+ <name>ranger.tagsync.tagadmin.basicauth.username</name>
<value>admin</value>
<description></description>
</property>
<property>
- <name>ranger.tagsync.policymanager.basicauth.password</name>
+ <name>ranger.tagsync.tagadmin.basicauth.password</name>
<value>admin</value>
<description></description>
</property>
@@ -64,28 +51,28 @@
<description></description>
</property>
<property>
- <name>ranger.tagsync.source.file</name>
+ <name>ranger.tagsync.filesource.filename</name>
<value>/etc/ranger/data/tags.json</value>
<description></description>
</property>
<property>
<name>ranger.tagsync.source.impl.class</name>
- <value>org.apache.ranger.source.file.TagFileSource</value>
+ <value>file</value>
<description></description>
</property>
<property>
<name>ranger.tagsync.sink.impl.class</name>
- <value>org.apache.ranger.sink.policymgr.TagRESTSink</value>
+ <value>tagadmin</value>
<description></description>
</property>
<property>
- <name>atlas.endpoint</name>
+ <name>ranger.tagsync.atlassource.endpoint</name>
<value>http://localhost:21000/</value>
<description></description>
</property>
- <property>
- <name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name>
- <value>cl1_hive</value>
- <description></description>
- </property>
+ <property>
+ <name>ranger.tagsync.atlas.hive.instance.c1.ranger.service</name>
+ <value>cl1_hive</value>
+ <description></description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java
deleted file mode 100644
index e693696..0000000
--- a/tagsync/src/test/java/org/apache/ranger/process/TestTagSynchronizer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.process;
-
-
-import org.apache.ranger.model.TagSource;
-import org.apache.ranger.source.atlas.TagAtlasSource;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-
-
-public class TestTagSynchronizer {
-
- private static TagSynchronizer tagSynchronizer;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- System.out.println("setUpBeforeClass() called");
-
- TagSyncConfig config = TagSyncConfig.getInstance();
-
- TagSyncConfig.dumpConfiguration(config, new BufferedWriter(new OutputStreamWriter(System.out)));
-
- Properties props = config.getProperties();
-
- tagSynchronizer = new TagSynchronizer(props);
-
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- System.out.println("tearDownAfterClass() called");
-
- }
-
- @Test
- public void testTagSynchronizer() {
-
- System.out.println("testTagSynchronizer() called");
-
- //tagSynchronizer.run();
-
- tagSynchronizer.shutdown("From testTagSynchronizer: time=up");
-
- System.out.println("Exiting test");
-
-
- }
-
- @Test
- public void testTagDownload() {
-
- boolean initDone = tagSynchronizer.initLoop();
-
- System.out.println("TagSynchronizer initialization result=" + initDone);
-
- /*
- TagSource tagSource = tagSynchronizer.getTagSource();
-
- try {
- TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource;
- //tagAtlasSource.printAllEntities();
- } catch (ClassCastException exception) {
- System.err.println("TagSource is not of TagAtlasSource");
- }
- */
-
- System.out.println("Exiting testTagDownload()");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/44ddd597/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
new file mode 100644
index 0000000..10be4e6
--- /dev/null
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestTagSynchronizer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+
+import org.apache.ranger.tagsync.model.TagSource;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.ranger.tagsync.process.TagSynchronizer;
+import org.apache.ranger.tagsync.source.atlas.TagAtlasSource;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+
+public class TestTagSynchronizer {
+
+ private static TagSynchronizer tagSynchronizer;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ System.out.println("setUpBeforeClass() called");
+
+ TagSyncConfig config = TagSyncConfig.getInstance();
+
+ TagSyncConfig.dumpConfiguration(config, new BufferedWriter(new OutputStreamWriter(System.out)));
+
+ Properties props = config.getProperties();
+
+ tagSynchronizer = new TagSynchronizer(props);
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ System.out.println("tearDownAfterClass() called");
+
+ }
+
+ @Test
+ public void testTagSynchronizer() {
+
+ System.out.println("testTagSynchronizer() called");
+
+ //tagSynchronizer.run();
+
+ tagSynchronizer.shutdown("From testTagSynchronizer: time=up");
+
+ System.out.println("Exiting test");
+
+
+ }
+
+ @Test
+ public void testTagDownload() {
+
+ boolean initDone = tagSynchronizer.initLoop();
+
+ System.out.println("TagSynchronizer initialization result=" + initDone);
+
+ /*
+ TagSource tagSource = tagSynchronizer.getTagSource();
+
+ try {
+ TagAtlasSource tagAtlasSource = (TagAtlasSource) tagSource;
+ //tagAtlasSource.printAllEntities();
+ } catch (ClassCastException exception) {
+ System.err.println("TagSource is not of TagAtlasSource");
+ }
+ */
+
+ System.out.println("Exiting testTagDownload()");
+ }
+}