You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2016/08/04 06:12:13 UTC

incubator-atlas git commit: ATLAS-1089: fix Storm hook to handle cyclic references in topology object

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 9eafb165a -> a2801f0ea


ATLAS-1089: fix Storm hook to handle cyclic references in topology object


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a2801f0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a2801f0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a2801f0e

Branch: refs/heads/master
Commit: a2801f0eaff29a834915f243245e2d28b86a2bc3
Parents: 9eafb16
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Wed Aug 3 17:11:56 2016 -0700
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Wed Aug 3 23:11:40 2016 -0700

----------------------------------------------------------------------
 .../apache/atlas/storm/hook/StormAtlasHook.java |   6 +-
 .../atlas/storm/hook/StormTopologyUtil.java     | 129 ++++++++++---------
 release-log.txt                                 |   1 +
 3 files changed, 73 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2801f0e/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index c4b4976..5ed4d99 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -179,7 +179,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
     private Referenceable createDataSet(String name, String topologyOwner,
                                               Serializable instance,
                                               Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
-        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true);
+        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
 
         String clusterName = null;
         Referenceable dataSetReferenceable;
@@ -298,7 +298,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                 stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
         spoutReferenceable.set("driverClass", instance.getClass().getName());
 
-        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true);
+        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
         spoutReferenceable.set("conf", flatConfigMap);
 
         return spoutReferenceable;
@@ -322,7 +322,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                 stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
         boltReferenceable.set("driverClass", instance.getClass().getName());
 
-        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true);
+        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
         boltReferenceable.set("conf", flatConfigMap);
 
         return boltReferenceable;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2801f0e/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
index ce6a175..dc2ae57 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java
@@ -127,86 +127,95 @@ public final class StormTopologyUtil {
     }
 
     public static Map<String, String> getFieldValues(Object instance,
-                                                     boolean prependClassName)
+                                                     boolean prependClassName,
+                                                     Set<Object> objectsToSkip)
     throws IllegalAccessException {
-        Class clazz = instance.getClass();
+        if (objectsToSkip == null) {
+            objectsToSkip = new HashSet<Object>();
+        }
+
         Map<String, String> output = new HashMap<>();
-        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
-            Field[] fields = c.getDeclaredFields();
-            for (Field field : fields) {
-                if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
-                    continue;
-                }
 
-                String key;
-                if (prependClassName) {
-                    key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
-                } else {
-                    key = field.getName();
-                }
+        if (objectsToSkip.add(instance)) {
+            Class clazz = instance.getClass();
+            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+                Field[] fields = c.getDeclaredFields();
+                for (Field field : fields) {
+                    if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                        continue;
+                    }
 
-                boolean accessible = field.isAccessible();
-                if (!accessible) {
-                    field.setAccessible(true);
-                }
-                Object fieldVal = field.get(instance);
-                if (fieldVal == null) {
-                    continue;
-                } else if (fieldVal.getClass().isPrimitive() ||
-                        isWrapperType(fieldVal.getClass())) {
-                    if (toString(fieldVal, false).isEmpty()) continue;
-                    output.put(key, toString(fieldVal, false));
-                } else if (isMapType(fieldVal.getClass())) {
-                    //TODO: check if it makes more sense to just stick to json
-                    // like structure instead of a flatten output.
-                    Map map = (Map) fieldVal;
-                    for (Object entry : map.entrySet()) {
-                        Object mapKey = ((Map.Entry) entry).getKey();
-                        Object mapVal = ((Map.Entry) entry).getValue();
-
-                        String keyStr = getString(mapKey, false);
-                        String valStr = getString(mapVal, false);
-                        if ((valStr == null) || (valStr.isEmpty())) {
-                            continue;
-                        } else {
-                            output.put(String.format("%s.%s", key, keyStr), valStr);
-                        }
+                    String key;
+                    if (prependClassName) {
+                        key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
+                    } else {
+                        key = field.getName();
                     }
-                } else if (isCollectionType(fieldVal.getClass())) {
-                    //TODO check if it makes more sense to just stick to
-                    // json like structure instead of a flatten output.
-                    Collection collection = (Collection) fieldVal;
-                    if (collection.size()==0) continue;
-                    String outStr = "";
-                    for (Object o : collection) {
-                        outStr += getString(o, false) + ",";
+
+                    boolean accessible = field.isAccessible();
+                    if (!accessible) {
+                        field.setAccessible(true);
                     }
-                    if (outStr.length() > 0) {
-                        outStr = outStr.substring(0, outStr.length() - 1);
+                    Object fieldVal = field.get(instance);
+                    if (fieldVal == null) {
+                        continue;
+                    } else if (fieldVal.getClass().isPrimitive() ||
+                            isWrapperType(fieldVal.getClass())) {
+                        if (toString(fieldVal, false).isEmpty()) continue;
+                        output.put(key, toString(fieldVal, false));
+                    } else if (isMapType(fieldVal.getClass())) {
+                        //TODO: check if it makes more sense to just stick to json
+                        // like structure instead of a flatten output.
+                        Map map = (Map) fieldVal;
+                        for (Object entry : map.entrySet()) {
+                            Object mapKey = ((Map.Entry) entry).getKey();
+                            Object mapVal = ((Map.Entry) entry).getValue();
+
+                            String keyStr = getString(mapKey, false, objectsToSkip);
+                            String valStr = getString(mapVal, false, objectsToSkip);
+                            if ((valStr == null) || (valStr.isEmpty())) {
+                                continue;
+                            } else {
+                                output.put(String.format("%s.%s", key, keyStr), valStr);
+                            }
+                        }
+                    } else if (isCollectionType(fieldVal.getClass())) {
+                        //TODO check if it makes more sense to just stick to
+                        // json like structure instead of a flatten output.
+                        Collection collection = (Collection) fieldVal;
+                        if (collection.size() == 0) continue;
+                        String outStr = "";
+                        for (Object o : collection) {
+                            outStr += getString(o, false, objectsToSkip) + ",";
+                        }
+                        if (outStr.length() > 0) {
+                            outStr = outStr.substring(0, outStr.length() - 1);
+                        }
+                        output.put(key, String.format("%s", outStr));
+                    } else {
+                        Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip);
+                        for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
+                            output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
+                        }
                     }
-                    output.put(key, String.format("%s", outStr));
-                } else {
-                    Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false);
-                    for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
-                        output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
+                    if (!accessible) {
+                        field.setAccessible(false);
                     }
                 }
-                if (!accessible) {
-                    field.setAccessible(false);
-                }
             }
         }
         return output;
     }
 
     private static String getString(Object instance,
-                                    boolean wrapWithQuote) throws IllegalAccessException {
+                                    boolean wrapWithQuote,
+                                    Set<Object> objectsToSkip) throws IllegalAccessException {
         if (instance == null) {
             return null;
         } else if (instance.getClass().isPrimitive() || isWrapperType(instance.getClass())) {
             return toString(instance, wrapWithQuote);
         } else {
-            return getString(getFieldValues(instance, false), wrapWithQuote);
+            return getString(getFieldValues(instance, false, objectsToSkip), wrapWithQuote);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2801f0e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9424c2d..d620cc6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
 
 
 ALL CHANGES:
+ATLAS-1089 Storm hook should handle cyclic references in topology object (mneethiraj via sumasai)
 ATLAS-1086 Build failure in hive-bridge after security fixes in ATLAS-762 (sumasai)
 ATLAS-1088 Fix /search api to default to fulltext on dsl failure (sumasai)
 ATLAS-762 Assertion in NegativeSSLAndKerberosTest.testUnsecuredClient needs to be hardened (nixonrodrigues via sumasai)